okhttp,retrofit,rxjava源碼流程解析

okhttp

1.基本使用

初始化可以添加自定義的攔截器

OkHttpClient okHttpClient = new OkHttpClient.Builder()
              .connectTimeout(30, TimeUnit.SECONDS)
              .writeTimeout(30, TimeUnit.SECONDS)
              .readTimeout(30, TimeUnit.SECONDS)
              .addInterceptor(interceptorImpl).builder();//創(chuàng)建OKHttpClient的Builder

使用方法

String url = "http://wwww.baidu.com";
final Request request = new Request.Builder()
        .url(url)
        .get()//默認就是GET請求,可以不寫
        .build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        Log.d(TAG, "onFailure: ");
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
        Log.d(TAG, "onResponse: " + response.body().string());
    }
});

一般的使用大致就是這樣的

2.從OkHttpClient創(chuàng)建開始入手分析

OkHttpClient.Builder()使用builder模式,用戶可以自定義相應(yīng)的參數(shù)

開發(fā)一般會用到的是

  .connectTimeout(30, TimeUnit.SECONDS)
  .writeTimeout(30, TimeUnit.SECONDS)
  .readTimeout(30, TimeUnit.SECONDS)
  .addInterceptor(interceptorImpl)

連接時間,寫時間,讀時間以及對應(yīng)的Interceptor相關(guān)的攔截器

3.構(gòu)建Request

Request用的也是Builder模式,好處主要是可以動態(tài)配置相應(yīng)的參數(shù)

 Request(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
    this.headers = builder.headers.build();
    this.body = builder.body;
    this.tags = Util.immutableMap(builder.tags);
  }

tag主要是做標識的,請求返回為null時候的標識操作

4.構(gòu)建Call

構(gòu)建Call,主要是調(diào)用RealCall.newRealCall方法,并在其內(nèi)部添加了一個事件回調(diào)監(jiān)聽

/**
   * Prepares the {@code request} to be executed at some point in the future.
   */
  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

  //RealCall
  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    //添加一個事件回調(diào),后續(xù)會有用處
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }

而在newRealCall方法中同時也調(diào)用了RealCall的構(gòu)造方法
構(gòu)造方法中加入了RetryAndFollowUpInterceptor重試攔截器,okhttp中加入了很多攔截器,這也是一大特色

 private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
  }

5. 執(zhí)行異步請求enqueue

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

executed以及synchronized主要是用來防止重復(fù)操作和多線程同步用的

接下來的方法

private void captureCallStackTrace() {
    Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
    retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
  }

重試監(jiān)聽器做一些棧StackTrace記錄,以及eventListener.callStart(this);事件監(jiān)聽做回調(diào)處理,不影響流程

接著就到了Dispatcher的enqueue方法

 /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

 synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

Dispatcher中定義三個隊列分別是readyAsyncCalls異步等待,同步運行runningAsyncCalls以及runningSyncCalls異步運行隊列,enqueue方法中,當(dāng)運行異步隊列個數(shù)小于最大請求數(shù)(64)并且同一Host請求個數(shù)小于maxRequestsPerHost(5)則加入異步運行隊列,并且用線程執(zhí)行,否則加入異步等待隊列中,這是okhttp的線程隊列優(yōu)化

6.查看AsyncCall的run方法

AsyncCall 繼承了NamedRunnable,其內(nèi)部會run方法會調(diào)用execute(),代碼如下

 @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

signalledCallback這個標識用來處理是否打印對應(yīng)的日志,這里可以看到Response類,說明網(wǎng)絡(luò)請求是在getResponseWithInterceptorChain中完成的,之后會回調(diào)當(dāng)前的Call狀態(tài)值

7.真正的網(wǎng)絡(luò)請求的getResponseWithInterceptorChain

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    //失敗重試攔截器
    interceptors.add(retryAndFollowUpInterceptor);
    //request和response攔截器
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //緩存攔截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //網(wǎng)絡(luò)請求連接攔截器
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
        //網(wǎng)絡(luò)攔截器
      interceptors.addAll(client.networkInterceptors());
    }
    //實際網(wǎng)絡(luò)請求的攔截器
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

加入各式各樣的攔截器,各個攔截器之間不耦合,易于用戶的自己配置,最后調(diào)用RealInterceptorChain的proceed方法

8.RealInterceptorChain的proceed方法

public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
      HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
      EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
    this.interceptors = interceptors;
    this.connection = connection;
    this.streamAllocation = streamAllocation;
    this.httpCodec = httpCodec;
    this.index = index;
    this.request = request;
    this.call = call;
    this.eventListener = eventListener;
    this.connectTimeout = connectTimeout;
    this.readTimeout = readTimeout;
    this.writeTimeout = writeTimeout;
  }

  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    //...

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // .....

    return response;
  }

構(gòu)造方法中加入了eventListener事件監(jiān)聽,看來okhttp中eventListener的監(jiān)聽一直延伸到這里,還加入了

    this.connectTimeout = connectTimeout;
    this.readTimeout = readTimeout;
    this.writeTimeout = writeTimeout;

連接時間的配置

要重點關(guān)注的是index這個字段,前面?zhèn)鬟M來的時候,默認是0,而在proceed方法中,又重新執(zhí)行了RealInterceptorChain的構(gòu)造方法,并通過 interceptors.get(index)獲取下一個攔截器,并且執(zhí)行interceptor.intercept(next)方法,隨便找一個攔截器看看

