RxJava

RxJava概述

  • RxJava 是一種響應式編程,來創(chuàng)建基于事件的異步操作庫?;谑录鞯逆準秸{(diào)用、邏輯清晰簡潔。
  • RxJava 我的理解是將事件從起點(上游)流向終點(下游),中間有很多卡片對數(shù)據(jù)進操作并傳遞,每個卡片獲取上一個卡片傳遞下來的結(jié)果然后對事件進行處理然后將結(jié)果傳遞給下一個卡片,這樣事件就從起點通過卡片一次次傳遞直到流向終點。

RxJava觀察者模式

  • 傳統(tǒng)觀察者是一個被觀察者多過觀察者,當被觀察者發(fā)生改變時候及時通知所有觀察者
  • RXjava是一個觀察者多個被觀察者,被觀察者像鏈條一樣串起來,數(shù)據(jù)在被觀察者之間朝著一個方向傳遞,直到傳遞給觀察者 。

RxJava原理理解

  • 被觀察者通過訂閱將事件按順序依次傳遞給觀察者,


    image.png
//RxAndroid中包含RxJava的內(nèi)容,只引入RxAndroid還是會報錯
dependencies {
    ......
    compile 'io.reactivex.rxjava2:rxjava:2.1.3'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
image.png

創(chuàng)建Observer(觀察者)

        Observer<Integer> observer = new Observer<Integer>() {
 
            // 觀察者接收事件前  ,當 Observable 被訂閱時,觀察者onSubscribe方法會自動被調(diào)用 
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始采用subscribe連接");
            }

            // 當被觀察者生產(chǎn)Next事件 
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對Next事件作出響應" + value);
            }

            // 當被觀察者生產(chǎn)Error事件 
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應");
            }

            // 當被觀察者生產(chǎn)Complete事件 
            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應");
            }
        };
       //Subscriber類 = RxJava 內(nèi)置的一個實現(xiàn)了 Observer 的抽象類,對 Observer 接口進行了擴展 
       Subscriber<Integer> subscriber = new Subscriber<Integer>() {

           // 觀察者接收事件前 ,當 Observable 被訂閱時,觀察者onSubscribe方法會自動被調(diào)用 
            @Override
            public void onSubscribe(Disposable d) { 
                Log.d(TAG, "開始采用subscribe連接");
            }

            // 當被觀察者生產(chǎn)Next事件 
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對Next事件作出響應" + value);
            }

            // 當被觀察者生產(chǎn)Error事件 
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應");
            }

            // 當被觀察者生產(chǎn)Complete事件 
            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應");
            }
        };

Subscriber 抽象類與Observer 接口的區(qū)別

  • 二者基本使用方式一致(在RxJava的subscribe過程中,Observer會先被轉(zhuǎn)換成Subscriber再使用)
  • Subscriber抽象類對 Observer 接口進行了擴展,新增了兩個方法:
    1. onStart():在還未響應事件前調(diào)用,用于做一些初始化工作,他是在subscribe 所在的線程調(diào)用,不能切換線程,所以不能進行界面UI更新比如彈框這些。
    2. unsubscribe():用于取消訂閱。在該方法被調(diào)用后,觀察者將不再接收響應事件,比如在onStop方法中可以調(diào)用此方法結(jié)束訂閱。調(diào)用該方法前,先使用 isUnsubscribed() 判斷狀態(tài),確定被觀察者Observable是否還持有觀察者Subscriber的引用。

創(chuàng)建 Observable (被觀察者)

        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // 通過 ObservableEmitter類對象產(chǎn)生事件并通知觀察者
                // ObservableEmitter:定義需要發(fā)送的事件 & 向觀察者發(fā)送事件
                     
                emitter.onNext(1);
                emitter.onComplete();
            }
        });

RxJava 提供了其他方法用于 創(chuàng)建被觀察者對象Observable

// 方法1:just(T...):直接將傳入的參數(shù)依次發(fā)送出來
  Observable observable = Observable.just("A", "B", "C");
  // 將會依次調(diào)用:
  // onNext("A");
  // onNext("B");
  // onNext("C");
  // onCompleted();

