第三方開源庫 RxJava - 基本使用和源碼分析

RxJava 歷史有點(diǎn)悠久,目前最新版是 2.x 的版本,網(wǎng)絡(luò)上有很多關(guān)于 RxJava 的文章, 隨便搜搜一大堆。為什么還要來寫一些文章,畢竟那是別人的東西,并沒有變成我的知識,其次課程具體的內(nèi)容有安排,所以我們還是自己動(dòng)手寫寫吧。還是老套路從源碼的角度出發(fā),當(dāng)然 RxJava 用了這么久,我們應(yīng)該也有自己的一些理解,其實(shí)就是三個(gè)字:事件流

很多人一開始就從觀察者設(shè)計(jì)模式入手去分析,這個(gè)也不說行不通也蠻好的,這里我用事件流的方式來講解一些。到底什么是事件流?你可以想象一條河流最終將涌入大海,那么中間會(huì)經(jīng)過湖泊山川,合并、分流、等等,這就是一個(gè)流,中間會(huì)經(jīng)歷很多,但最終會(huì)流入大海,整個(gè)過程是一條連起來的線。

根據(jù) android 的應(yīng)用來分析,我們打開 app 最終又會(huì)退出 app,那么整個(gè) app 的應(yīng)用我們都可以看成是一個(gè)大的事件流,里面像 Click 點(diǎn)擊事件,權(quán)限申請,線程切換,網(wǎng)絡(luò)訪問,第三方分享登錄等等,也都可以看成是事件流。所以在 RxJava 之后就接連著有很多像 RxAndroid 、RxBus 、RxPermission 這些好用的一些第三方庫,以后肯定還會(huì)有很多基于事件流的第三方庫。那按照你這么說,是不是所有的代碼都可以基于事件流去寫?按理解是這個(gè)樣子的,比如說第三方分享和第三方登錄等等。接下來我通過一個(gè)非常簡單的小事例來講解一下,比如下載圖片加水印顯示到 ImageView 控件上。

當(dāng)然這些是我個(gè)人的理解,你應(yīng)該也有自己的理解。這也是我為什么呼吁大家自己去寫文章的原因,實(shí)在拿不出手就設(shè)為私密的嘛,但終歸自己總結(jié)吸收了一下。就拿設(shè)計(jì)模式來說,你就不再是背了,也不會(huì)拿模式去套,收貨肯定是有的。當(dāng)然我們寫好一篇文章得要花一天左右的時(shí)間,的確需要這么久,不信你可以試試,就看你是不是很閑了。

1.通俗寫法


下載圖片加水印顯示到 ImageView 控件上,我們之前寫代碼應(yīng)該是,開個(gè)線程去下載圖片 Bitmap,給 Bitmap 加上水印,通過 Handler 切換到主線程調(diào)用 ImageView 的 setImageBitmap() 方法我們來看下具體代碼:

public class MainActivity extends AppCompatActivity {

    private ImageView mImageView;
    private Handler mHandler = new Handler() {
        @Override
        public void handleMessage(Message msg) {
            Bitmap bitmap = (Bitmap) msg.obj;
            mImageView.setImageBitmap(bitmap);
        }
    };

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        mImageView = (ImageView) findViewById(R.id.image_view);

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    URL url = new URL("http://img5.imgtn.bdimg.com/it/u=263117812,2521637551&fm=27&gp=0.jpg");
                    HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                    connection.connect();
                    InputStream inputStream = connection.getInputStream();
                    // 通過流解析到 Bitmap
                    Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                    inputStream.close();
                    // 給圖片 Bitmap 加水印
                    bitmap = BitmapUtils.drawText2Bitmap(bitmap,"RxJava");
                    // 通過 Handler 發(fā)送消息切換到主線程
                    Message message = Message.obtain();
                    message.obj = bitmap;
                    mHandler.sendMessage(message);
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }).start(); 
    }
}

2.事件流寫法


