前兩篇文章 放棄RxBus,擁抱RxJava(一):為什么避免使用EventBus/RxBus ,放棄RxBus,擁抱RxJava(二):Observable究竟如何封裝數(shù)據(jù)? 寫了一堆理論??雌饋聿]有什么實際用處,那么今天。我們實戰(zhàn)一下,來封裝我們需要的數(shù)據(jù),并且了解一下各種方式具體的區(qū)別。
前言: 很多朋友誤會我文章的意思。我寫這個系列文章的意思主要是幫助了解一下RxJava的常見用法。而不是使用一下自己或別人封裝好的RxBus就覺得自己的項目使用RxJava了。但是這也僅僅是個人口味問題,很多情況下確實RxBus/EventBus會很方便,很刺激,很上癮。所以從這篇文章開始,我把標(biāo)題中的"放棄RxBus"去除。
無論在簡書,微信平臺,GitHub等等分享平臺。一個名字上寫著 "MVP(MVVM) + RxJava + Retrofit + Dagger2 + ........"這樣的名字,再熟悉不過了。然而,大多數(shù)情況進(jìn)去看一下RxJava部分。要么就是簡單的把取到的數(shù)據(jù)用Observable.just()直接傳給下一層,要么就是直接使用Retrofit的Adapter來直接獲得Observable,而app中其他部分并沒有reactive。而且還有很多Observable用法錯誤,比如冷熱不分,連續(xù)太多的Map/FlatMap等等。
0. RxBus/Retrofit 足夠用了,我為什么要讓自己的App 更加的Reactive?
為什么不用RxBus我已經(jīng)寫了兩篇文章了,可能由于我不常寫文,很多人并沒有理解。在這里我再解釋一次:EventBus如果是一輛穿梭在所有代碼之間的公交車。那么Observable就是穿梭在少許人之間的Uber專車。他比起EventBus有很多優(yōu)勢,比如異常處理,線程切換,強大的操作符等等。你當(dāng)然可以做出一輛超級Uber來當(dāng)全局公交車(RxBus)使用,然而這卻損失了RxJava本來的許多優(yōu)勢,并且又給自己挖了許多坑。
0.1 一個常見誤區(qū),過多的operator
剛開始使用RxJava的時候,我們會覺得operator的鏈?zhǔn)秸{(diào)用會非常的爽,一個簡單的例子:
Observable.just("1", "2", "3", "4", "5", "6", "7")
.map(x -> Integer.valueOf(x))
.map(x -> x * 2)
.map(x -> x + 4)
.filter(x -> x >2)
// and much more operators
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
當(dāng)你只有很少數(shù)據(jù)的時候,這樣當(dāng)然可以,但是你數(shù)據(jù)量上來的時候,這就會有很多的overhead。 其實幾乎所有的operator都會給你生成一個新的Observable。所以在上面這個例子中,我們在過程中生成了至少7個Observable。然而我們完全可以將中間的.map().map().map().filter合并在一個FlatMap中,減少很多的overhead。
1. Observable.just()的局限性。
- 使用
Observable.just()即使你沒有調(diào)用subscribe方法。just()括號里面的代碼也已經(jīng)執(zhí)行了。顯然,Observable.just()不適合封裝網(wǎng)絡(luò)數(shù)據(jù),因為我們通常不想在subscribe之前做網(wǎng)絡(luò)請求。
舉個例子:
class TestClass{
TestClass(){
System.out.println("I'm created!");
}
}
Observable.just(new TestClass());
這時你運行代碼,你就看到確實你的TestClass 已經(jīng)被創(chuàng)建了:
I/System.out: I'm created!
當(dāng)然,這個可以簡單的用defer()/fromCallable()/create()操作符來是實現(xiàn)只有subscribe只有才加載。
比如:
// use fromCallable
Observable.fromCallable(TestClass::new);
//or
Observable.defer(() -> Observable.just(new TestClass()));
- Observable.just()不夠靈活。雖然說設(shè)計模式上我們追求 "Minimize Mutability" 但是如果我們的程序越來越 reactive的時候。一個 ObservableJust 往往是不滿足需求的。比如之前一定訂閱的subscriber。如果數(shù)據(jù)更新了,你不可以同過ObservableJust 來通知所有的Observable 新數(shù)據(jù)更新了,需要你的subscriber主動更新。這顯然有悖于我們追求的reactive programming。 主動pull數(shù)據(jù)而不是數(shù)據(jù)告訴你,我更新了然后再做出反應(yīng)。
當(dāng)然ObservableJust在很多情況下,確實不錯。如果你不需要監(jiān)聽后續(xù)的更新,那么ObservableJust可以滿足你的需求。
2. Hot Observable 和 cold Observable
很多人在封裝數(shù)據(jù)的時候,并沒有太多考慮冷熱的問題,通常情況下并不會出錯。因為目前很多開源項目(Demo)里除了RxBus,并沒有太多的RxJava的實時情況。然而,當(dāng)你的App越來越Reactive的時候,冷熱便是一個必須考慮的問題。
Hot Observable 意思是如果他開始傳輸數(shù)據(jù),你不主動喊停(dispose()/cancel()),那么他就不會停,一直發(fā)射數(shù)據(jù),即使他已經(jīng)沒有Subscriber了。而Cold Observable則是subscribe時才會發(fā)射數(shù)據(jù)。
然而,問題來了。我上篇文章講過,只有subscribeActual方法調(diào)用了的時候,Observable發(fā)射數(shù)據(jù),那為什么Hot Observable沒有Subscriber也會發(fā)射數(shù)據(jù),他把數(shù)據(jù)發(fā)射給誰了呢?我們在解決這個問題之前,先看一下Cold Observable:
2.1 Cold Observable
我們常見的工廠方法提供的都是ColdObservable,包括just(),fromXX,create(),interval(),defer()。 他們的共同點是當(dāng)你有多個Subscriber的時候,他們的事件是獨立的,舉個例子:
Observable interval = Observable.interval(1,TimeUnit.SECONDS);
如果我們有兩個subscriber,那么他們會各自有自己的計時器,并且互不干擾。效果如下圖:

2.2 Hot Observable
不同于Cold Observable, Hot Observable是共享數(shù)據(jù)的。對于Hot Observable的所有subscriber,他們會在同一時刻收到相同的數(shù)據(jù)。我們通常使用publish()操作符來將ColdObservable變?yōu)镠ot?;蛘呶覀冊赗xBus中常常用到的Subjects 也是Hot Observable。
剛剛我們剛剛提出了一個問題,
既然Hot Observable在沒有subscriber的時候,還會繼續(xù)發(fā)送數(shù)據(jù),那么數(shù)據(jù)究竟發(fā)給誰了呢?
其實Hot Observable其實并沒有發(fā)送數(shù)據(jù),而是他上層的Observable 發(fā)送數(shù)據(jù)給這個hot Observable。不信?我們來分別看一下:
2.2.1 ConnectableObservable
我們在上面的誤區(qū)中知道了,幾乎所有operator都會生成一個新的Observable。publish當(dāng)然不例外。但是有區(qū)別的是,publish會給你一個ConnectableObservable。具體實現(xiàn)類是ObservablePublish。這個Observable的區(qū)別是他提供一個connect()方法,如果你調(diào)用connect()方法,ConnectableObservable就會開始接收上游Observable的數(shù)據(jù)。我們來測試一下:
ConnectableObservable interval = Observable.interval(1, TimeUnit.SECONDS).publish();
//connect even when no subscribers
interval.connect();

