一、OkHttp3的簡單實用
// 使用OkHttp至少需要4個類
// 1.OkHttpClient
// 2.Request
// 3.Call->一般是用RealCall
// 4.Response
OkHttpClient client = new OkHttpClient();
void get(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
Call call = client.newCall(request);
Response response = call.execute();
// 獲得響應
ResponseBody body = response.body();
System.out.println(body.string());
}
void post(String url) throws IOException {
RequestBody requestBody = new FormBody.Builder().add("city", "北京")
.add("key", "11111")
.build();
Request request = new Request.Builder()
.url(url)
.post(requestBody)
.build();
// 執(zhí)行網(wǎng)絡同步請求
Call call = client.newCall(request);
Response response = call.execute();
// 獲得響應
ResponseBody body = response.body();
System.out.println(body.string());
}
二、OkHttp3中的線程池配置
OkHttp中的線程池是定義在分發(fā)器中的,即定義在Dispatcher
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
public static ThreadFactory threadFactory(String name, boolean daemon) {
return runnable -> {
Thread result = new Thread(runnable, name);
result.setDaemon(daemon);
return result;
};
}
高并發(fā),最大吞吐量。SynchronousQueue隊列是無容量隊列,
在OkHttp中,配置的線程池的核心線程數(shù)為0,最大線程數(shù)為Integer.MAX_VALUE,線程的存活時間為60s,采用的隊列是SynchronousQueue。
一般線程池
一般的線程池,當加入新任務的時候,首先會進入核心線程,判斷當核心是否達到最大值,如果沒有,則由核心線程直接執(zhí)行;如果核心線程數(shù)達到最大值,那么判斷是否有空閑線程,這些空閑線程是由最大線程數(shù)中已經(jīng)創(chuàng)建的,但是沒有任務在執(zhí)行,則交由空閑線程執(zhí)行任務,如果沒有空閑線程,則任務進入阻塞隊列;當阻塞隊列滿的時候,再有新的任務加入,則判斷當前線程數(shù)是否小于最大線程數(shù),如果是小于,則創(chuàng)建新的線程,然后執(zhí)行新加入的任務。
OkHttp線程池
但是OkHttp3中的線程池,核心線程數(shù)為0,而最大線程數(shù)為Integer.MAX_VALUE,而使用的是SynchronousQueue隊列,這個隊列是無容量隊列,那么往這個隊列中去添加任務,那么一定會添加失敗,那么就會判斷當前線程數(shù)是否小于最大線程數(shù)還是大于等于最大線程數(shù),那么這里一定是小于,所以會創(chuàng)建新的線程,然后執(zhí)行任務;但是如果在加入阻塞隊列之前,有空閑線程,則優(yōu)先交由空閑線程執(zhí)行,因為線程在執(zhí)行完成之后,依然會存活60s才會銷毀。
三、OkHttp的調(diào)用流程

OkHttp的所有的邏輯大部分都是集中在攔截器中,但是在進入攔截器之前還需要依靠分發(fā)器來調(diào)配請求任務。
分發(fā)器:內(nèi)部維護隊列與線程池,完成請求調(diào)配;(Dispatcher)
攔截器:五大默認攔截器完成整個請求過程。(Interceptors)
execute是同步請求,enqueue是異步請求,請求之后就會把請求交給Dispatcher,然后Dispatcher分發(fā)請求交給攔截器,經(jīng)過5個攔截器之后,得到Response
四、Dispatcher分發(fā)器(主要依賴于3.14的源碼)
首先,先看一張基于3.10源碼的分發(fā)器異步工作的流程圖:

這是3.10源碼的Dispatcher的異步工作流程,而在3.14中,這部分略有點區(qū)別,在3.14中,是先將請求直接放入ready隊列中,然后在執(zhí)行的時候,將請求從ready隊列中取出,加入到running隊列中。
OkHttp使用的流程,創(chuàng)建Request,然后通過OkHttpClient.newCall方法獲取對應的Call接口實現(xiàn),再調(diào)用Call的同步或者異步請求方法,這里因為是看異步的,所以直接看enqueue方法。
1.OkHttpClient.newCall
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
OkHttpClient創(chuàng)建的Call實現(xiàn),其實就是創(chuàng)建了RealCall實例,通過調(diào)用RealCall.newRealCall方法創(chuàng)建對應的RealCall實例
2.RealCall.newRealCall
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.transmitter = new Transmitter(client, call);
return call;
}
在獲取到Call實現(xiàn)對象之后,然后調(diào)用Call的execute()或者enqueue()進行同步或者異步請求。
3.RealCall.enqueue
執(zhí)行異步請求
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.callStart();
// RealCall.enqueue異步請求,調(diào)用OkHttpClient中的dispatcher分發(fā)器
// 通過分發(fā)器的enqueue方法,分發(fā)一個封裝了的Callback的異步Call
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
調(diào)用分發(fā)器Dispatcher.enqueue方法,將Callback封裝成一個AsyncCall實例對象。
在分發(fā)器中,異步請求有兩個隊列,一個是readyAsyncCalls準備隊列,一個是runningAsyncCalls執(zhí)行隊列。放在ready準備隊列中,只是在ready隊列中保存,并不會做請求;如果是放在runningAsyncCalls執(zhí)行隊列中,則會把AsyncCall提交到線程池當中去,AsyncCall其實是實現(xiàn)了Runnable接口的一個Task,能夠被線程執(zhí)行的Runnable實現(xiàn)任務。線程池中,接收到任務,就會去執(zhí)行請求任務接口,當任務完成之后,又會調(diào)用分發(fā)器中的finished接口,然后Dispatcher又會調(diào)用promoteCalls方法對ready隊列執(zhí)行一個遍歷,如果ready隊列中的任務滿足條件,那么就會移動到running隊列中被執(zhí)行。
ready隊列和running隊列,是兩個雙向隊列
4.Dispatcher.enqueue(AsyncCall call)
在這個方法中,使用3.14和3.10的源碼做一個對比
OkHttp3.14的源碼:
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
// 這一步是從running隊列和ready隊列中找出已經(jīng)存在的AsyncCall
// 如果這里不為null,則修改新的AtomicInteger
// 使其與同一主機共享現(xiàn)有正在運行的調(diào)用的AtomicInteger
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
promoteAndExecute();
}
// 遍歷準備隊列
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
// 對同一個域名的主機請求個數(shù)+1
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
// 執(zhí)行AsyncCall.executeOn方法,其實就是調(diào)用線程池的execute方法
// 通過線程池中的線程執(zhí)行AsyncCall任務,即執(zhí)行AsyncCall中的execute()方法
asyncCall.executeOn(executorService());
}
return isRunning;
}
OkHttp3.10的源碼
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
3.10和3.14源碼中,針對AsyncCall加入到ready和running隊列中的做法的區(qū)別:
對比3.10和3.14的源碼:在3.14源碼中,會先將AsyncCall加入ready準備隊列中,而不會出現(xiàn)直接加入到running執(zhí)行隊列中的情況,并且判斷是否是webSocket,如果不是,則找出雙向隊列中已經(jīng)存在的對應的host相同的call,并且修改已經(jīng)存在的AsyncCall的AtomicInteger,用以記錄當前同一個host有多少個請求存在;接著調(diào)用promoteAndExecute()方法對準備隊列進行遍歷,在遍歷的過程中,會判斷running隊列中的請求數(shù)是否大于等于最大請求數(shù),即maxRequests=64,如果大于等于64,則break,不加入到running隊列中,如果是小于則執(zhí)行下一個判斷;判斷ready隊列中當前遍歷到的AsyncCall對同一域名主機的請求數(shù)是否大于等于5個,如果是大于等于5個,則也不加入到請求隊列中;那么如果running隊列中正在請求數(shù)小于64個,并且對同一域名主機的請求小于5個,則從ready出隊,加入到running隊列中。
所以想要從ready中將請求加入到running隊列中,需要滿足兩個條件:即running隊列中的正在請求數(shù)小于64,并且對同一域名主機的請求數(shù)小于5個,則加入running隊列中。
而3.10的源碼,則是判斷滿足這兩個條件之后,直接加入到running隊列中,而不是優(yōu)先加入到ready隊列中。
3.10和3.14源碼中,對AsyncCall任務執(zhí)行的區(qū)別:
加入到running隊列后,則通過線程池執(zhí)行任務。在3.14的源碼中,執(zhí)行任務是通過遍歷一個List集合executableCalls,因為3.14源碼中,是先將請求隊列加入到ready隊列中,然后再遍歷放到List集合executableCalls和running隊列中,所以在執(zhí)行的時候,是需要對executableCalls這個List集合進行遍歷,調(diào)用AsyncCall.executeOn(executorService())方法,即遍歷List集合拿到AsyncCall對象,然后調(diào)用該對象的executeOn方法。
而3.10的源碼,是將AsyncCall加入到running隊列中以后,直接調(diào)用executorService().execute(call);方法直接執(zhí)行任務。
3.14源碼這樣做,是因為加入新的AsyncCall的時候,ready隊列中依然有AsyncCall存在,那么就有可能一次性從ready隊列中加入多個AsyncCall到running隊列中,所以遍歷集合。而3.10的做法,是在新的AsyncCall進入以后,如果是被允許加入running隊列中,則直接執(zhí)行,但是這個時候并沒有考慮ready中可能存在能被執(zhí)行的AsyncCall。
running隊列需要滿足的條件:maxRequests和maxRequestsPerHost
maxRequests:說明最大請求并發(fā),即正在請求的數(shù)量是有限制的,默認最大是64
maxRequestsPerHost:同一主機同一域名正在請求的個數(shù)也是有限制的,默認最大是5
這默認是5個,如果同一個手機對同一個服務器的請求個數(shù)太多,對服務器的壓力就更大,所以這里設(shè)置默認5個。這是采用一個volatile修飾的AtomicInteger來計數(shù)。
5.AsyncCall詳解
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
private volatile AtomicInteger callsPerHost = new AtomicInteger(0);
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
AtomicInteger callsPerHost() {
return callsPerHost;
}
void reuseCallsPerHostFrom(AsyncCall other) {
this.callsPerHost = other.callsPerHost;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
/**
* Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
* 3.14:異步執(zhí)行是,用于接收線程池,然后執(zhí)行AsyncCall本身。
*/
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
transmitter.noMoreExchanges(ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
// 這個方法其實就是最終在線程池中的線程中執(zhí)行的run()方法中調(diào)用的
// 所以當AsyncCall任務被執(zhí)行的時候,最終就是執(zhí)行execute()方法
@Override protected void execute() {
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
// 通過攔截器,獲取得到最終的請求結(jié)果
Response response = getResponseWithInterceptorChain();
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} catch (Throwable t) {
cancel();
if (!signalledCallback) {
IOException canceledException = new IOException("canceled due to " + t);
canceledException.addSuppressed(t);
responseCallback.onFailure(RealCall.this, canceledException);
}
throw t;
} finally {
client.dispatcher().finished(this);
}
}
}
因為AsyncCall繼承自NamedRunnable,而NamedRunnable其實就是實現(xiàn)了Runnable接口,并且在NamedRunnable的run方法中,其實就是調(diào)用了NamedRunnable的execute()抽象方法,所以AsyncCall需要實現(xiàn)execute()方法,而線程執(zhí)行其實就是執(zhí)行run方法,所以就是執(zhí)行了AsyncCall.execute()方法,當AsyncCall調(diào)用executeOn方法的時候,其內(nèi)部就會調(diào)用executorService.execute(this);執(zhí)行當前的AsyncCall,這樣就會執(zhí)行AsyncCall.execute()方法。
當AsyncCall.execute()執(zhí)行完成之后,就會調(diào)用OkHttpClient.dispatcher().finished(this);方法,其實就是調(diào)用了Dispatcher.finished方法,目的就是當前running隊列中的一個任務已經(jīng)執(zhí)行結(jié)束的時候,就會再次去從ready隊列中尋找可以被執(zhí)行的任務,加入到running隊列中。
這里的Dispatcher.finished方法,不管當前任務執(zhí)行是否是成功還是失敗,都會執(zhí)行。
6.Dispatcher.finished(AsyncCall call)
void finished(AsyncCall call) {
// 調(diào)整同一個域名的主機的請求個數(shù),減少一
// 因為當前的請求已經(jīng)執(zhí)行結(jié)束,不管是成功還是失敗
call.callsPerHost().decrementAndGet();
finished(runningAsyncCalls, call);
}
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
// 從running中移除AsyncCall任務
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
// 移動隊列,其實就是之前第一次過程中
// 調(diào)用Dispatcher.enqueue方法的過程中,加入到ready隊列以后執(zhí)行的方法
// 即判斷AsyncCall任務是否可以從ready隊列中移動到running隊列中
// 又是判斷maxRequests和maxRequestsPerHost這兩個條件
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}
在Dispatcher.finished方法中,又會再一次執(zhí)行promoteAndExecute()方法,該方法其實就是遍歷ready隊列,查詢可以執(zhí)行的AsyncCall加入到running隊列中,然后調(diào)用這些AsyncCall的executeOn方法執(zhí)行任務。
總結(jié):
3.14的源碼中,分發(fā)器的異步執(zhí)行過程,其實就是先將AsyncCall這個Runnable任務添加到ready準備隊列中,然后遍歷準備隊列,接著判斷running執(zhí)行隊列正在請求數(shù)是否小于64,以及對同一個域名主機的請求數(shù)是否小于5,如果滿足,則將AsyncCall任務添加到running執(zhí)行隊列中,接著遍歷這個時候加入到running執(zhí)行隊列的AsyncCall,對這些AsyncCall任務,分別調(diào)用線程池執(zhí)行AsyncCall任務,而執(zhí)行任務,其實就是執(zhí)行AsyncCall的run方法,而AsyncCall的run方法其實就是執(zhí)行了AsyncCall的execute方法,當一個任務執(zhí)行完成的時候,即execute中最后會調(diào)用Dispatcher.finished()方法,在這里,就會繼續(xù)遍歷準備隊列,將準備隊列中的AsyncCall任務做再一次的條件判斷,判斷是否允許加入到running執(zhí)行隊列中,然后再一次執(zhí)行這些新加入到running執(zhí)行隊列的任務。
在這個過程中,入隊出隊的過程,基本都是采用synchronized同步代碼塊的方式在分發(fā)器中進行。
而最終的請求,其實就是在線程中執(zhí)行AsyncCall任務的時候,調(diào)用的execute方法中調(diào)用RealCall.getResponseWithInterceptorChain()方法,通過n個攔截器,得到最終的Response。
五、Dispatcher的同步執(zhí)行流程
上面的流程,其實都是異步請求的流程,而同步請求,其實分發(fā)器就做一件事情,就是將RealCall添加到running執(zhí)行隊列中。
RealCall.execute()
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.timeoutEnter();
transmitter.callStart();
try {
client.dispatcher().executed(this);
return getResponseWithInterceptorChain();
} finally {
client.dispatcher().finished(this);
}
}
Dispatcher.executed()
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
添加到running執(zhí)行隊列之后,就直接調(diào)用RealCall.getResponseWithInterceptorChain()執(zhí)行請求。
參考:
https://blog.csdn.net/json_it/article/details/78404010
https://blog.csdn.net/hello2mao/article/details/53159151