上面代碼運(yùn)行起來了,一看沒毛病挺好的堪稱完美,也不覺得麻煩(作)。接下來我們看下基于事件流的寫法,當(dāng)然這里我們使用 RxJava,先來畫個(gè)流向圖:


事件流向
        Observable.just("http://img5.imgtn.bdimg.com/it/u=263117812,2521637551&fm=27&gp=0.jpg")
                .map(new Function<String, Bitmap>() { // 下載網(wǎng)絡(luò)圖片
                    @Override
                    public Bitmap apply(@NonNull String imagePath) throws Exception {
                        URL url = new URL(imagePath);
                        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                        connection.connect();
                        InputStream inputStream = connection.getInputStream();
                        Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                        inputStream.close();
                        return bitmap;
                    }
                })
                .map(new Function<Bitmap, Bitmap>() {// 給圖片加水印
                    @Override
                    public Bitmap apply(@NonNull Bitmap bitmap) throws Exception {
                        bitmap = BitmapUtils.drawText2Bitmap(bitmap,"RxJava");
                        return bitmap;
                    }
                })
                .subscribeOn(Schedulers.io())// 上面之前的執(zhí)行在子線程中(線程的調(diào)度)
                .observeOn(AndroidSchedulers.mainThread())// 下面之后的執(zhí)行在主線程中(線程的調(diào)度)
                .subscribe(new Consumer<Bitmap>() {// 顯示圖片
                    @Override
                    public void accept(Bitmap bitmap) throws Exception {
                        imageView.setImageBitmap(bitmap);
                    }
                });

我記得第一次采用 RxJava 的時(shí)候還是很不習(xí)慣的,漸漸用多了就好了,關(guān)鍵還是多用多熟悉 RxJava 的 API 和事件流的思想就好了,上面這樣寫就簡單很多,代碼閱讀起來也是非常清晰的。接下來我們自己動(dòng)手來寫一個(gè)事件流的庫,不是為了重復(fù)造輪子,而是為了更好了解這種編程思想,在這之前我們得先把 RxJava 的源碼走一遍。

3. RxJava 的前奏

        String imageUrl = "http://img5.imgtn.bdimg.com/it/u=263117812,2521637551&fm=27&gp=0.jpg";

        Observable.just(imageUrl)
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        Log.e("TAG","onSubscribe");
                    }

                    @Override
                    public void onNext(@NonNull String s) {
                        Log.e("TAG","s = "+s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.e("TAG","onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.e("TAG","onComplete");
                    }
                });

上面這段代碼只是一個(gè)小事例,代碼的本身不具備任何意義,我們先來看下 Observable 這是我們之前所講的觀察者設(shè)計(jì)模式中的被觀察對象,Observer 是觀察者對象,不知道之前講的我們是否還有印象。只不過這個(gè)有點(diǎn)特別,特別在哪里?我們之前都是首先去訂閱注冊,當(dāng)被觀察者發(fā)生改變時(shí)去通知觀察者發(fā)生改變,但這里是我們只要一訂閱注冊就通知觀察者發(fā)生改變,可以理解為觀察者設(shè)計(jì)模式的變異版本。客觀的角度也說明了我們不要去套和記某一種設(shè)計(jì)模式??聪戮唧w的源碼:

// 返回的是這個(gè) ObservableJust 
public static <T> Observable<T> just(T item) {
    ObjectHelper.requireNonNull(item, "The item is null");
    return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

// 主要看 subscribeActual 方法
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    // value 就是值 ,說具體一點(diǎn)就是上面的 imageUrl 
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        // 調(diào)用 Observer 的 onSubscribe 方法
        s.onSubscribe(sd);
        // 調(diào)用 sd 的 run 方法
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}

// 主要看 run 方法
public static final class ScalarDisposable<T>
    extends AtomicInteger
    implements QueueDisposable<T>, Runnable {
         // ...... 省略一些簡單代碼
        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                // 調(diào)用 observer 的 onNext 方法
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    // 調(diào)用 observer 的 onComplete 方法
                    observer.onComplete();
                }
            }
        }
    }

4. RxJava 的線程調(diào)度

