本篇文章已授權微信公眾號guolin_blog(郭霖)獨家發(fā)布
okhttp一經(jīng)推出,讓其他的網(wǎng)絡請求框架變得黯然失色。網(wǎng)上關于okhttp的介紹文章一大堆,這里我就不繼續(xù)bb了,今天我嘗試從源碼的角度去分析一下okhttp的一個工作流程。其實在動筆寫這篇文章之前,我已經(jīng)在網(wǎng)上看過不少關于okhttp源碼分析的文章,怎么說呢,有些文章,一上來就是一個大總結或者一個UML圖,讓人看得晦澀難懂。對于大部分沒有閱讀過okhttp源碼的人來說,全盤托出的一上來一大堆接口啊類啊什么的,容易把人整暈。今天我嘗試用通俗易懂的語言來描述一下同步、異步的操作流程,以及okhttp背后到底干了啥。
先貼一下 okhttp的github地址:okhttp項目地址。最新的源碼是3.0+,而3.0與舊版本多少還是有些差別的,這里我就不介紹了,感興趣的話自行去百度。
同步請求
// 構建okHttpClient,相當于請求的客戶端,Builder設計模式
OkHttpClient okHttpClient = new OkHttpClient.Builder().readTimeout(5, TimeUnit.SECONDS).build();
// 構建一個請求體,同樣也是Builder設計模式
Request request = new Request.Builder().url("http://www.baidu.com").build();
// 生成一個Call對象,該對象是接口類型,后面會說
Call call = okHttpClient.newCall(request);
try {
// 拿到Response
Response response = call.execute();
Log.i("TAG",response.body().string());
} catch (IOException e) {
}
以上就是一個簡單的同步請求示例代碼,我們來看一看要經(jīng)過哪些步驟:
1.通過Builder模式創(chuàng)建OkHttpClient對象和Request對象
2.調用OkHttpClient的newCall方法,獲取一個Call對象,參數(shù)是Request
3.調用execute方法獲取一個Respone
(一) OkHttpClient源碼分析
public static final class Builder {
Dispatcher dispatcher;
...
...
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
proxySelector = new NullProxySelector();
}
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
callTimeout = 0;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
Builder是OkHttpClient一個靜態(tài)內部類,在Builder的構造函數(shù)中進行了一系列的初始化操作,其中Dispatcher中文是分發(fā)器的意思,和攔截器不同的是分發(fā)器不做事件處理,只做事件流向。他負責將每一次Requst進行分發(fā),壓棧到自己的線程池,并通過調用者自己不同的方式進行異步和同步處理,那具體是怎么操作的呢?后面會講。ConnectionPool是一個連接池對象,它可以用來管理連接對象,從它的構造方法中可以看到連接池的默認空閑連接數(shù)為5個,keepAlive時間為5分鐘。
(二) Request源碼分析
public static class Builder {
@Nullable HttpUrl url;
String method;
Headers.Builder headers;
@Nullable RequestBody body;
public Builder() {
this.method = "GET";
this.headers = new Headers.Builder();
}
}
Builder()構造函數(shù)中設置了默認的請求方法是GET方法,Request類的build()方法是用來創(chuàng)建一個Request對象,將當前的Builder對象傳進去,并完成了對象的賦值。我們來看一下Request的構造函數(shù):
Request(Builder builder) {
this.url = builder.url;
this.method = builder.method;
this.headers = builder.headers.build();
this.body = builder.body;
this.tags = Util.immutableMap(builder.tags);
}
將Builder類的相關屬性賦值給Request的相關屬性,這也是Builder模式的精髓。
(三) Call對象的創(chuàng)建:newCall()執(zhí)行分析
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false);
}
可以看到newCall方法內部調用了RealCall的newRealCall方法,Call是一個接口,RealCall是它的實現(xiàn)類,接下來我們去newRealCall方法內部看看
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
在newRealCall方法里,完成了RealCall對象的創(chuàng)建,并把它返回出去。至此,Call對象已經(jīng)創(chuàng)建完畢,實際上創(chuàng)建的對象是Call的實現(xiàn)類RealCall 對象。
(四) Response對象的創(chuàng)建: call.execute()執(zhí)行分析
前面已經(jīng)說到 Call call = okHttpClient.newCall(request)拿到的是一個RealCall對象,所以我們直接去RealCall類的execute()方法看它的源碼
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
timeout.enter();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
e = timeoutExit(e);
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
在同步代碼塊中,首先判斷excuted是不是true,它的含義是否有在執(zhí)行,如果是,拋出異常,如果沒有執(zhí)行過,將excuted置為true。eventListener.callStart(this)開啟事件監(jiān)聽,eventListener在RealCall對象創(chuàng)建的時候,也一起創(chuàng)建了。
接下來我們看看execute()的核心方法: client.dispatcher().executed(this)、 Response result = getResponseWithInterceptorChain()、client.dispatcher().finished(this)
public Dispatcher dispatcher() {
return dispatcher;
}
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
首先調用了Dispatcher的dispatcher()方法,dispatcher()方法返回一個Dispatcher對象,緊接著調用了Dispatcher的executed方法,往runningSyncCalls對象中添加了一個 call對象,runningSyncCalls是一個存放同步請求的隊列,Dispatcher類中維護了3種類型的請求隊列:
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
- readyAsyncCalls 是異步請求的就緒隊列
- runningAsyncCalls 是異步請求的執(zhí)行隊列
- runningSyncCalls 是同步請求的執(zhí)行隊列
調用完Dispatcher的execute()方法后,緊接著調用了getResponseWithInterceptorChain()方法。
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
該方法返回一個Response對象,我們可以看到,在方法的第一行中就創(chuàng)建了interceptors 集合,然后緊接著放進去很多攔截器對象,此處使用了責任鏈設計模式,依次在攔截器中做相應的操作。
在Response的excute方法的finally模塊中,最后調用了 client.dispatcher().finished(this)。我們點進去瞧一瞧
void finished(RealCall call) {
finished(runningSyncCalls, call);
}
finished方法內部調用了它的重載的方法,并把同步請求的消息隊列對象和RealCall對象傳過去,我們繼續(xù)往下看。
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}
在同步代碼快中,將call對象從同步請求消息隊列中移除,如果移除出問題,就會拋出異常。 緊接著調用了promoteAndExecute方法:
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
同步代碼塊有一個for循環(huán),去迭代readyAsyncCalls,也就是待準備消息隊列。但是我們從前面一步步分析過來,并沒有往readyAsyncCalls添加過數(shù)據(jù),所以當前的for循環(huán)并不會執(zhí)行,之后的一個for循環(huán)也不會執(zhí)行,isRunning返回false.
promoteAndExecute()方法返回false。
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}
并且idleCallback 已經(jīng)完成了初始化,所以會執(zhí)行 idleCallback的run()方法
總結:通過以上的分析,我們發(fā)現(xiàn)在同步請求中Dispatcher主要負責了兩件事,同步請求的保存和移除。
save.png