public final class BridgeInterceptor implements Interceptor {

  @Override public Response intercept(Chain chain) throws IOException {
    //省略部分代碼
    Response networkResponse = chain.proceed(requestBuilder.build());
    //省略部分代碼
    return responseBuilder.build();
  }
}

攔截器內(nèi)部又重新調(diào)用了chain.proceed的方法,這和遞歸操作類似,也是okHttp最經(jīng)典的責(zé)任鏈模式。

9.同步操作

 @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
  }

同步請求也是通過getResponseWithInterceptorChain來完成的,流程更簡單

10.大致的流程圖

image

總結(jié)

OkHttp 主要是通過 5 個攔截器和 3 個隊列(同步隊列,異步隊列,等待隊列)工作,內(nèi)部實現(xiàn)通過一個責(zé)任鏈模式完成,將網(wǎng)絡(luò)請求的各個階段封裝到各個鏈條中,實現(xiàn)了各層的解耦。具體流程是>1.構(gòu)建OkHttpClient對象,通過RealCall發(fā)起同步或異步請求,而決定是異步還是同步請求的是由線程分發(fā)器dispatcher來決定。
2.當(dāng)發(fā)起同步請求時會將請求加入到同步隊列中開啟子線程并執(zhí)行,當(dāng)發(fā)起異步請求時會創(chuàng)建一個線程池,并且判斷請求隊列是否大于最大請求隊列64,請求主機數(shù)是否大于5,如果大于請求添加到異步等待隊列中,否則添加到異步執(zhí)行隊列并執(zhí)行任務(wù)
3.通過Connection實例執(zhí)行請求,經(jīng)過攔截器鏈后拿到data,然后回調(diào)callback的onresponse返回數(shù)據(jù)。

Dispatcher 功能?

他負責(zé)將每一次Requst進行分發(fā),壓棧到自己的線程池,并通過調(diào)用者自己不同的方式進行異步和同步處理。很好的維護了任務(wù)隊列。

攔截器的作用

**設(shè)置任意數(shù)量的 Intercepter 來對網(wǎng)絡(luò)請求及其響應(yīng)做任何中間處理,比如設(shè)置緩存,Https證書認證,請求加密,過濾請求,打印log等等。

BridgeInterceptor(橋接):為請求添加請求頭,為響應(yīng)添加響應(yīng)頭,完成應(yīng)用層和網(wǎng)絡(luò)層的橋接

RetryAndFollowUpInterceptor(重定向阻):請求失敗重試
CacheInterceptor(緩存):緩存get請求
ConnectInterceptor(連接):內(nèi)部維護一個連接池,負責(zé)連接復(fù)用、創(chuàng)建連接、釋放連接。
CallServerInterceptor(網(wǎng)絡(luò)):真正發(fā)起了網(wǎng)絡(luò)請求。里面用的okio庫,主要是segment的機制運用內(nèi)存共享和復(fù)用,數(shù)據(jù)不需要進行二次copy,盡可能少的去申請內(nèi)存,同時也就降低了GC的頻率。強了流與流交互,優(yōu)化緩存策略減小內(nèi)存壓力和性能消耗。

okhttp底層用的socket通信,而socket底層是tcp/ip傳輸協(xié)議,每次都需要進行三次握手四次揮手過程,而請求過程也經(jīng)常是頻繁的,碎片化的,為了提高網(wǎng)絡(luò)連接的效率,Okhttp3還實現(xiàn)了connectionPool網(wǎng)絡(luò)連接池進行復(fù)用。

okhttp優(yōu)勢

1.支持 http2,對一臺機器的所有請求共享同一個 Socket
2.支持透明的 gzip 壓縮響應(yīng)體
3.請求失敗時自動重試主機的其他 ip,自動重定向
4.響應(yīng)緩存可以完全避免網(wǎng)絡(luò)重復(fù)請求
5.內(nèi)置連接池,支持連接復(fù)用,減少延遲
6.豐富的 API,可擴展性好
7.框架使用了很多設(shè)計模式

Okhttp 運用了設(shè)計模式?

1.構(gòu)造者模式(OkhttpClient,Request 等各種對象的創(chuàng)建)
2.工廠模式(在 Call 接口中,有一個內(nèi)部工廠 realCallFactory 接口。)
3.單例模式(每個 OkHttpClient 對象都管理自己獨有的線程池和連接池。 這一點很多同學(xué),甚至在我經(jīng)歷的團隊中就有人踩過坑, 每一個請求都創(chuàng)建一個 OkHttpClient 導(dǎo)致內(nèi)存爆掉)
5.責(zé)任鏈模式(攔截器的鏈式調(diào)用)降低邏輯的耦合,相互獨立的邏輯寫到自己的攔截器中,也無需關(guān)注其它攔截器所做的事情。還擴展性強,可以添加新的攔截器。*
6.享元模式(Dispatcher 的線程池中實現(xiàn)了對象復(fù)用)

retrofit

1.使用

  • 1、創(chuàng)建HTTP接口首先創(chuàng)建HTTP的API服務(wù)接口,接口下的一個方法對應(yīng)HTTP的一個請求,方法上面的注解表示請求的接口地址部分,返回類型是請求的返回值類型,方法的注解參數(shù)即是請求的參數(shù)。