// 方法2:fromArray(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組 / Iterable 拆分成具體對象后,依次發(fā)送出來
  String[] words = {"A", "B", "C"};
  Observable observable = Observable.fromArray(words);
  // 將會依次調(diào)用:
  // onNext("A");
  // onNext("B");
  // onNext("C");
  // onCompleted();

以上兩種方法創(chuàng)建出來的觀察者都是繼承Observable,比如ObservableCreate、ObservableFromArray、ObservableMap...,

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

--------------------------------------------------------------------------------------------------------

public abstract class Observable<T> implements ObservableSource<T> {

   ...

    protected abstract void subscribeActual(Observer<? super T> observer);
 
    @Override
    public final void subscribe(Observer<? super T> observer) {
     ...
        try {
            ...
            subscribeActual(observer);
        }  catch (Throwable e) {
           ...
        }
    }
}

public final class ObservableCreate<T> extends Observable<T> {

   final ObservableOnSubscribe<T> source;

   @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

   static final class CreateEmitter<T> extends AtomicReference<Disposable> 
 implements ObservableEmitter<T>, Disposable {
  
 ...

        @Override
        public void onNext(T t) {
            if (t == null) {
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        ...
    }
}

public final class ObservableFromArray<T> extends Observable<T> {

    final T[] array;
   
    @Override
    public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

        s.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        d.run();
    }

    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {

        final Observer<? super T> actual; //對應觀察者

        final T[] array;
        
        ...

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        void run() {
            T[] a = array;
            int n = a.length;

            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                actual.onNext(value);
            }
            if (!isDisposed()) {
                actual.onComplete();
            }
        }
    }
}

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    ... 

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), 
                      "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t),
                      "The mapper function returned a null value.") : null;
        }
    }
}

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}

---------------------------------------------------------------------------------------------

 public static final class ScalarDisposable<T>
    extends AtomicInteger
    implements QueueDisposable<T>, Runnable {

        final Observer<? super T> observer;

        final T value;
  
        @Override
        public void dispose() {
            set(ON_COMPLETE);
        }

       ....
         
        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }
    }

觀察者和被觀察者通過subscribe訂閱,訂閱完成后被觀察者就可以像觀察者發(fā)送數(shù)據(jù)

 
        Observable.create(new ObservableOnSubscribe<Integer>() {
       
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
   
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始采用subscribe連接");
            }
 
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對Next事件"+ value +"作出響應"  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應");
            }

        });
    }
}

image.png

鏈式調(diào)用


image.png
     Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
            }
        }).map(new Function<String, String>() {

            @Override
            public String apply(@NonNull String s) throws Exception {
                return null;
            }
        }).map(new Function<String, String>() {

            @Override
            public String apply(@NonNull String s) throws Exception {
                return null;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
            }

            @Override
            public void onNext(String s) {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

這個訂閱的過程就如同洋蔥一樣一層層封裝,當訂閱完成后就像剝洋蔥一樣一層層剝,用發(fā)射器發(fā)送數(shù)據(jù),用onNext方法一層層發(fā)送,發(fā)送給每一層的時候就回調(diào)每一層的Function類apply方法,這個方法由開發(fā)者實現(xiàn),該方法處理數(shù)據(jù)后就返回處理后的數(shù)據(jù),然后數(shù)據(jù)又往下一層傳遞,直到傳遞到觀察者手里,然后觀察者接收數(shù)據(jù)


image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 前言 我從去年開始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboar...
    Kepler_II閱讀 1,390評論 0 3
  • 最近項目里面有用到Rxjava框架,感覺很強大的巨作,所以在網(wǎng)上搜了很多相關文章,發(fā)現(xiàn)一片文章很不錯,今天把這篇文...
    Scus閱讀 6,994評論 2 50
  • 我從去年開始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    huqj閱讀 2,000評論 0 21
  • 其它文章 RxJava操作符大全 1、RxJava之一——一次性學會使用RxJava RxJava簡單的使用和使用...
    麻薯帥比閱讀 508評論 0 0
  • 16宿命:用概率思維提高你的勝算 以前的我是風險厭惡者,不喜歡去冒險,但是人生放棄了冒險,也就放棄了無數(shù)的可能。 ...
    yichen大刀閱讀 8,248評論 0 4

友情鏈接更多精彩內(nèi)容