果然,由于我們subscribe晚了一些。0這個數(shù)據(jù)沒有收到,當(dāng)我們兩個 Subscriber 都dispose的時候,ConnectableObservable 也仍在接受數(shù)據(jù),導(dǎo)致我們6這個數(shù)據(jù)沒有接收到。
ConnectableObservable 其實在內(nèi)部,有一個PublishObserver,他有兩個作用。一個是當(dāng)我們調(diào)用 connect()方法時, PublishObserver開始接受上游的數(shù)據(jù),我們的例子里便是 Observable.interval(1, TimeUnit.SECONDS) 。所以才能在我們沒有調(diào)用 subscribe方法時,他也能開始發(fā)送數(shù)據(jù)。第二個作用是 PublishObserver存儲所有的下游Subscriber, 也就是我們例子中的Subscriber1 和Subscriber2,在 PublishObserver 每次接到一個上游數(shù)據(jù),就會將接收到的結(jié)果,依次分發(fā)給他存儲的所有 Subscribers ,如果下游 Subscriber 調(diào)用了 dispose方法,那么他就會在自己的緩存中刪除這個 Subscriber,下次接受到上游數(shù)據(jù)便不會傳給這個Subscriber。
那么這時候,有同學(xué)應(yīng)該要問了:
我們可不可以停止從上游接受數(shù)據(jù)?
我們當(dāng)然可以。但是從設(shè)計的角度,RxJava為了提供鏈?zhǔn)秸{(diào)用。 connect()方法會返回一個 Disposable 給我們來控制是否繼續(xù)接受上游的數(shù)據(jù)。
2.2.2 ConnectableObservable的常用操作符
我們當(dāng)然不希望每次都手動控制 ConnectableObservable的開關(guān)。RxJava給我們提供了一些常用的控制操作符
- refCount()
refCount()可以說是最常用的操作符了。他會把ConnectableObservable變?yōu)橐粋€通常的Observable但又保持了HotObservable的特性。也就是說,如果出現(xiàn)第一個Subscriber,他就會自動調(diào)用connect()方法,如果他開始接受之后,下游的Subscribers全部dispose,那么他也會停止接受上游的數(shù)據(jù)。具體看圖:
每個 Subscriber 每次都會接受同樣的數(shù)據(jù),但是當(dāng)所有 subscriber 都 dispose時候,他也會自動dipose上游的 Observable 。所以我們重新subscribe的時候,又重新從0開始。
這個操作符常用到,RxJava將他和publish合并為一個操作符 :share()。
- autoConnect()
autoConnect()看名字就知道,他會自動鏈接,如果你單純調(diào)用autoConnect(),那么,他會在你鏈接第一個Subscriber的時候調(diào)用connect(),或者你調(diào)用autoConnect(int Num),那么他將會再收到Num個subscriber的時候鏈接。
但是,這個操作符的關(guān)鍵在于,由于我們?yōu)榱随準(zhǔn)秸{(diào)用,autoConnect會返回Observable給你,你不會在返回方法里獲得一個Disposable來控制上游的開關(guān)。 不過沒問題,autoConnect提供了另一種重載方法 :
autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection)
他會在這個Consumer傳給你 你需要的那個總開關(guān)。而且,autoConnect并不會autoDisconnect, 也就是如果他即使沒有subscriber了。他也會繼續(xù)接受數(shù)據(jù)。 - replay()
replay()方法和publish()一樣,會返回一個ConnectableObservable,區(qū)別是,replay()會為新的subscriber重放他之前所收到的上游數(shù)據(jù),我們再來舉個例子:
//only replay 3 values
Observable.interval(1, TimeUnit.SECONDS).replay(3).refCount();