public interface ApiService {
/**
 * 登錄:
 * @param body
 * @return
 */
@POST("/ny/consumer/login")
Observable<BaseResponse<User>> login(@Body RequestBody body);
}
  • 2、構(gòu)建Retrofit實例配置OkHttpClient實例;設(shè)置HTTP接口的域名地址;添加RxJava2網(wǎng)絡(luò)請求適配器工廠;添加Gson數(shù)據(jù)轉(zhuǎn)換器工廠;
mRetrofit = new Retrofit.Builder()
        .client(sOkHttpClient)
        .baseUrl(HOST)
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
        .addConverterFactory(GsonConverterFactory.create())
        .build();
  • 3、生成ApiService的動態(tài)代理通過Retrofit生成動態(tài)代理,用于發(fā)起HTTP請求。
mApiService = sRetrofit.create(ApiService.class);
  • 4、發(fā)起請求:使用動態(tài)代理發(fā)起HTTP請求。
getApiService().login(requestBody);

2、源碼解析

2.1、Retrofit實例的構(gòu)建

Retrofit實例的構(gòu)建使用建造者模式,包括
1、okhttp3.Call.Factory也就是OkHttpClient,因為OkHttpClient實現(xiàn)了okhttp3.Call.Factory,用于發(fā)起請求。
2、Executor 用于提交回調(diào)任務(wù),默認使用Platform.Android的MainThreadExecutor,其實現(xiàn)了Executor接口,并在執(zhí)行回調(diào)中實現(xiàn)了主線程Handler的handler.post(runnable)操作,用于將異步請求的回調(diào)結(jié)果從子線程切換到主線程。

static class MainThreadExecutor implements Executor {
  private final Handler handler = new Handler(Looper.getMainLooper());

  @Override public void execute(Runnable r) {
    handler.post(r);
  }
}

3、List<CallAdapter.Factory>
網(wǎng)絡(luò)請求適配器工廠,默認使用Platform.Android的ExecutorCallAdapterFactory,該網(wǎng)絡(luò)請求適配器工廠適配的網(wǎng)絡(luò)請求是ExecutorCallbackCall。

@Override public @Nullable CallAdapter<?, ?> get(
    Type returnType, Annotation[] annotations, Retrofit retrofit) {
  if (getRawType(returnType) != Call.class) {
    return null;
  }
  final Type responseType = Utils.getCallResponseType(returnType);
  return new CallAdapter<Object, Call<?>>() {
    @Override public Type responseType() {
      return responseType;
    }

    @Override public Call<Object> adapt(Call<Object> call) {
      return new ExecutorCallbackCall<>(callbackExecutor, call);
    }
  };
}

4、List<Converter.Factory> 數(shù)據(jù)轉(zhuǎn)換器工廠,默認使用的是Platform.Android的OptionalConverterFactory,該數(shù)據(jù)轉(zhuǎn)換器工廠使用的是默認的OptionalConverter。

2.2、生成ApiService的動態(tài)代理并發(fā)起HTTP請求

采用動態(tài)代理可以非常靈活地實現(xiàn)解耦,傳入ApiService的Class對象,Proxy提供了用于創(chuàng)建動態(tài)代理對象的靜態(tài)方法,執(zhí)行動態(tài)代理實例的每個方法時都會被替換為執(zhí)行InvocationHandler對象的invoke方法。

public <T> T create(final Class<T> service){
}

InvocationHandler對象的invoke方法中最后調(diào)用的是ServiceMethod的invoke方法:

return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);

也就是HttpServiceMethod實現(xiàn)的invoke方法:
方法中進行了網(wǎng)絡(luò)請求適配器對OkHttpCall進行的適配,也就是說網(wǎng)絡(luò)請求是使用OkHttpCall進行的,但返回類型由網(wǎng)絡(luò)請求適配器進行適配。

@Override ReturnT invoke(Object[] args) {
  return callAdapter.adapt(
      new OkHttpCall<>(requestFactory, args, callFactory, responseConverter));
}

從上文配置請求網(wǎng)絡(luò)適配器工廠我們知道,默認的網(wǎng)絡(luò)請求適配器適配的是ExecutorCallbackCall,故默認使用ExecutorCallbackCall的enqueue做異步網(wǎng)絡(luò)請求:

@Override public void enqueue(final Callback<T> callback) {
  delegate.enqueue(new Callback<T>() {
    @Override public void onResponse(Call<T> call, final Response<T> response) {
      callbackExecutor.execute(new Runnable() {
        @Override public void run() {
          if (delegate.isCanceled()) {
            callback.onFailure(ExecutorCallbackCall.this, new IOException("Canceled"));
          } else {
            callback.onResponse(ExecutorCallbackCall.this, response);
          }
        }
      });
    }

    @Override public void onFailure(Call<T> call, final Throwable t) {
      callbackExecutor.execute(new Runnable() {
        @Override public void run() {
          callback.onFailure(ExecutorCallbackCall.this, t);
        }
      });
    }
  });
}

這里的delegate就是OkHttpCall實例,callbackExecutor就是MainThreadExecutor實例,OkHttpCall異步請求回調(diào)后,使用MainThreadExecutor提交回調(diào)任務(wù),該任務(wù)執(zhí)行的就是在異步請求回調(diào)的子線程中將異步請求的回調(diào)結(jié)果從子線程切換到主線程。
其中OkHttpCall的異步請求方法中,調(diào)用的就是okhttp3.Call的異步請求,回調(diào)結(jié)果中會使用Converter<ResponseBody, T>對數(shù)據(jù)進行轉(zhuǎn)換并返回。

T body = responseConverter.convert(catchingBody);

