[譯] RxJava 的全面介紹:Observable 類型、背壓、錯誤處理

原文:https://www.ericdecanini.com/2019/01/07/comprehensive-introduction-to-rxjava-observable-types-backpressure-and-error-handling/
作者:https://www.ericdecanini.com/about/

前言

RxJava 是一個不斷更新的工具庫,適用于除 Android 以外的許多平臺的開發(fā)人員(如:RxSwift)。RxJava 最大的優(yōu)勢是 以不使用回調的方式處理異步操作。

相反,ObservablesObservers 結合使用來發(fā)射數(shù)據(jù)(一次或多次),并且還可以通過各自包含的方法來處理每次數(shù)據(jù)發(fā)射時要做的事情。

什么是 Observable 和 Observer

val myObservable = Observable.just(1, 2, 3)
 
val myObserver = myObservable.subscribe { receivedNumber ->
    Log.d(LOG_TAG, "Received Number $receivedNumber")
}

Observable – 發(fā)射數(shù)據(jù)流并與接受改數(shù)據(jù)的 Observer 一起工作的對象。
Observer – 訂閱一個 Observable 讓它開始發(fā)射數(shù)據(jù),并處理接受數(shù)據(jù)時要做的事情。

在上面的例子中,Observable.just(1, 2, 3) 將按照順序發(fā)射整數(shù) 1, 2, 3。 Observer 一旦訂閱 Observable,將以相同的順序接受這些數(shù)字。

Received Number 1
Received Number 2
Received Number 3

Observable 的生命周期

Observable 有兩個重要的方法,來處理傳入的數(shù)據(jù)。

  • onNext - 每當發(fā)射新數(shù)據(jù)時調用,正如你在上面示例中的 lambda 函數(shù)看到的一樣(在 subscribe 之后調用)。

  • onComplete - 在沒有更多數(shù)據(jù)發(fā)射時調用。顧名思義,數(shù)據(jù)流完全發(fā)射完畢。

不同類型的 Observable

最基本的 Observable 會發(fā)射連續(xù)的數(shù)據(jù)流,直到調用 onComplete 為止,這并不總是你想要的。你可能想發(fā)射 單個值,或者發(fā)射一個 無法接受該值的值,亦或是 在執(zhí)行沒有返回值的異步任務后調用其他函數(shù)。

val mySingle = Single.just(1)
val singleObserver = mySingle.subscribe { data ->
    Log.d(LOG_TAG, "Received $data")
}
 
val myMaybe = Maybe.empty<Int>()
val maybeObserver = myMaybe
        .defaultIfEmpty(1)
        .subscribe { data ->
            Log.d(LOG_TAG, "Received $data")
        }
 
val myCompletable = Completable.complete()
val completableObserver = myCompletable
        .subscribe {
            Log.d(LOG_TAG, "Task completed")
        }

Single 僅發(fā)射一個值。onNext 只調用一次,并且onComplete 將立即被調用。

Mayble 發(fā)射一個或零個值,當發(fā)射零個值時,將跳過 onNext 并立即調用 onComplete??梢允褂?defalutIfEmpyty 函數(shù)發(fā)射默認值。

Completable 不發(fā)射任何值,你可以像沒有返回值的回調一樣訂閱它。

Flowables 和背壓

還有一種類型的 Observable,它就是 Flowable。和 Observable 一樣,Flowable 也發(fā)射連續(xù)的數(shù)據(jù)流,直到完成為止。但有一個關鍵的區(qū)別:

想象一下,上游 Observable 數(shù)據(jù)發(fā)射的速度,大于下游 Observer 處理數(shù)據(jù)的速度,這就是 背壓。在大部分情況下,將會導致錯誤發(fā)生。Flowable 是一種包含背壓策略的 Observable,具有當背壓發(fā)生時,如何處理數(shù)據(jù)的能力。

val myFlowable = Observable.range(1, 100000).toFlowable(BackpressureStrategy.DROP)
val flowableObserver = myFlowable.subscribe {data ->
    Log.d(LOG_TAG, "Received $data")
}

有 5 種不同的背壓策略,我們需要了解下:

  • Buffer - 在內存中緩存事件,直到 Observer 可以處理它們。默認情況下,在錯誤發(fā)生之前,緩沖區(qū)的大小為 128 個 items??梢孕薷木彌_區(qū)的大小,但請注意,這將會帶來性能上的開銷。
  • Drop - 丟棄 Observer無法處理的事件。
  • Latest - 僅保留最新發(fā)射的值,直到 Observer 可以使用它并丟棄其它的值。
  • Error - 如果發(fā)生背壓,將拋出異常。
  • Missing - 缺乏背壓策咯,如果你想在客戶端處理背壓,你可以使用它(因為背壓策咯是由 Observable 創(chuàng)建的)。未能說出策咯會在背壓下拋出異常。
Observable vs Flowable

已知的是,當你明確知道發(fā)射的數(shù)據(jù),不會導致發(fā)生背壓。你應當使用 Observable,而不是 Flowable。老實說,我還沒有發(fā)現(xiàn)使用
Flowable 的場景,也許 Flowables 將使用額外的內存?

錯誤處理

沒有任何代碼可以避免錯誤,你已經(jīng)知道,如果不處理背壓將導致錯誤發(fā)生。最重要的是,在 Observersubscribe 方法中,由自己的代碼發(fā)生的異常,都將被視為由Observer 應處理的錯誤。

高興的是,RxJava 包含了幾種處理這些錯誤的方法。

  • doOnError - 當錯誤發(fā)生,只需執(zhí)行該方法
val observer = myObservable
    .doOnError { Log.e(LOG_TAG, "ErrorOccurred") }
    .subscribe()
  • onErrorReturnItem - 如果發(fā)生錯誤,返回一個默認值
val observer = myObservable
    .onErrorReturnItem(0)
    .subscribe()
    }
  • onErrorReturn - 和 onErrorReturnItem 一樣,但接受一個返回所需數(shù)據(jù)類型的函數(shù)(對于動態(tài)默認值)
val observer = myObservable
        .onErrorReturn{ throwable -> throwable.message}
        .subscribe()
  • onErrorResumeNext - 如果發(fā)生錯誤,則返回一個默認數(shù)據(jù)流,也可以為動態(tài)數(shù)據(jù)采取功能。
val observer = myObservable
        .onErrorResumeNext(Observable.just(2, 4, 6))
        .subscribe()
  • retry - 如果發(fā)生錯誤,嘗試重新訂閱 Observable,你可以設置最多的重試次數(shù),或者設置空值以便無限次重試。
val observer = myObservable
        .retry(3)
        .subscribe()

你也可以通過布爾值,來實現(xiàn) 重試條件。

val observer = myObservable
        .retry{ integer, throwable -> integer > 0 }
        .subscribe()

如果沒有使用上述的操作符,在錯誤發(fā)生時,將導致程序崩潰。

結語

到目前為止,此 博客 的大部分內容都是關于 Firebase的。有充分的理由相信,Google 通過強大的 云數(shù)據(jù)庫機器學習云服務,將推動 Firebase 成為最優(yōu)秀的云解決方案之一,并也為服務器開發(fā)打開了新的世界。

雖然我會回到 Firebase 上,但從現(xiàn)在開始,我將潛心學習 RxJava 并深入了解它的細節(jié)。因為它還有更多的閃光點待我去學習:修復回調地獄、提高行能、線程調度.....

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容