簡介
這篇文章沒有介紹,因為這是我們最后一篇文章的延續(xù),但在開始之前,我想我們會對上一篇文章進行修改。在最后一部分,Rx可以與我們分享Hot Vs Cold這些概念的一個具體的例子。在那之后,我問了一些問題,但是Rx可以觀察到,我們應該在Subject API之前了解觀察者API。因此,我們將繼續(xù)我們的對話,從觀察者API,開始。
對話
Observable:在我們了解之前Observables之前,這里有一個訂閱者的概念,這個訂閱者我們在項目中早就已經(jīng)使用到,我們需要在api之前了解他。
me:好 我們怎么開始?
Observable:? 我的新朋友Observer?? 他會好好教你
Observer: 在 開始了解我之前 我有一個問題。 你了解關于Functional?接口內(nèi)容?
me:了解
Observer : 正如你知道的訂閱者是觀察Observable數(shù)據(jù)流變化的 如果有變了。注意到他變化的訂閱者 。 你知道可以呦不同類型的訂閱者 沒有我。 Observable is什么都不是
Observable:哈哈。百分之分沒錯。甜心
Observer任何你能看見Observable的地方。我給你百分之百的保證我會在那里。相反,你可以說我是一個介于Observable和開發(fā)人員之間的橋梁。如果你是Rx的新用戶,你想要使用第三方庫,使用Rx。所以,只有當你知道我的時候,你才真的會使用這個庫。我認為這就足夠了。
me:o((≧▽≦o)!!
Observer:任何時候你都想知道數(shù)據(jù)發(fā)生怎么樣的變化,或者Observable 觀察的數(shù)據(jù)或者事件發(fā)生變化的時候,你需要通過使用我來訂閱這個Observable的,后來,當Observable想要告訴你任何變化時,他會通知我,我會通知你。所以基本上你可以用我的很多方法但是首先我要從我最基本的API開始。
me:我對一句話有點困擾“你可以用到我的很多種方式”
Observer: 等一下并我講,在我基本的用法中有四種
public interface Observer {
void onSubscribe(Disposable var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}
這里T是Java中的泛型。我并不像在這里討論Java泛型。簡而言之,如果你在數(shù)據(jù)類型是Person的數(shù)據(jù),那么T應該是一個Person對象。
現(xiàn)在,這并不是強制使用基本的四種方法觀察者API。這完全取決于你的要求。我將給您一些例子,您可以很容易地確定何時使用這個基本API,或者何時可以使用非常簡單的API,稍后我將與您分享。
現(xiàn)在我要一次用一種方法。
void onSubscribe(Disposable var1);:
無論何時,當你觀察和被觀察者連接到一起(訂閱)的時候,你就會得到一個Disposable?對象。它有一個非常簡單的API,如下所示。附加
public interface Disposable {
void dispose();
boolean isDisposed();
}
因此dispose 就是當你對Observable的變化不感興趣的時候你調(diào)用這個方法。所以你想讓我脫離Observable的時候,讓我調(diào)用這個方法就可以,就類似我和Observable離婚了。在這之后發(fā)生任何事情我都不關心
第二個isDisposed方法。這個方法就是在我自己不確定自己是不是離婚的時候調(diào)用,我用這個方法去檢查如果是false 那么我還沒有離婚。那么我就需要調(diào)用dispose去離婚
void onNext(T var1);:方法
這個方法的作用。就是當我訂閱Observable的時候。它有變化或者新的數(shù)據(jù)的時候,我就可以使用
我想我可以換一種方式去解釋,當Observable 和我結(jié)婚的時候,他將onSubscribe給了我,因此我答應了求婚。這也是重要的一點。意味著我可以隨時離婚,現(xiàn)在,當我們結(jié)婚時,Observable 總是告訴我他的數(shù)據(jù)或事件流中會發(fā)生什么變化。在那時,可以觀察到我的onNext(任何數(shù)據(jù))方法。因此,用簡單的話來說,當他的數(shù)據(jù)有任何變化時,他總是用我的onNext(T data)方法告訴我,開發(fā)人員。
void onError(Throwable var1);:
這個api是很重要的 任何時候他掛的時候通過這個方式通知我。他能告訴我他死前面對的是什么類型,當調(diào)用這個方法的時候disposable isDispose()返回的都是true;所以有的時候 我并沒有要求離婚。但是當他掛了我可以通過這個方法去檢查一下、
void onComplete();:
這個方法對我來說也是很重要。這個是Observable 準備找死或者想和我離婚的時候調(diào)用這個俄方法、 ,它總是用onComplete 來通知我。就像我們之前討論的那樣。 希望你理順了這一切。
me: 還有最后一個小小的問題,onError and onComplete?,都是告訴我再有新的數(shù)據(jù)或者改變也不用通知我!那么這有什么不同呢
Observer:onError? 相當于死于疾病,onComplete相當于正常死亡。他們兩個只會調(diào)用其中的一個。另一個肯定不會調(diào)用,希望你明白
me:wow 好
Observer 現(xiàn)在我給你一個例子告訴你顯示中我是怎么被用到的
在這個例子中,我將創(chuàng)建一Observer,每隔一秒就會給我數(shù)據(jù)。我將使用這些數(shù)據(jù),以不同的方式觀察,讓你對我的API有一個清晰的了解。
private staticObservablegetObservable() {
return Observable.create(observableEmitter -> {
Observable.interval(1000, TimeUnit.MILLISECONDS)
.subscribe(aLong -> observableEmitter.onNext(new Object()));
});
}
這一個方法可能會讓你感到困惑, 我和Observable結(jié)婚之后他總是每一秒之后都給我新的數(shù)據(jù),并且你還可以看到他的數(shù)據(jù)類型。,所以當訂閱或者結(jié)婚的時候你就會知道數(shù)據(jù)類型。我將方法的調(diào)用
Observerobserver= new Observer() {
@Override
public voidonSubscribe(Disposable disposable) {ObserverLecture.disposable= disposable;
}
@Override
public voidonNext(Object o) {
System.out.println("onNext called");
}
@Override
public voidonError(Throwable throwable) {
System.out.println("onError called. Die due to reason: "+throwable.getMessage());
}
@Override
public voidonComplete() {
System.out.println("onComplete: Die with natural death");
}
};
、
看這就是我。 并不需要過多的解釋。當我們訂閱會或者想要結(jié)婚的時候 通過subscribe方法
Complete code:
public class ObserverLecture {
private static Disposable disposable;
public static void main(String[] args) {
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable disposable) {
ObserverLecture.disposable = disposable;
}
@Override
public void onNext(Object o) {
System.out.println("onNext called");
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError called. Die due to reason: "+throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete: Die with natural death");
}
};
getObservable().subscribe(observer);
while (true);
}
private static Observable getObservable() {
return Observable.create(observableEmitter -> {
Observable.interval(1000, TimeUnit.MILLISECONDS)
.subscribe(aLong -> observableEmitter.onNext(new Object()));
});
}
}
If I run this code. I will get below output for infinite time, means this program never exit.
Output:
onNext called
onNext called
onNext called
onNext called
onNext called
下面我就給展示一下Disposable 的一些用法
/**
* Created by waleed on 14/05/2017.
*/
public class ObserverLecture {
private static Disposable disposable;
public static void main(String[] args) throws InterruptedException {
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable disposable) {
ObserverLecture.disposable = disposable;
}
@Override
public void onNext(Object o) {
System.out.println("onNext called");
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError called. Die due to reason: "+throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete: Die with natural death");
}
};
getObservable().subscribe(observer);while (true){Thread.sleep(1000);System.out.println("disposable.isDisposed(): "+disposable.isDisposed());}}
private static Observable getObservable() {
return Observable.create(observableEmitter -> {
Observable.interval(1000, TimeUnit.MILLISECONDS)
.subscribe(aLong -> observableEmitter.onNext(new Object()));
});
}
}
This is a same code like above only one change in while loop. In while loop after every one second I am showing the value of Disposable is Observer is divorced or not.
Output:
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
… infinite
So you can easily see false, mean I am not divorced because I never called Disposable dispose() method. Now its time to show you what will happen when I will call dispose().
public class ObserverLecture {
private static Disposable disposable;
public static void main(String[] args) throws InterruptedException {
Observer observer = new Observer() {
@Override public void onSubscribe(Disposable disposable) {ObserverLecture.disposable = disposable;}
@Override public void onNext(Object o) {System.out.println("onNext called");}
@Override public void onError(Throwable throwable) {System.out.println("onError called. Die due to reason: " + throwable.getMessage());}
@Override public void onComplete() {System.out.println("onComplete: Die with natural death");}
};
getObservable().subscribe(observer);int count = 0;while (true) {
Thread.sleep(1000);
System.out.println("disposable.isDisposed(): " + disposable.isDisposed());count++;if (count == 3)disposable.dispose();}
}
private static Observable getObservable() {
return Observable.create(observableEmitter -> {
Observable.interval(1000, TimeUnit.MILLISECONDS)
.subscribe(aLong -> {
observableEmitter.onNext(new Object());
});
});
}
}
here again code is same only difference in while loop. This time I added a one count variable. So as I got data from Observable three time’s I will call dispose. Its mean I want divorce from Observable.
Output:
onNext called
disposable.isDisposed(): false
onNext called
disposable.isDisposed(): false
onNext called
disposable.isDisposed(): false
disposable.isDisposed():true
disposable.isDisposed():true
disposable.isDisposed():true
現(xiàn)在你能一眼就看出來輸出,在三次之后我獲取到的是true,他意味我離婚了?,F(xiàn)在的問題是Observable怎了、是掛了還是沒掛?因此我想使用Hot vs Cold Observable.?的概念。如果這是Hot Observable,他就不會死,但如果他 Cold Observable,他雖然不會死,而是會停止發(fā)送數(shù)據(jù)。
因此,是時候討論關于疾病或自然死亡的onError()和onComplete()或死亡
public class ObserverLecture {
private static Disposable disposable;
public static void main(String[] args) throws InterruptedException {
Observer observer = new Observer() {
@Override public void onSubscribe(Disposable disposable) {ObserverLecture.disposable = disposable;}
@Override public void onNext(Object o) {System.out.println("onNext called");System.out.println("disposable.isDisposed(): " + disposable.isDisposed());}
@Override public void onError(Throwable throwable) {System.out.println("onError called. Die due to reason: " + throwable.getMessage());}
@Override public void onComplete() {System.out.println("onComplete: Die with natural death");}
};
getObservable().subscribe(observer);
while (true) {
Thread.sleep(1000);
System.out.println("disposable.isDisposed(): " + disposable.isDisposed());
}
}
private static Observable getObservable() {
return Observable.create(observableEmitter -> {observableEmitter.onNext(new Object());
observableEmitter.onNext(new Object());
observableEmitter.onNext(new Object());
observableEmitter.onNext(new Object());
observableEmitter.onError(new RuntimeException("Die due to cancer"));});
}
}
這里我們可以明確的看到這個onerror的作用
Output:
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed(): false
onError called. Die due to reason: Die due to cancer
disposable.isDisposed():true
disposable.isDisposed():true
…
現(xiàn)在你可以很容易地看到了。我們Observable死亡。他把我的錯誤方法稱為“我的錯誤方法”,而且在死后我的is處置()給了我真實的信息。也就是說,我離婚了,或者是寡婦。
現(xiàn)在是檢查onComplete()的時候了。
public class ObserverLecture {
private static Disposable disposable;
public static void main(String[] args) throws InterruptedException {
Observer observer = new Observer() {
@Override public void onSubscribe(Disposable disposable) {ObserverLecture.disposable = disposable;}
@Override public void onNext(Object o) {System.out.println("onNext called"); System.out.println("disposable.isDisposed(): " + disposable.isDisposed());}
@Override public void onError(Throwable throwable) {System.out.println("onError called. Die due to reason: " + throwable.getMessage());}
@Override public void onComplete() {System.out.println("onComplete: Die with natural death");}
};
getObservable().subscribe(observer);
while (true) {
Thread.sleep(1000);
System.out.println("disposable.isDisposed(): " + disposable.isDisposed());
}
}
private static Observable getObservable() {
return Observable.create(observableEmitter -> {
observableEmitter.onNext(new Object());
observableEmitter.onNext(new Object());
observableEmitter.onNext(new Object());
observableEmitter.onNext(new Object());observableEmitter.onComplete();});
}
}
Here you can see I have only one change. Observable called onComplete on his own.
Output:
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onComplete: Die with natural death
disposable.isDisposed():true
disposable.isDisposed():true
disposable.isDisposed():true
更加直觀
當我disposable.isDisposed()我獲取到的是false,我并沒有離婚我還在一直獲取數(shù)據(jù),但是當我調(diào)取onComplete的時候isDispose 變成了true,這意味了Observable?正常死亡
me:wow 謝謝Observer,這真是一個很棒的解釋,你減少了我很多的迷惑。我對你有過一些了解,但現(xiàn)在我很好奇,有些時候,人們只在用戶中使用一種方法。該方法是什么?
Observer:首先感謝,我可以項目解釋更多api,但是首先我感覺你應該使用一些相同的概念 在android x下面我將給你介紹一個例子
me: 我同意你的觀點,但我認為首先我們會了解你的一切,然后我會在Android上給出一個真實的例子,我將使用你上面所有的API。
Observer:正如你希望的那樣,有時,場景并不復雜,您可以使用觀察者4方法API,但是我有一種感覺,不需要使用四種方法,您可以使用更少的代碼來實現(xiàn)該場景。為此,我將自己劃分為功能接口,或者可以說,這是觀察者的一種語法糖。例如
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(String string) {
System.out.println("onNext: "+string);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}
}
Output:
onNext: A
onNext: B
onNext: C
onNext: D
onComplete
現(xiàn)在您可以看到,我只對數(shù)據(jù)感興趣,但我需要實現(xiàn)on訂閱、onError和onComplete。這是一個樣板,在下一個例子中,我們?nèi)绾斡酶俚拇a實現(xiàn)
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(s -> System.out.println(s));
}
}
在上面的例子中,上面的例子都是相同的,但是您可以看到,這一次我只使用了兩行代碼,在此之前,它是一個非常長的代碼?,F(xiàn)在我將與你們分享我所有的功能接口以及如何在你們的應用程序中使用。
public interface Consumer {
void accept(@NonNull T var1) throws Exception;
}
public interface Action {
void run() throws Exception;
}
所以我有兩個函數(shù)接口。一個是Consumer的,這是非常有用的,第二個是Action。
首先,我們將討論Consumer接口
你知道的,我只對數(shù)據(jù)感興趣,我不關心任何其他的狀態(tài),比如我不想是不是使用Disposable離婚。還是我不想知道的是可以被自然死亡或某種疾病致死的。在這種情況下,我可以使用這個消費者API,我也想說,感謝Observable ,他們給了我使用我的功能接口來訂閱的選項。
Observable: O(∩_∩)O
Observer:下面看一下使用過的代碼
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(new Consumer() {@Overridepublic void accept(String s) throws Exception {System.out.println(s);}});
}
這里我只訂閱onNext()從Observable的回調(diào),您可以很容易地看到,我將一個匿名類發(fā)送到Observable到的訂閱。這是一個神奇的地方,正如我已經(jīng)告訴你們的,我有函數(shù)接口,所以它的意思是,我可以將Lambda表達式發(fā)送到Observable的訂閱,而不是匿名類或接口對象。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(s -> System.out.println(s));
}
}
Wow. You can see in above example, only one line.
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(System.out::println);
}
}
哇。更少的單詞。這里我使用的是方法引用,但最終所有的代碼塊都給了我相同的功能。還有一種技術將保留在一個相同的示例中,如下所示。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer);
}private static Consumer consumer = System.out::print;//private static Consumer consumer2 = s->{};}
在這里,我分別定義了我的消費者功能接口,我正在使用該對象進行訂閱。
接下來,我們還想知道是否出現(xiàn)了錯誤。如何使用相同的功能接口來通知我。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer,new Consumer() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("Die due to "+throwable.getMessage());}});
}
private static Consumer consumer = System.out::print;
}
在這里,您可以很容易地看到在訂閱方法中可以看到的第二個參數(shù)是onError。同樣,我也發(fā)送了相同的消費者功能接口,T=可投擲。這是很簡單的。
接下來,如何通過使用Lambda表達式實現(xiàn)相同的目標。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer,throwable -> System.out.println("Die due to "+throwable.getMessage()));
}
private static Consumer consumer = System.out::print;
}
接下來,我如何使用方法引用來實現(xiàn)這一目標。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer,System.out::print);
}
private static Consumer consumer = System.out::print;
}
Wow. Only one thing to mention. Here method reference call throwable.toString() not able to show our custom message. Like how we are showing in one above example (System.out.println(“Die due to “+throwable.getMessage()).
Now its time to show you by defining my Error Consumer API and sending that object to subscribe.
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer,error);
}
private static Consumer consumer = System.out::print;private static Consumer error = System.out::print;}
現(xiàn)在我知道您很好奇如何知道是否可以看到onComplete()是否被調(diào)用,為此,我有一個Action 接口。我需要把它作為第三個參數(shù)來訂閱可觀察的。因此,作為完整的Observable ,我將在 action interface得到信號。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer, error,new Action() {@Overridepublic void run() throws Exception {System.out.println("OnComplete");}});
}
private static Consumer consumer = System.out::print;
private static Consumer error = System.out::print;
}
在這你能看到這個Action這個匿名接口作為第三個參數(shù)接下來我要展示給你們看。我們?nèi)绾问褂肔ambda表達式作為一個方法引用,最后作為一個單獨的定義對象。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer, error,() -> System.out.println("OnComplete"));
}
private static Consumer consumer = System.out::print;
private static Consumer error = System.out::print;
}
所以你可以看到第三個參數(shù)是Action而不是Consumer。這一點很重要。
最后一件事是Disposable。如果我想離婚,我怎么能得到Disposable。這是第四個參數(shù),即T=可支配的消費者。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer, error, complete,new Consumer() {@Overridepublic void accept(Disposable disposable) throws Exception {}});
}
private static Consumer consumer = System.out::print;
private static Consumer error = System.out::print;
private static Action complete = ()-> System.out.println("onComplete");
}
這里我可以獲取Disposable。
我也可以在這里看到。我可以作為一個觀察者來實現(xiàn)我自己,或者我可以通過使用功能接口來實現(xiàn)同樣的目標。它的意思是
Observer subscription=4個功能接口訂閱(onsumer, Consumer, Action, Consumer)