2.3、RxJava網(wǎng)絡(luò)請求適配器工廠

使用RxJava2CallAdapter做網(wǎng)絡(luò)請求適配器,將Call轉(zhuǎn)換為Observable<Response>。

@Override public Object adapt(Call<R> call) {
  Observable<Response<R>> responseObservable = isAsync
      ? new CallEnqueueObservable<>(call)
      : new CallExecuteObservable<>(call);

  Observable<?> observable;
  if (isResult) {
    observable = new ResultObservable<>(responseObservable);
  } else if (isBody) {
    observable = new BodyObservable<>(responseObservable);
  } else {
    observable = responseObservable;
  }

  if (scheduler != null) {
    observable = observable.subscribeOn(scheduler);
  }

  if (isFlowable) {
    return observable.toFlowable(BackpressureStrategy.LATEST);
  }
  if (isSingle) {
    return observable.singleOrError();
  }
  if (isMaybe) {
    return observable.singleElement();
  }
  if (isCompletable) {
    return observable.ignoreElements();
  }
  return observable;
}

2.4、Gson數(shù)據(jù)轉(zhuǎn)換器工廠
使用GsonResponseBodyConverter做數(shù)據(jù)轉(zhuǎn)換器,將ResponseBody轉(zhuǎn)換為T。

@Override public T convert(ResponseBody value) throws IOException {
  JsonReader jsonReader = gson.newJsonReader(value.charStream());
  try {
    T result = adapter.read(jsonReader);
    if (jsonReader.peek() != JsonToken.END_DOCUMENT) {
      throw new JsonIOException("JSON document was not fully consumed.");
    }
    return result;
  } finally {
    value.close();
  }
}

2.5 流程圖

image.png

總結(jié)

定義:Retrofit就是一個網(wǎng)絡(luò)請求框架的封裝,通過java接口以及注解來描述網(wǎng)絡(luò)請求,并用動態(tài)代理的方式在調(diào)用接口方法時注入自己的方法,本身只是簡化了用戶網(wǎng)絡(luò)請求的參數(shù)配置等,底層的網(wǎng)絡(luò)請求還是Okhttp,請求完成后將返回的response通過converterFactorty轉(zhuǎn)換成相應(yīng)的數(shù)據(jù)model,最后通過calladapter轉(zhuǎn)換成其他數(shù)據(jù)類型,比如Rxjava的 Observable)。所以可以很好的與Rxjava相結(jié)合,使用起來簡潔方便

代理模式:通過訪問代理對象的方式來間接訪問目標對象

分為靜態(tài)代理 & 動態(tài)代理:
靜態(tài)代理:代理類在程序運行前已經(jīng)存在的代理方式
動態(tài)代理:代理類在程序運行前不存在、運行時由程序動態(tài)生成的代理方式
retrofit通過Proxy.newProxyInstance產(chǎn)生的代理類,當(dāng)調(diào)用接口方法時都會調(diào)用InvocationHandler#invoke方法得到Http請求鏈接、請求方法、請求路徑、請求參數(shù)等請求信息,構(gòu)建一個OkHttp的請求并執(zhí)行。

Retrofit 優(yōu)點

1.可以配置不同 HTTP client 來實現(xiàn)網(wǎng)絡(luò)請求,如 okhttp、httpclient 等;
2.請求的方法參數(shù)注解都可以定制;
3.支持同步、異步和 RxJava;
4.超級解耦;
5.可以配置不同的反序列化工具來解析數(shù)據(jù),如 json、xml;
6.框架使用了很多設(shè)計模式:代理模式,構(gòu)造者模式,工廠模式,適配器模式,觀察者模式,外觀模式。

RXjava:

RxJava采取的是觀察者模式,使用時要先分別創(chuàng)建一個觀察者Observer或Subscriber處理接收到數(shù)據(jù)后要做的處理(onNext,onError,onCompleted),一個被觀察者Observable用來發(fā)送要處理的數(shù)據(jù),最后由被觀察者訂閱觀察者(subscribe),這時要發(fā)送的數(shù)據(jù)就會由被觀察發(fā)出,然后觀察者做出相應(yīng)處理。用代碼來簡單描述為:

//創(chuàng)建被觀察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("hello world");
            }
});
//創(chuàng)建觀察者
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                //completed
            }

            @Override
            public void onError(Throwable e) {
                //error
            }

            @Override
            public void onNext(String s) {
                //do it
            }
};
//訂閱事件
observable.subscribe(subscriber);

這里有個問題,為什么是被觀察者訂閱觀察者而不觀察者訂閱被觀察者呢?我認為應(yīng)該是這樣:被觀察者,即發(fā)送數(shù)據(jù)方,他的數(shù)據(jù)可以發(fā)送給多個觀察者,即可以有多個觀察者觀察他,因此他是占據(jù)主導(dǎo)權(quán)的,他想讓哪個觀察者看就訂閱哪個觀察者把數(shù)據(jù)發(fā)給他。

當(dāng)然,觀察者,被觀察者及訂閱的代碼還有很多簡單的書寫方式,如直接使用just()等方法發(fā)送數(shù)據(jù),不創(chuàng)建觀察者而是在subscribe()方法中傳遞幾個Action()方法等等,在這里我只是展示了最基本的一套用法用來比較清晰地梳理一下工作流程。除此之外,RxJava還可以切換線程,可以對數(shù)據(jù)進行變換,這些都是在訂閱過程中完成的,代碼如下:

observable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Func1<String,Integer>() {
                    @Override
                    public Integer call(String s) {
                        //do String --> Integer
                        return 0;
                    }
                })
                .filter(new Func1<Integer,Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        //return your boolean
                        return integer>10;
                    }
                })
                .subscribe(subscriber);

其中,在切換線程時,subscribeOn指定subscribe發(fā)生的線程,observeOn指定Subscriber的回調(diào)發(fā)生的線程,其他操作符如過濾、變換操作符可以自己琢磨一下。

以上,就是RxJava大體上的使用流程,接下來我將從源碼角度看一下他的實際工作過程。

工作原理

1.創(chuàng)建Observable

首先看看Observable里面有哪些變量:

final OnSubscribe<T> onSubscribe;

然后看看create的源碼:

public final static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
}

通過傳入一個OnSubscribe對象,將其作為參數(shù)傳入hook.onCreate()方法,將返回值作為參數(shù)構(gòu)造一個Observable對象。這里的 hook 是一個static的RxJavaObservableExecutionHook對象,他的create()方法是這樣的:

public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
        return f;
}

他接受一個OnSubscribe對象,然后就將他返回。hook.create(f) 可以等價的看作是 f 本身,Observable 的構(gòu)造器接受的就是一個OnSubscribe對象了,看看他的構(gòu)造器:

protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
}

這里將傳入的OnSubscribe對象賦給了自己的onSubscribe。

分析完了,創(chuàng)建Observable就是給他一個OnSubscribe對象,把他傳入構(gòu)造器創(chuàng)建一個Observable對象。那么OnSubscribe是什么?

2.OnSubscribe是什么?

看看OnSubscribe源碼:

/**
     * Invoked when Observable.subscribe is called.
     */
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>>{
        // cover for generics insanity
}

原來是繼承自 Action1 的一個接口,注釋說他在subscribe被調(diào)用的時候喚醒,OnSubscribe應(yīng)該就是所謂的“事務(wù)”,他的call方法負責(zé)發(fā)起事務(wù),即notifyObservers()。結(jié)合前面分析的使用過程,在創(chuàng)建Observable時傳入的OnSubscribe中實現(xiàn)了call方法并且執(zhí)行了subscriber的一些方法。

3.創(chuàng)建觀察者Observer/Subscriber

首先看Observer的定義:

public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

只是一個簡單的接口,再看看Subscriber:

public abstract class Subscriber<T> implements Observer<T>,Subscription{

Subscriber是一個實現(xiàn)了Observer接口的抽象類,并且還擴充了許多方法。既然如此,那我們在使用RxJava時就應(yīng)當(dāng)盡量用Subscriber代替Observer了。

看看他有哪些屬性:

// represents requested not set yet
    private static final Long NOT_SET = Long.MIN_VALUE;

    private final SubscriptionList subscriptions;
    private final Subscriber<?> subscriber;
    /* protected by `this` */
    private Producer producer;
    /* protected by `this` */
    private long requested = NOT_SET; // default to not set

他持有一個自己的引用,一個SubscriptionList引用。分析一下,創(chuàng)建一個觀察者Subscriber,就必須要實現(xiàn)來自O(shè)bserver接口的三個方法:onNext(), onError(), onCompleted()。

4.訂閱subscribe

創(chuàng)建好觀察者和被觀察者之后,就可以進行訂閱了:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
}

在subscribe方法中又調(diào)用了Observer的一個私有方法:

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
}

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See Guideline 6.4: Protect calls to user code from within an operator · Issue #216 · ReactiveX/RxJava for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would 
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
}

精簡一下這個方法,逐步分析。首先,對傳入的subscriber對象和observable.onSubscribe方法判空,然后執(zhí)行了sunscriber的start()方法,之后對subscriber做了安全性封裝:

if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
}

所有檢測完畢,開始執(zhí)行下列方法:

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);

又看到那個熟悉的hook了,看看他的onSubscribeStart方法是怎樣實現(xiàn)的:

public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass-thru by default
        return onSubscribe;
}

傳給了observer對象和他的onSubscribe對象,結(jié)果直接把后者返回了……有趣的設(shè)計,返回之后又繼續(xù)調(diào)用了onSubscribe對象的call()方法,并傳入了subscriber對象。

5.Subscription

在分析訂閱部分代碼時,我發(fā)現(xiàn)了subscribe()方法完成后,執(zhí)行了Subscriptions的unsubscribed()方法并返回。這個Subscription是什么呢?

/**
 * Subscription returns from {@link Observable#subscribe(Subscriber)} to allow unsubscribing.
 * <p>
 * See the utilities in {@link Subscriptions} and the implementations in the {@code rx.subscriptions} package.
 * <p>
 * This interface is the RxJava equivalent of {@code IDisposable} in Microsoft's Rx implementation.
 */
public interface Subscription {

    /**
     * Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
     * was received.
     * <p>
     * This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
     * onCompleted is called).
     */
    void unsubscribe();

    /**
     * Indicates whether this {@code Subscription} is currently unsubscribed.
     *
     * @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
     */
    boolean isUnsubscribed();

}

一直說觀察者模式中要訂閱,要訂閱,怎么取消訂閱呢?原來就在這里。我的理解,Subscription可以理解為一件訂閱事務(wù),他有一個取消訂閱和檢測是否取消的方法。每一個訂閱事件,最后是可以返回這樣一個subscription對象的。我們完全可以把這個對象收集起來,在需要的時候?qū)⑺∠嗛?。例如像下面這樣:

