寫在前面的話:RxJava的強(qiáng)大之處不僅在于它的設(shè)計(jì)思想,更是因?yàn)橛兄鞣N強(qiáng)大的操作符,操作符讓你可以靈活的處理變換、組合、操縱和處理Observable發(fā)射的數(shù)據(jù),接下來的幾篇文章會(huì)按照操作符的分類逐步講解各個(gè)操作符的作用,合理使用這些操作符也許你會(huì)達(dá)到事半功倍的效果。
本文適用于那些RxJava的初學(xué)者進(jìn)階使用,但是如果你還不知道什么是RxJava,那么我只能說
這篇的主題是創(chuàng)建型操作符,也就是創(chuàng)建Observable的操作符,先來看看都有那些操作符吧:
- create()
- from()
- just()
- interval()
- timer()
- empty()
- error()
- range()
- repeat()
- defer()
- never()
1.Create
這個(gè)就沒什么好說的了吧?大多數(shù)時(shí)候我們都是用這個(gè)函數(shù)來創(chuàng)建Observable的,
經(jīng)典的流試調(diào)用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(1);
}
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
}
});
2.from
** from的作用是將一個(gè)Iterable, 一個(gè)Future, 或者一個(gè)數(shù)組轉(zhuǎn)換成一個(gè)Observable**,from方法具體有一下幾種:
常用的有frmoArray和fromIterable這兩個(gè),來看個(gè)具體例子吧
集合遍歷
List<Integer> ls = new ArrayList<>();
ls.add(1);
ls.add(2);
ls.add(3);
Observable.fromIterable(ls).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "accept: " + integer);
}
});
其他的方法大家自行體會(huì)吧
3.just
just是將一個(gè)或多個(gè)對(duì)象轉(zhuǎn)換成發(fā)射這個(gè)或這些對(duì)象的一個(gè)Observable,在某些情況下它其實(shí)和from的作用有些類似,或者你也可以理解為這是一個(gè)簡版的create方法。來看具體例子:
Observable.just(1).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "accept: "+integer);
}
});
just可以傳入一個(gè)或多個(gè)對(duì)象,然后一次發(fā)射出來,對(duì)于數(shù)據(jù)量較小時(shí)可以使用這個(gè)方法。
4.interval
interval可以創(chuàng)建一個(gè)按照給定的時(shí)間間隔發(fā)射整數(shù)序列的Observable,通常我們可以用它來來完成一些周期行的操作,比如心跳,Handler最好的替代品了,來看具體例子:
Observable.interval(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.i(TAG, "accept: "+aLong);
}
});
第一個(gè)參數(shù)為時(shí)間間隔,第二個(gè)為時(shí)間單位,上述accept()會(huì)2S執(zhí)行一次,在這里你就可以完成一些周期性任務(wù)了。
5.timer
timer可以創(chuàng)建一個(gè)在給定的延時(shí)之后發(fā)射單個(gè)數(shù)據(jù)的Observable,通常可以用它完成一些定時(shí)任務(wù),注意它和interval的區(qū)別,interval是周期性的(多次),timer是定時(shí)的(一次),具體例子:
Observable.timer(5, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.i(TAG, "accept: "+aLong);
}
});
參數(shù)和interval一樣,5s之后執(zhí)行accept方法
6.empty
empty可以創(chuàng)建一個(gè)什么都不做直接通知完成的Observable。這個(gè)操作符的使用范圍就比較窄了,舉一個(gè)應(yīng)用場景,我們做數(shù)據(jù)緩存時(shí)通常的實(shí)現(xiàn)邏輯是先加載本地緩存,如果本地沒有或者已經(jīng)過期那么才請(qǐng)求網(wǎng)絡(luò),這里就可以使用empty,假設(shè)背地緩存不存在,那么是拋出異常還是返回null呢?顯然這兩種都不合理,因?yàn)槿绻镜夭淮嬖谖覀冃枰苯诱?qǐng)求網(wǎng)絡(luò),一種解決辦法就是當(dāng)為空的時(shí)候直接返回Observable.empty(),同時(shí)借助switchIfEmpty(observable1,observable2)方法完成后續(xù)的網(wǎng)絡(luò)請(qǐng)求。當(dāng)然你也可以通過concat和first來完成數(shù)據(jù)的獲取,這是我們后續(xù)的文章要說的了。場景說忘了,那到底是不是直接通知完成呢?測試一下就知道了
Observable<Integer> o1 = Observable.empty();
o1.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext: ");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
});
果然是只有onComplete執(zhí)行了。
7.error
error可以創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)以一個(gè)錯(cuò)誤終止的Observable,需要一個(gè)Throwable參數(shù),使用場景還是拿緩存舉例,比如內(nèi)存緩存不存在,網(wǎng)絡(luò)數(shù)據(jù)也請(qǐng)求失敗了,那么我們就可以調(diào)用這個(gè)操作符拋出一個(gè)Throwable。那么就直接執(zhí)行到了Observer的onError方法里里。測試一下,上面的例子不變,我們把empty換成error,并隨便傳入一個(gè)Throwable:
Observable<Integer> o1 = Observable.error(new IOException());
o1.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext: ");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
});
直接執(zhí)行了onError方法。
8.range
range可以創(chuàng)建一個(gè)發(fā)射指定范圍的整數(shù)序列的Observable。直接上例子:
Observable.range(0,5).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "accept: "+integer);
}
});
第一個(gè)參數(shù)為數(shù)列的起始元素,第二個(gè)為長度,打印結(jié)果為:
需要注意的是如果你將第二個(gè)參數(shù)設(shè)為0,將導(dǎo)致Observable不發(fā)射任何數(shù)據(jù)(如果設(shè)置為負(fù)數(shù),會(huì)拋異常)
9. repeat
repeat可以創(chuàng)建一個(gè)重復(fù)發(fā)射指定數(shù)據(jù)的Observable,你也可以通過repeat(n)指定重復(fù)次數(shù)。更多時(shí)候我們會(huì)使用repeat(n)這個(gè)方法,具體例子:
Observable.just(1).repeat(5).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "accept: "+integer);
}
});
結(jié)果為:
10.defer
defer的作用是只有當(dāng)Observer訂閱才創(chuàng)建Observable并且是為每個(gè)Observer創(chuàng)建一個(gè)新的Observable。
正常情況下:
int a = 10;
Observable<Integer> o2 = Observable.just(a);
a = 20;
o2.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "accept= " + integer);
}
});
輸出為accept=10,但是如果用defer
Observable<Integer> o1 = Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> call() throws Exception {
return Observable.just(a);
}
});
a = 20;
o1.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "accept:= " + integer);
}
});
輸出為accept=20,大家仔細(xì)看一下上邊兩個(gè)例子就理解defer了。
11. never
never可以創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)也不終止的Observable,具體還沒有使用過,也沒想到應(yīng)用場景大家知道有這么一個(gè)操作符就可以了。
以上就是Observable的常見的創(chuàng)建操作符了,有遺漏的歡迎大家補(bǔ)充,下一篇會(huì)介紹常見的變換操作符,敬請(qǐng)期待。