果然,Subscriber2在subscribe時候,立即收到了之前已經(jīng)錯過的三個數(shù)據(jù),然后繼續(xù)接受后面的數(shù)據(jù)。
但是,這里有幾點需要考慮:replay() 會緩存上游發(fā)過來的數(shù)據(jù),所以并不需要擔(dān)心重新生成新數(shù)據(jù)給新的 Subscriber。
- ReplayingShare()
其實ReplayingShare并不能算是ConnectableObservable的一個操作符,他是JakeWhaton的一個開源庫,只有百來行。實現(xiàn)的功能是幾乎和replay(1).refCount()差不多。但是如果中斷 Conncection之后,重新開始subscribe,他仍然會給你一個重放他上一次的結(jié)果。 具體看圖:
我們看到和剛才的replay不同,即使兩個Subscriber都 dispose, 重新開始仍然會接收到我們緩存過的一個數(shù)據(jù)。
2.3 Subjects
Subjects 作為一個Reactive世界中的特殊存在,他特殊在于他自己既是一個Observable又是一個Observer(Subscriber)。你既可以像普通Observable一樣讓別的Subscriber來訂閱,也可以用Subjects來訂閱別人。更方便的是他甚至暴露了OnXX(),方法給你。你直接調(diào)用可以通知所有的Subscriber。 這也是RxBus的基礎(chǔ),RxBus幾乎離不開Subjects。 蜘蛛俠的老爹告訴我們,力量越大,責(zé)任就也大。Subjects也一樣。 Subjects因為暴露了OnXX()方法,使得Subjects的數(shù)據(jù)來源變得難以控制。而且,Subjects一直是HotObservable,我們來看下Subject的OnNext()方法的實現(xiàn):
@Override
public void onNext(T t) {
if (subscribers.get() == TERMINATED) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
for (PublishDisposable<T> s : subscribers.get()) {
s.onNext(t);
}
}
可以看出來Subjects只要調(diào)用了OnNext()方法就會立即發(fā)送數(shù)據(jù)。所以,使用時一定要注意Subjects和Subscriber的鏈接時序問題。具體Subjects的用法我想介紹帖子已經(jīng)足夠多了。這里就不贅述了。
3. 在Android中常見的幾種封裝和注意事項
1.封裝View 的Listener
View 的各種Listener 我們常用create方法來封裝,比如OnClickListener:
Observable.create(emitter -> {
button.setOnClickListener(v -> emitter.onNext("I'm Clicked"));
emitter.setCancellable(() -> button.setOnClickListener(null));
});
這里非常關(guān)鍵的一點是一定要設(shè)置解除綁定,否則你將持續(xù)使用這個會造成內(nèi)存泄漏。而且最好配合使用share()。否則只有最后一個Subscriber能收到OnClick。當(dāng)然,如果不考慮方法數(shù)的話,推薦配合使用RxBinding。
而且,用create()方法封裝Listener適合幾乎所有的callback, 并且安全。
2.封裝簡單的數(shù)據(jù)源
設(shè)想一個場景,我們有一個User類。里面有我們的用戶名,頭像,各種信息。然而在我們的app中,可能有三四個Fragment/Activity需要根據(jù)這個User做出不同的反應(yīng)。這時我們就可以簡單的使用Subject來封裝User類。
public class UserRepository {
private User actualUser;
private Subject<User> subject = ReplaySubject.createWithSize(1);
/**
*
*Get User Data from wherever you want Network/Database etc
*/
public Observable<User> getUpdate(){
return subject;
}
public void updateUser(User user){
actualUser = user;
subject.onNext(actualUser);
}
}
如果我們某些模塊需要這個User,那么只需要subscribe到這個Repository,如果User有更新,每一個Subscriber都會收到更新后的User并且互相不影響。而且我們使用ReplaySubject,即使有新的Subscriber,也會收到最新的一個Subject。
但是使用的時候一定要注意,因為用的是Subject.所以在onNext方法中一旦出現(xiàn)了error。那么所有的Subscriber都將和這個subject斷開了鏈接。這里也可以用RxRelay代替Subject,簡單來說Relay就是一個沒有onError和onComplete的Subject。
3.簡單的使用concat().first()來處理多來源
Dan Lew在他的博客Loading data from multiple sources with RxJava
中介紹過他這種處理方法,
// Our sources (left as an exercise for the reader)
Observable<Data> memory = ...;
Observable<Data> disk = ...;
Observable<Data> network = ...;
// Retrieve the first source with data
Observable<Data> source = Observable
.concat(memory, disk, network)
.first();
然后在每次做不同請求的時候刷新緩存
Observable<Data> networkWithSave = network.doOnNext(data -> {
saveToDisk(data);
cacheInMemory(data);
});
Observable<Data> diskWithCache = disk.doOnNext(data -> {
cacheInMemory(data);
});
具體也可以看這篇簡書,我也不在過多贅述 :RxJava(八)concat符操作處理多數(shù)據(jù)源
4.自己繼承Observable 手動寫subscribeActual()方法
這可能是最靈活的寫法?如果你想用RxJava封裝自己的庫,推薦這種方法封裝。因為這樣不僅僅可以有效的進(jìn)行錯誤處理,并且不會暴露過多邏輯給外面,許多優(yōu)秀的RxJava相關(guān)庫都是這樣封裝,就連RxJava自己也是把一個個的operator封裝成一個個不同的Observable。但是這種方法確實要求很高,要做很多考慮,比如異步,多線程沖突,錯誤處理。對新手不是很推薦。