private CompositeSubscription subscriptions = new CompositeSubscription();
//創(chuàng)建一個異步任務(wù)
subscriptions.add(subscription);//將subscription加入合集中
subscriptions.unsubscribe();//取消訂閱

6.變換:map()

RxJava的操作符很多,我這里只選一個最基礎(chǔ)的map來看看,首先看map代碼如下:

//這段代碼是我在1.1版本中分析的,在1.2.2中已經(jīng)更改了實現(xiàn)方式,多謝評論區(qū)提醒~
public final <R> Observable<R> map(Func1<? super T, ? extends R> func){
        return lift(new OperatorMap<T, R>(func));
}

在內(nèi)部調(diào)用了lift()方法,并將結(jié)果返回了??梢钥吹剑儞Q的過程中,將包含T在內(nèi)的T的基類變換為了包含R在內(nèi)的R的子類,所以這里重點要看兩個地方,一是lift()如何實現(xiàn),二是OperatorMap是什么。先看看lift()方法的實現(xiàn):

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators 
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        Exceptions.throwIfFatal(e);
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }

可以看到,在lift內(nèi)部用類型R又重新創(chuàng)建了一個Observable。注意觀察,這里的代碼和調(diào)用subscribe()時很像,但又不同。對比一下發(fā)現(xiàn),在subscribe()時,是由Observable自己的OnSubscribe調(diào)用了call()方法,并將自己的subscriber作為參數(shù)傳入call()。但是在這里,通過一個新的OnSubscribe創(chuàng)建了一個新的Observable,在內(nèi)部先創(chuàng)建了一個新的Subscriber,然后由舊的onSubscribe調(diào)用自己的call()方法,這里傳入的又是新的Subscriber。新舊之間的關(guān)聯(lián)就在于新的SUbscriber創(chuàng)建的過程:

Subscriber<? super T> st = hook.onLift(operator).call(o);

可以看到,創(chuàng)建新的Subscriber時用到了我們傳入的operator,看看hook的lift()實現(xiàn):

public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> lift) {
        return lift;
}

把傳入的operator又原樣返回了。那么前面的代碼就可以簡化為operator直接調(diào)用了call()方法。我們自己寫的變換代碼就是實現(xiàn)了這個operator的call()方法。

.map(new Func1<String,Integer>() {
                    @Override
                    public Integer call(String s) {
                        //do String --> Integer
                        return 0;
                    }
                })

看看前面的代碼,我們傳入的這個Func1,在內(nèi)部會由他創(chuàng)建一個OperatorMap,然后將OperatorMap傳入了lift(),這個OperatorMap就是我們剛才講的operator的來源。

再來看看OperatorMap是什么:

public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }

        };
    }

}

可以看到,當(dāng)構(gòu)造一個OperatorMap時,傳入了一個func,在OperatorMap構(gòu)造器中,是將其賦給了自己記得一個叫做transformer的屬性,這個transformer是一個Func1對象,因此我們的實現(xiàn)變換的主要細節(jié)其實就在于這個Func1。

7.變換:compose()

除了最基礎(chǔ)的mao進行變換外,我們常用的還有compose變換,看看他是怎么實現(xiàn)的:

public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
        return ((Transformer<T, R>) transformer).call(this);
}

通過compose變換時,傳入的是一個transfomer,最后調(diào)用的是他的call()方法。transfomer就是前面Map變換中用到的那個,綜合來看,在RxJava中進行變換時,是通過創(chuàng)建新的Observable進行代理來實現(xiàn)的,而具體實現(xiàn)細節(jié)使用了transformer。

8.線程切換:subscrieOn()

subscribeOn()指定了subscribe()所發(fā)生的線程,看看他是怎樣實現(xiàn)的:

public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

先調(diào)用了nest(),返回一個Observable對象,然后調(diào)用lift()進行變換,進行變換時傳入的是一個由線程調(diào)度器scheduler構(gòu)造的OperatorSubscribeOn對象。先看看nest中發(fā)生了什么:

public final Observable<Observable<T>> nest() {
        return just(this);
}

把自己傳給了just()方法:

public final static <T> Observable<T> just(final T value) {
        return ScalarSynchronousObservable.create(value);
}

這個ScalarSynchronousObservable是繼承自O(shè)bservabel的,到頭來還是調(diào)用create()創(chuàng)建了一個Observable對象。lift()方法前面已經(jīng)分析過了,lift()中創(chuàng)建了一個新的Observable,這里不同的地方在于傳入的是一個線程調(diào)度器scheduler而非OperatorMap,所以線程調(diào)度的具體實現(xiàn)應(yīng)該就是由scheduler和OperatorSubscribeOn來決定的了。那么接下來就看看OperatorSubscribeOn是如何實現(xiàn)線程控制的。首先根據(jù)上面的分析,這里傳入了一個scheduler對象給構(gòu)造器,點進來看看他的實現(xiàn),:

public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {

    private final Scheduler scheduler;

    public OperatorSubscribeOn(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        return new Subscriber<Observable<T>>(subscriber) {

            @Override
            public void onCompleted() {
                // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }

            @Override
            public void onNext(final Observable<T> o) {
                inner.schedule(new Action0() {

                    @Override
                    public void call() {
                        final Thread t = Thread.currentThread();
                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {

                            @Override
                            public void onCompleted() {
                                subscriber.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                subscriber.onError(e);
                            }

                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }

                            @Override
                            public void setProducer(final Producer producer) {
                                subscriber.setProducer(new Producer() {

                                    @Override
                                    public void request(final long n) {
                                        if (Thread.currentThread() == t) {
                                            // don't schedule if we're already on the thread (primarily for first setProducer call)
                                            // see unit test 'testSetProducerSynchronousRequest' for more context on this
                                            producer.request(n);
                                        } else {
                                            inner.schedule(new Action0() {

                                                @Override
                                                public void call() {
                                                    producer.request(n);
                                                }
                                            });
                                        }
                                    }

                                });
                            }

                        });
                    }
                });
            }

        };
    }
}

可以看到,在構(gòu)造器中,傳入的scheduler賦給了自己的scheduler,然后在call方法中,通過scheduler創(chuàng)建了一個worker對象,名叫inner,之后的所有操作都是由inner完成的??偨Y(jié)一下,就是傳入的scheduler創(chuàng)建了一個worker對象,由這個對象進行了實際上的線程控制。所以線程控制的關(guān)鍵就在于這個scheduler。而scheduler就是我們在使用過程中傳入的http://Schedulers.io()等,這里就拿http://Schedulers.io()看看。

public static Scheduler io() {
        return INSTANCE.ioScheduler;
}

再看看Schedulers類的構(gòu)造器,可以知道INSTANCE.ioSchduler是在構(gòu)造器中進行初始化的:

Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler();
        if (io != null) {
            ioScheduler = io;
        } else {
            ioScheduler = new CachedThreadScheduler();
        }

再結(jié)合:

/**
     * Scheduler to return from {@link rx.schedulers.Schedulers#io()} or null if default should be used.
     *
     * This instance should be or behave like a stateless singleton;
     */
    public Scheduler getIOScheduler() {
        return null;
    }

可知,ioScheduler是由CacheThreadSchduler這個類創(chuàng)建的,這個類繼承自Scheduler,那么也就是說抽象類Scheduler的createWorker()方法由子類CacheThreadSchduler實現(xiàn)了。那就來看看這個方法具體的實現(xiàn):

@Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

可以看到,createWorker()方法返回了一個EventLoopWorker對象。而這個類是CacheThreadSchduler類的內(nèi)部類?;貞浺幌拢拔覀儎?chuàng)建好worker之后,他是如何工作的?:

inner.schedule(new Action0() {...};

是有這個worker對象調(diào)用了schedule()方法,并且傳入了一個Action0。那么就來看看worker的源頭,EventLoopWorker,在schedule()時做了什么:

private static final class EventLoopWorker extends Scheduler.Worker {
        /* 省略部分代碼*/
        @Override
        public Subscription schedule(Action0 action) {
            return schedule(action, 0, null);
        }

        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            if (innerSubscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();
            }

            ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
            innerSubscription.add(s);
            s.addParent(innerSubscription);
            return s;
        }
    }

可以看到,最終實際上是調(diào)用了thread.scheduleActual()方法,并將action傳給了他,返回一個 ScheduledAction 對象。那么看看這個方法內(nèi)部是如何實現(xiàn)的:

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        Action0 decoratedAction = schedulersHook.onSchedule(action);
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }

創(chuàng)建了一個ScheduledAction的對象,并將其返回,而ScheduledAction類是實現(xiàn)了Runnable接口的:

public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {

因此,具體対線程的操作就是在這里了??偨Y(jié)一下,SubscribeOn() 是通過 life() 變換來完成的,而在變換中實際上是通過 CachedThreadScheduler 類提供的 schedule() 方法,用Runnable來完成的線程控制。

9.線程切換:observeOn()

和 subscribeOn() 方法一樣,observeOn() 方法實現(xiàn)原理也是通過 lift() 變換:

public final Observable<T> observeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler));
}

類似的,這里也是傳入了一個Operator,不同的是這里傳入的是通過scheduler創(chuàng)建的OperatorObserveOn對象。先來看看OperatorObserveOn的構(gòu)造器:

public OperatorObserveOn(Scheduler scheduler) {
        this.scheduler = scheduler;
}

類似的,將傳入的 scheduler 賦給了自己的 scheduler 屬性。這個scheduler 在哪里用到了呢?首先是在回調(diào)方法 call() 中:

@Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
            parent.init();
            return parent;
        }
}

通過scheduler創(chuàng)建了一個 ObserveOnSubscriber 對象 parent ,并調(diào)用了 init() 方法。這個 ObserveOnSubscriber 類是一個內(nèi)部類,他的構(gòu)造器:

public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
            if (UnsafeAccess.isUnsafeAvailable()) {
                queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
            } else {
                queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
            }
            this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
}

在這里就調(diào)用了 scheduler 的 createWorker() 方法,并將返回結(jié)果賦給了自己的 recursiveScheduler ,然后由他創(chuàng)建了 ScheduledUnsubscribe 對象,將這個對象賦給了 scheduledUnsubscribe。好像有點亂,大概理一下,這里創(chuàng)建了一個 Subscriber 對象,在內(nèi)部做了一些初始化的操作,而這個Subscriber 對象實際上就是 ObserveOnSubscriber 的對象。觀察 ObserveOnSubscriber 類:

@Override
        public void onNext(final T t) {
            if (isUnsubscribed()) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }

        @Override
        public void onCompleted() {
            if (isUnsubscribed() || finished) {
                return;
            }
            finished = true;
            schedule();
        }

        @Override
        public void onError(final Throwable e) {
            if (isUnsubscribed() || finished) {
                return;
            }
            error = e;
            // unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
            unsubscribe();
            finished = true;
            // polling thread should skip any onNext still in the queue
            schedule();
        }