至此,okhttp3的同步請求源碼已經(jīng)分析完了,接下來,我們看看okhttp3的異步請求源碼分析。
異步請求
// 構建okHttpClient,相當于請求的客戶端,Builder設計模式
OkHttpClient okHttpClient = new OkHttpClient.Builder().readTimeout(5, TimeUnit.SECONDS).build();
// 構建一個請求體,同樣也是Builder設計模式
Request request = new Request.Builder().url("http://www.baidu.com").build();
// 生成一個Call對象,該對象是接口類型,后面會說
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
簡單的看下異步請求的幾個步驟:
1.通過Builder模式創(chuàng)建OkHttpClient對象和Request對象
2.調用OkHttpClient的newCall方法,獲取一個Call對象,參數(shù)是Request
3.調用call對象的enqueue()方法
步驟1和步驟2跟同步請求的步驟一致,主要看一下步驟3
異步請求源碼:call.enqueue() 源碼分析
異步請求和同步請求的步驟1和步驟2一致,都是在做準備工作,并沒有發(fā)起請求,所以這次我們直接忽略了步驟1和步驟2的分析,直接分析步驟3的源碼,我們點開RealCall的enqueue方法:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
我們看看它的核心代碼: client.dispatcher().enqueue(new AsyncCall(responseCallback)),
client.dispatcher()返回一個Dispatcher對象沒什么可講的,緊接著調用Dispatcher的enqueue方法,參數(shù)是AsyncCall對象,我們先看看AsyncCall是什么?
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
private volatile AtomicInteger callsPerHost = new AtomicInteger(0);
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
AtomicInteger callsPerHost() {
return callsPerHost;
}
}
只截取了部分代碼,該類繼承自NamedRunnable,我們看看NamedRunnable:
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
NamedRunnable 實現(xiàn)了Runnable 接口。
我們直接去Dispatcher的enqueue()看看做了哪些操作。
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
promoteAndExecute();
}
在同步代碼塊中,將當前的call請求添加到待準備消息隊列中去,注意這里跟同步請求的區(qū)別,同步請求的時候,并沒有把當前的call添加到準備消息隊列中去。然后又調用了 promoteAndExecute()方法,同步請求的時候也調用了promoteAndExecute()方法
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
此時,readyAsyncCalls不為空了,我們單獨的把這個for循環(huán)拎出來講:
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
這段代碼的含義是:把符合條件的call請求從readyAsyncCalls提升為runningAsyncCalls,我們看這段代碼中的兩個if語句,第一個if語句判斷當前異步請求的執(zhí)行隊列長度大于等于請求最大值,如果滿足直接跳出for循環(huán),maxRequests的值為64,第二個if語句判斷當前執(zhí)行的異步請求隊列中相同主機的請求數(shù)是否大于等于maxRequestsPerHost(每個主機最大請求數(shù),默認為5),如果這兩個條件都不滿足的情況下,把從readyAsyncCalls取出來的call請求,存到臨時的
executableCalls 隊列中去。
緊接著去遍歷executableCalls:
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
從executableCalls獲取AsyncCall對象,并且調用它的executeOn方法,executeOn()方法參數(shù)是executorService(),我們看看executorService():
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
可以看出,該方法是一個同步方法,返回的是一個線程池對象,ThreadPoolExecutor()的第二個參數(shù)傳入了Integer的最大值,即線程池所能容納的最大線程數(shù)為Integer.MAX_VALUE,雖然這里設置了很大的值,但是實際情況下并非會達到最大值,因為上面enqueue()方法中有做了判斷。
回到 asyncCall.executeOn(executorService())這里,executorService返回了一個線程池對象,緊接著調用線程池對象的execute方法,execute()方法傳入實現(xiàn)了Runable接口的AsyncCall對象,前面在分析同步請求的時候,說了AsyncCall實現(xiàn)了Runable接口