Rxjava 到底應(yīng)該怎么做線程調(diào)度切換?我們其實(shí)可以猜一下,料想它也不可能寫出花來。我們最上面的第一種寫法是采用 線程 + Handler,那么我想 RxJava 肯定也是封裝的 線程 + Handler 。而對于線程池和Handler源碼不是特別熟悉的,文章看到這里應(yīng)該可以停一下了,我們得去學(xué)習(xí)了解一下基礎(chǔ)。

.subscribeOn(Schedulers.io())// 上面之前的執(zhí)行在子線程中
.observeOn(AndroidSchedulers.mainThread())// 下面之后的執(zhí)行在主線程中

總共就兩行代碼,前幾年作為小白的我剛開始用的時(shí)候,第一感覺就是真的好神奇??!

// 創(chuàng)建了一個(gè) ObservableSubscribeOn 
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

// 主要看 subscribeActual 方法
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // 創(chuàng)建了一個(gè) SubscribeOnObserver ,也就是把 SubscribeOnObserver 進(jìn)行了一層包裝
        // 這里其實(shí)就是之前所講的代理設(shè)計(jì)模式
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        // 調(diào)用代理的 Observer 的 onSubscribe 方法
        s.onSubscribe(parent);
        // 把下面這個(gè)代碼變?yōu)閮尚?,容易看懂一點(diǎn)
        // parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        Disposable disposable = scheduler.scheduleDirect(new SubscribeTask(parent));
        parent.setDisposable(disposable);
    }
}
// 看到這里差不多要明白了 implements Runnable 看樣子要開線程的節(jié)奏
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}
// Schedulers.io() 是它
static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
}
// 單例設(shè)計(jì)模式 - 靜態(tài)內(nèi)部類
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}
// 線程池 + 線程 + Runnable
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // 創(chuàng)建 線程池
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    // 代理
    DisposeTask task = new DisposeTask(decoratedRun, w);
    // 利用線程池去執(zhí)行任務(wù)
    w.schedule(task, delay, unit);

    return task;
}
// 在子線程中執(zhí)行 source.subscribe(parent); // 又要往上走,這是子線程處理的邏輯

source.subscribe(parent); 執(zhí)行在子線程中,source 是啥這個(gè)不用說了,然后開始往前走,所以這就是子線程的處理部分,其實(shí)挺簡單的。接下來看下主線程的切換:

// 創(chuàng)建了一個(gè) ObservableObserveOn 
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

// 主要還是看 subscribeActual 方法
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
       // ...... 省略部分代碼
       // 創(chuàng)建一個(gè) Scheduler.Worker 
       Scheduler.Worker w = scheduler.createWorker();
       // ObserveOnObserver 
       source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
// 最后的 onNext 是 schedule() 
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
}
// MAIN_THREAD 的 Scheduler 
public final class AndroidSchedulers {
    private static final class MainHolder {
        // new Handler(Looper.getMainLooper()) 創(chuàng)建一個(gè)主線程的 Handler 對象
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }
    
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
        new Callable<Scheduler>() {
            @Override 
            public Scheduler call() throws Exception {
                return MainHolder.DEFAULT;
            }
    });
}
// Handler 切換到主線程
private static final class HandlerWorker extends Worker {
    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.
        // 但是 handler 并沒有復(fù)寫 handleMessage 方法,那是怎么調(diào)用了方法?一切都在 Handler 源碼中
        handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    }
}

至于像 map 、flatMap、delay、filter 等等我們都可以去看一下源碼,但是注意別太深究細(xì)節(jié),因?yàn)楹芏嗟胤缴婕暗綌?shù)據(jù)結(jié)構(gòu)和算法,有時(shí)候看一些細(xì)節(jié)代碼的確比較頭疼。后面我們還是自己動(dòng)手寫一下,加深一下事件流(響應(yīng)式)編程思想。

所有分享大綱:Android進(jìn)階之旅 - 系統(tǒng)架構(gòu)篇

視頻講解地址:https://pan.baidu.com/s/1jIf7SqU

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容