會發(fā)現(xiàn),在這三個被調(diào)用的方法中都會調(diào)用 schedule() 方法,而 schedule() 方法的實現(xiàn)是這樣的:

protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(action);
            }
}

注意,這個 recursiveScheduler 就是前面創(chuàng)建的worker。所以控制線程切換的關(guān)鍵還是在于傳入的 scheduler及他所創(chuàng)建的 worker 和worker的 schedule() 方法。傳入的 scheduler 有很多種,就拿 AndroidSchedulers.mainThread() 來說:

public final class AndroidSchedulers {
    private AndroidSchedulers() {
        throw new AssertionError("No instances");
    }

    // See Unit testing support for AndroidSchedulers · Issue #238 · ReactiveX/RxAndroid
    // Initialization-on-demand holder idiom
    private static class MainThreadSchedulerHolder {
        static final Scheduler MAIN_THREAD_SCHEDULER =
                new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    /** A {@link Scheduler} which executes actions on the Android UI thread. */
    public static Scheduler mainThread() {
        Scheduler scheduler =
                RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();
        return scheduler != null ? scheduler : MainThreadSchedulerHolder.MAIN_THREAD_SCHEDULER;
    }
}

可以看到,scheduler就是一個 HandlerScheduler 對象,看HandlerScheduler類的實現(xiàn):

public final class HandlerScheduler extends Scheduler {
    /*省略部分代碼*/

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
        /*省略部分代碼*/
    }
}

HandlerScheduler 類也繼承了 Scheduler ,他的createWorker() 創(chuàng)建了一個HandlerWorker 對象。所以前面創(chuàng)建的worker其實就是 HandlerWorker。HandlerWorker 類是HandlerScheduler 的內(nèi)部類,他的schedule 方法:

static class HandlerWorker extends Worker {

        private final Handler handler;

        private final CompositeSubscription compositeSubscription = new CompositeSubscription();

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            if (compositeSubscription.isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }

            action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action);

            final ScheduledAction scheduledAction = new ScheduledAction(action);
            scheduledAction.addParent(compositeSubscription);
            compositeSubscription.add(scheduledAction);

            handler.postDelayed(scheduledAction, unit.toMillis(delayTime));

            scheduledAction.add(Subscriptions.create(new Action0() {
                @Override
                public void call() {
                    handler.removeCallbacks(scheduledAction);
                }
            }));

            return scheduledAction;
        }

        @Override
        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, TimeUnit.MILLISECONDS);
        }

可以看到,在 schedule 內(nèi)部還是創(chuàng)建了一個 ScheduledAction 對象,之后所有的操作都有他來完成。由前面分析可知,ScheduledAction 類實現(xiàn)了 Runnable。所以歸根結(jié)底,兩個線程控制都是由 Runnable 來實現(xiàn)的。

總結(jié)

RxJava 是一個 基于事件流、實現(xiàn)異步操作的庫

原理:被觀察者 (Observable) 通過 訂閱(Subscribe) 按順序發(fā)送事件 給觀察者 (Observer)
觀察者(Observer) 按順序接收事件 & 作出對應(yīng)的響應(yīng)動作

Scheduler的原理(線程切換)

首先是observeOn切換線程,他根據(jù)傳入的參數(shù)(newThread(),io(),singleThread()、AndroidmainThread可以生成不同的線程,它在調(diào)用onNext方法中執(zhí)行schedule方法內(nèi)部有一個work對象實現(xiàn)了runnable接口,完成了線程切換。但如果是AndroidmainThread,則由handle發(fā)送postdeply完成到主線程的切換。subscribeOn切換線程與observeOn類似,但是observeOn是改變它所在線程所以每次切換都有效,而subscribeOn是改變數(shù)據(jù)源的運行線程,只在第一次有效,后續(xù)切換都無效,因為subscribeOn自下而上每次在指定線程中向上級訂閱,下次再執(zhí)行subscribeOn只會在改變的線程里進行,用戶感受不到線程切換。

優(yōu)點

1)采用鏈式調(diào)用,代碼簡潔優(yōu)雅有美感,并且可讀性增強。
2)rxjava中采用觀察者模式。模塊之間劃定了清晰的界限,降低了模塊間的耦合性,提高了代碼的可維護性和重用性。
3)rxjava中提供了強大的操作符。
:just:將同種數(shù)據(jù)源組合放到被觀察者上面
from:將類似數(shù)組、集合的數(shù)據(jù)源放到被觀察者上面
map:將一種數(shù)據(jù)源轉(zhuǎn)化成另外一種,可以是任意類型變換是1對1
flatmap:將一種數(shù)據(jù)源轉(zhuǎn)化成另外一種數(shù)據(jù),返回ObservableSource對象??梢詫?shù)據(jù)進行一對多,多對多的變換。flatMap并不保證數(shù)據(jù)有序。
zip:處理多種不同結(jié)果集的數(shù)據(jù)發(fā)射,一般用得多的地方是多個網(wǎng)絡(luò)請求組合然后統(tǒng)一處理業(yè)務(wù)邏輯。
除此之外還有經(jīng)常用到compose操作符,因為rxjava發(fā)布訂閱如果沒及時取消會內(nèi)存泄漏,通過compose與rxlivercycle配合使用綁定容器生命周期。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容