ok,現(xiàn)在我們要看看線程池做了什么操作,直接去NamedRunnable的run方法看看做了什么操作。
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
execute()是一個抽象方法,所以我們直接去NamedRunnable的實現(xiàn)類AsyncCall的execute()方法看:
@Override protected void execute() {
boolean signalledCallback = false;
timeout.enter();
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
e = timeoutExit(e);
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
這段代碼才是真正執(zhí)行異步請求的邏輯,getResponseWithInterceptorChain()返回Response對象,然后判斷retryAndFollowUpInterceptor是否取消回調CallBack接口的onFailure()或onResponse()方法,最后finally中,和同步請求的處理一樣,調用了Dispatcher對象的finished()方法。
void finished(RealCall call) {
finished(runningSyncCalls, call);
}
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}
看完promoteAndExecute()方法的具體操作,我們發(fā)現(xiàn),調用eneque的時候會把call請求添加到readyAsyncCalls(異步請求準備隊列)中,而readyAsyncCalls隊列中的請求什么時候執(zhí)行呢,看完promoteAndExecute的代碼就恍然大悟了。
總結:異步請求中,我們先去遍歷異步請求的就緒隊列,并判斷異步請求的執(zhí)行隊列的隊列大小是否小于設置的最大數(shù)的時候,如果條件滿足,把該請求添加到異步請求的執(zhí)行隊列中去,同時把該請求添加到臨時的異步請求的執(zhí)行隊列去中。之后,遍歷這個臨時的異步請求的執(zhí)行隊列,去執(zhí)行AsyncCall的execute()方法。
