引言
? ?select/poll、epoll這些詞匯相信諸位都不陌生,因?yàn)樵?code>Redis/Nginx/Netty等一些高性能技術(shù)棧的底層原理中,大家應(yīng)該都見過它們的身影,接下來重點(diǎn)講解這塊內(nèi)容,不過在此之前,先上一張圖概述Java-NIO的整體結(jié)構(gòu):

觀察上述結(jié)構(gòu),其實(shí)
Buffer、Channel的定義并不算復(fù)雜,僅是單純的三層結(jié)構(gòu),因此對(duì)于源碼這塊不再去剖析,有興趣的根據(jù)給出的目錄結(jié)構(gòu)去調(diào)試源碼,自然也能摸透其原理實(shí)現(xiàn)。
而最關(guān)鍵的是Selector選擇器,它是整個(gè)NIO體系中較為復(fù)雜的一塊內(nèi)容,同時(shí)它也作為Java-NIO與內(nèi)核多路復(fù)用模型的“中間者”,但在上述體系中,卻出現(xiàn)了之前未曾提及過的SelectorProvider系定義,那么它的作用是干嘛的呢?主要目的是用于創(chuàng)建選擇器,在Java中創(chuàng)建一般是通過如下方式:
// 創(chuàng)建Selector選擇器
Selector selector = Selector.open();
// Selector類 → open()方法
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
從源碼中可明顯得知,選擇器最終是由SelectorProvider去進(jìn)行實(shí)例化,不過值得一提的是:Selector的實(shí)現(xiàn)是基于工廠模式與SPI機(jī)制構(gòu)建的。對(duì)于不同OS而言,其對(duì)應(yīng)的具體實(shí)現(xiàn)并不相同,因此在Windows系統(tǒng)下,我們只能觀測(cè)到WindowsSelectorXXX這一系列的實(shí)現(xiàn),而在Linux系統(tǒng)時(shí),對(duì)于的則是EPollSelectorXXX這一系列的實(shí)現(xiàn),所以要牢記的是,Java-NIO在不同操作系統(tǒng)的環(huán)境中,提供了不同的實(shí)現(xiàn),如下:
-
Windows:select -
Unix:poll -
Mac:kqueue -
Linux:epoll
當(dāng)然,本次則重點(diǎn)剖析Linux系統(tǒng)下的select、poll、epoll的具體實(shí)現(xiàn),對(duì)于其他系統(tǒng)而言,原理大致相同。
一、JDK層面的源碼入口
? ?簡(jiǎn)單的對(duì)于Java-NIO體系有了全面認(rèn)知后,接下來以JDK源碼作為入口進(jìn)行剖析。在Java中,會(huì)通過Selector.select()方法去監(jiān)聽事件是否被觸發(fā),如下:
// 輪詢監(jiān)聽選擇器上注冊(cè)的通道是否有事件被觸發(fā)
while (selector.select() > 0){}
// Selector抽象類 → select()抽象方法
public abstract int select() throws IOException;
// SelectorImpl類 → select()方法
public int select() throws IOException {
return this.select(0L);
}
// SelectorImpl類 → select()完整方法
public int select(long var1) throws IOException {
if (var1 < 0L) {
throw new IllegalArgumentException("Negative timeout");
} else {
return this.lockAndDoSelect(var1 == 0L ? -1L : var1);
}
}
當(dāng)調(diào)用Selector.select()方法后,最終會(huì)調(diào)用到SelectorImpl類的select(long var1)方法,而在該方法中,又會(huì)調(diào)用lockAndDoSelect()方法,如下:
// SelectorImpl類 → lockAndDoSelect()方法
private int lockAndDoSelect(long var1) throws IOException {
// 先獲取鎖確保線程安全
synchronized(this) {
// 在判斷當(dāng)前選擇是否處于開啟狀態(tài)
if (!this.isOpen()) {
// 如果已關(guān)閉則拋出異常
throw new ClosedSelectorException();
} else { // 如若處于開啟狀態(tài)
// 獲取所有注冊(cè)在當(dāng)前選擇器上的事件
Set var4 = this.publicKeys;
int var10000;
// 再次加鎖
synchronized(this.publicKeys) {
// 獲取所有已就緒的事件
Set var5 = this.publicSelectedKeys;
// 再次加鎖
synchronized(this.publicSelectedKeys) {
// 真正的調(diào)用select邏輯,獲取已就緒的事件
var10000 = this.doSelect(var1);
}
}
// 返回就緒事件的數(shù)量
return var10000;
}
}
}
在該方法中,對(duì)于其他邏輯不必太過在意,重點(diǎn)可注意:最終會(huì)調(diào)用doSelect()觸發(fā)真正的邏輯操作,接下來再看看這個(gè)方法:
// SelectorImpl類 → doSelect()方法
protected abstract int doSelect(long var1) throws IOException;
// WindowsSelectorImpl類 → doSelect()方法
protected int doSelect(long var1) throws IOException {
// 先判斷一下選擇器上是否還有注冊(cè)的通道
if (this.channelArray == null) {
throw new ClosedSelectorException();
} else { // 如果有的話
// 先獲取一下阻塞等待的超時(shí)時(shí)長(zhǎng)
this.timeout = var1;
// 然后將一些取消的事件從選擇器上移除
this.processDeregisterQueue();
// 再判斷一下是否存在線程中斷喚醒
// 這里主要是結(jié)合之前的wakeup()方法喚醒阻塞線程的
if (this.interruptTriggered) {
this.resetWakeupSocket();
return 0;
} else { // 如果沒有喚醒阻塞線程的需求出現(xiàn)
// 先判斷一下輔助線程的數(shù)量(守護(hù)線程),多則減,少則增
this.adjustThreadsCount();
// 更新一下finishLock.threadsToFinish為輔助線程數(shù)
this.finishLock.reset();
// 喚醒所有的輔助線程
this.startLock.startThreads();
try {
// 設(shè)置主線程中斷的回調(diào)函數(shù)
this.begin();
try {
// 最終執(zhí)行真正的poll邏輯,開始拉取事件
this.subSelector.poll();
} catch (IOException var7) {
this.finishLock.setException(var7);
}
// 喚醒并等待所有未執(zhí)行完的輔助線程完成
if (this.threads.size() > 0) {
this.finishLock.waitForHelperThreads();
}
} finally {
this.end();
}
// 檢測(cè)狀態(tài)
this.finishLock.checkForException();
this.processDeregisterQueue();
// 獲取當(dāng)前選擇器監(jiān)聽的事件的觸發(fā)數(shù)量
int var3 = this.updateSelectedKeys();
// 本輪poll結(jié)束,重置WakeupSocket,為下次執(zhí)行做準(zhǔn)備
this.resetWakeupSocket();
// 最終返回獲取到的事件數(shù)
return var3;
}
}
}
整個(gè)過程下來其實(shí)也并不短暫,但大體就分為三步:
- ①前置動(dòng)作:判斷通道數(shù)、獲取阻塞時(shí)長(zhǎng)、移除取消的事件以及判斷是否需要被喚醒。
- ②核心動(dòng)作:更新并喚醒所有輔助線程、設(shè)置主線程中斷的回調(diào)、執(zhí)行
poll拉取事件。 - ③后置動(dòng)作:?jiǎn)拘演o助線程完成工作、檢測(cè)狀態(tài)、重置條件、獲取事件數(shù)并返回。
在這里面,有一個(gè)輔助線程的概念,這跟最大文件描述符有關(guān),每當(dāng)選擇器上注冊(cè)的通道數(shù)超過
1023時(shí),新增一條線程來管理這些新增的通道。其實(shí)是1024,但其中有一個(gè)要用于喚醒,所以是1023(這里看可能有些懵,但待會(huì)分析過后就理解了)。
在這個(gè)過程中,最最最關(guān)鍵點(diǎn)在于其中的一行代碼:
this.subSelector.poll();
在這里調(diào)用了poll方法,執(zhí)行具體的事件拉取邏輯,進(jìn)一步往下走:
// WindowsSelectorImpl類 → poll()方法
private int poll() throws IOException {
return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress,
Math.min(WindowsSelectorImpl.this.totalChannels, 1024),
this.readFds, this.writeFds, this.exceptFds,
WindowsSelectorImpl.this.timeout);
}
// WindowsSelectorImpl類 → poll0()方法
private native int poll0(long var1, int var3, int[] var4,
int[] var5, int[] var6, long var7);
最后會(huì)調(diào)用WindowsSelectorImpl.poll()方法,而該方法最終會(huì)調(diào)用本地的native方法:poll0()方法,而在JVM的源碼實(shí)現(xiàn)中,該方法最終會(huì)調(diào)用內(nèi)核所提供的函數(shù)。
OK~,由于
Windows有IDEA工具輔助,所以方便調(diào)試源碼,因此這里以WindowsSelectorXXX系的舉例說明,但由于整個(gè)Java-NIO的核心組件,都是基于工廠模式編寫的源碼,所以其他操作系統(tǒng)下的源碼位置也相同,僅最終調(diào)用的內(nèi)核函數(shù)不同?。。?/p>
最終稍做總結(jié),JDK層面的源碼入口,核心流程如下:
- ①
Selector抽象類 →select()抽象方法 - ②
SelectorImpl類 →select()方法 - ③
SelectorImpl類 →lockAndDoSelect()方法 - ④
SelectorImpl類 →doSelect()方法 - ⑤
XxxSelectorImpl類 →doSelect()方法 - ⑥
XxxSelectorImpl類 →poll()方法 - ⑦
XxxSelectorImpl類 →JNI本地的poll0()方法
如若在Windows系統(tǒng)下,上述的XxxSelectorImpl類則為WindowsSelectorImpl,同理,如若在Linux系統(tǒng)下,XxxSelectorImpl類則為EpollSelectorImpl。
最后,如果大家對(duì)于JDK層面的
EPoll感興趣,可自行反編譯Linux版的JDK源碼,EpollSelectorXXX的相關(guān)定義位于:jdk\src\solaris\classes\sun\nio\ch\目錄下。
二、JDK源碼級(jí)別的入口
? ?經(jīng)過第一階段的分析后,會(huì)發(fā)現(xiàn)最終其實(shí)調(diào)用了native本地方法poll0(),在之前的《JVM運(yùn)行時(shí)數(shù)據(jù)區(qū)-本地方法?!?/a>的文章提到過,當(dāng)程序執(zhí)行時(shí)碰到native關(guān)鍵字修飾的方法時(shí),會(huì)調(diào)用C/C++所編寫的本地方法庫(kù)中的實(shí)現(xiàn),那么又該如何查找native方法對(duì)應(yīng)的源碼呢?接著一起來聊一下。
①由于Oracle-jdk是收費(fèi)的,所以咱們首先下載open-jdk1.8的源碼,可以自行在Open-JDK官網(wǎng)下載,但官網(wǎng)下載時(shí),常常會(huì)由于網(wǎng)絡(luò)不穩(wěn)定而中斷,下載起來相當(dāng)費(fèi)勁,因此也為大家提供一下《open-jdk1.8》的源碼鏈接。
②下載之后解壓源碼包,然后進(jìn)入jdk8-master\jdk\src\目錄,在其中你會(huì)看到不同操作系統(tǒng)下的Java實(shí)現(xiàn),JDK源碼會(huì)以操作系統(tǒng)的類型分包,不同系統(tǒng)的對(duì)應(yīng)不同的實(shí)現(xiàn),如下:

但關(guān)于
Linux系統(tǒng)下的Java-NIO實(shí)現(xiàn),實(shí)際上并不在linux目錄中,而是在solaris目錄,進(jìn)入solaris目錄如下:
solaris目錄中還包含了LinuxOS、SunOS(SolarisOS/UnixOS)以及MacOS等操作系統(tǒng)下的Java-NIO實(shí)現(xiàn),但關(guān)于MacOS下的Java-NIO完整實(shí)現(xiàn),則位于前面的macosx目錄中,這里僅包含一部分,結(jié)構(gòu)如下:

觀察上圖會(huì)發(fā)現(xiàn),solaris目錄中包含了KQueue、EPoll、Poll、DevPoll等IO多路復(fù)用模型的Java實(shí)現(xiàn),但關(guān)于Mac-KQueue的完整實(shí)現(xiàn)則在macosx目錄。
OK~,到目前為止大家對(duì)于
JDK源碼的目錄結(jié)構(gòu)應(yīng)該有了基本認(rèn)知。
稍微總結(jié)一下,重點(diǎn)就是搞清楚兩個(gè)位置:
- ?
jdk8-master\jdk\src\xxxOS\classes\sun\nio\ch:對(duì)應(yīng)nio包下的Java代碼。 - ?
jdk8-master\jdk\src\xxxOS\native\sun\nio\ch:對(duì)應(yīng)nio包中native方法的JNI代碼。
③搞清楚JDK源碼目錄的結(jié)構(gòu)后,那以之前分析的Windows-NIO為例:
private native int poll0(long var1, int var3, int[] var4,
int[] var5, int[] var6, long var7);
對(duì)于poll0()這個(gè)本地方法,又該如何查找對(duì)應(yīng)的源碼呢?根據(jù)上述的源碼結(jié)構(gòu),先去到\windows\native\sun\nio\ch目錄中,然后找到與之對(duì)應(yīng)的WindowsSelectorImpl.c文件,最終就能在該文件中定位到對(duì)應(yīng)的JNI方法:Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(名字略微有些長(zhǎng))。
④找到對(duì)應(yīng)的JNI方法源碼后,其中存在這么一行:

觀察之后不難發(fā)現(xiàn),其實(shí)最終還會(huì)調(diào)用到OS內(nèi)核的提供的select()函數(shù),所以poll0()實(shí)際上會(huì)依賴OS提供的多路復(fù)用函數(shù)實(shí)現(xiàn)相應(yīng)的功能,對(duì)于其他操作系統(tǒng)而言,也是同理。
但是接下來只會(huì)重點(diǎn)敘述
Linux下的三大IO多路復(fù)用函數(shù):select、poll、epoll,而對(duì)于Windows-select、Mac-kqueue不會(huì)進(jìn)行深入講解(不是不想分析,而是由于Windows、Mac系統(tǒng)都屬于閉源的,想分析也無法獲取其具體的源碼實(shí)現(xiàn)過程)。
三、文件描述符與自實(shí)現(xiàn)網(wǎng)絡(luò)服務(wù)器
? ?到目前可得知:Java中的NIO最終會(huì)依賴于操作系統(tǒng)所提供的多路復(fù)用函數(shù)去實(shí)現(xiàn),而Linux系統(tǒng)下對(duì)應(yīng)的則是epoll模型,但epoll的前身則是select、poll,因此我們先分析select、poll多路復(fù)用函數(shù),再分析其缺點(diǎn),逐步引出epoll的由來,最終進(jìn)一步對(duì)其進(jìn)行全面剖析。
? ?相信大家在學(xué)習(xí)Linux時(shí),都聽說過“Linux本質(zhì)上就是一個(gè)文件系統(tǒng)”這句話,在Linux-OS中,萬事萬物皆為文件,連網(wǎng)絡(luò)連接也不例外,因此在分析多路復(fù)用模型之前,咱們首先對(duì)這些基礎(chǔ)概念做一定了解。
3.1、文件描述符(FD)
? ?在上述中提到過:Linux的理念就是“一切皆文件”,在Linux中幾乎所有資源都是以文件的形式呈現(xiàn)的。如磁盤的數(shù)據(jù)是文件,網(wǎng)絡(luò)套接字是文件,系統(tǒng)配置項(xiàng)也是文件等等,所有的數(shù)據(jù)內(nèi)容在Linux都是通過文件系統(tǒng)來管理的。
? ?既然所有的內(nèi)容都是文件,那當(dāng)我們要操作這些內(nèi)容時(shí),又該如何處理呢?為了方便系統(tǒng)執(zhí)行,Linux都是通過文件描述符File Descriptor對(duì)文件進(jìn)行操作,對(duì)于文件描述符這個(gè)概念可以通過一個(gè)例子來理解:
Object obj = new Object();
上述是Java創(chuàng)建對(duì)象的一行代碼,類比Linux的文件系統(tǒng),后面new Object()實(shí)例化出來的對(duì)象可以當(dāng)成是具體的文件內(nèi)容,而前面的引用obj則可理解為是文件描述符。Linux通過FD操作文件,其實(shí)本質(zhì)上與Java中通過reference引用操作對(duì)象的過程無異。
而當(dāng)出現(xiàn)網(wǎng)絡(luò)套接字連接時(shí),所有的網(wǎng)絡(luò)連接都會(huì)以文件描述符的形式在內(nèi)核中存在,也包括后面會(huì)提及的多路復(fù)用函數(shù)
select、poll、epoll都會(huì)基于FD對(duì)網(wǎng)絡(luò)連接進(jìn)行操作,因此先闡明這點(diǎn),作為后續(xù)分析的基礎(chǔ)。
3.2、自己設(shè)計(jì)網(wǎng)絡(luò)連接服務(wù)器
? ?在分析之前,我們先自己設(shè)想一下,如果有個(gè)需求:請(qǐng)自己設(shè)計(jì)一套網(wǎng)絡(luò)連接系統(tǒng),那么此時(shí)你會(huì)怎么做呢?此刻例如來了5個(gè)網(wǎng)絡(luò)連接,如下:

那么又該如何處理這些請(qǐng)求呢?最簡(jiǎn)單的方式:

對(duì)于每個(gè)到來的網(wǎng)絡(luò)連接都為其創(chuàng)建一條線程,每個(gè)連接由單獨(dú)的線程負(fù)責(zé)處理,所以最初的
BIO也是這樣來的,由于設(shè)計(jì)起來非常簡(jiǎn)單,所以它成為了最初的網(wǎng)絡(luò)IO模型,但這種方式的缺陷非常明顯,在之前的BIO章節(jié)也曾分析過,無法支撐高并發(fā)的流量訪問,因此這種多線程的方式去實(shí)現(xiàn)自然行不通了,兜兜轉(zhuǎn)轉(zhuǎn)又得回到單線程的角度去思考,單線程如何處理多個(gè)網(wǎng)絡(luò)請(qǐng)求呢?最簡(jiǎn)單的方式,偽代碼如下:
// 不斷輪詢監(jiān)聽所有的網(wǎng)絡(luò)連接
while(true){
// 遍歷所有的網(wǎng)絡(luò)套接字連接
for(SocketFD xFD : FDS){
// 判斷網(wǎng)絡(luò)連接中是否有數(shù)據(jù)
if (xFD.data != null){
// 從套接字中讀取網(wǎng)絡(luò)數(shù)據(jù)
readData();
// 將網(wǎng)絡(luò)數(shù)據(jù)交給應(yīng)用程序處理(寫入對(duì)應(yīng)的程序緩沖區(qū))
processingData();
// ......
}
}
}
如上代碼,當(dāng)有網(wǎng)絡(luò)連接到來時(shí),將其加入FDS數(shù)組中,然后由單條線程不斷的輪詢監(jiān)聽所有網(wǎng)絡(luò)套接字,如果套接字中有數(shù)據(jù),則從中將網(wǎng)絡(luò)數(shù)據(jù)讀取出來,然后將讀取到的網(wǎng)絡(luò)數(shù)據(jù)交給應(yīng)用程序處理。
這似乎是不是就通過單線程的方式解決了多個(gè)網(wǎng)絡(luò)連接的問題?答案是
Yes,但相較而言,性能自然不堪入目,如果內(nèi)核是這樣去處理網(wǎng)絡(luò)連接,對(duì)于并發(fā)支持自然也上不去,那Linux內(nèi)核具體是如何處理的呢?一起來看看。
四、多路復(fù)用函數(shù) - select()
? ?在JDK1.8的源碼中,剛剛似乎并未發(fā)現(xiàn)Selectxxx這系列的定義,這是由于Linux內(nèi)核2.6之后的版本中,已經(jīng)使用epoll代替了select,所以對(duì)應(yīng)的JDK1.5之后版本,也將Linux-select的實(shí)現(xiàn)給移除了,所以如若想觀測(cè)到Linux-select相關(guān)的實(shí)現(xiàn),那還需先安裝一個(gè)kernel-2.6以下的Linux系統(tǒng),以及還需要下載JDK1.5的源碼,這樣才能分析完整的select實(shí)現(xiàn)。
我大致過了一下內(nèi)核中的源碼,對(duì)于
select函數(shù)的實(shí)現(xiàn)大致在2000行左右,大致看下來后,由于對(duì)C語言沒有那么熟悉,并且源碼實(shí)現(xiàn)較長(zhǎng),因此后續(xù)不再以全源碼鏈路的方式剖析,而是適當(dāng)結(jié)合部分核心源碼進(jìn)行闡述。當(dāng)然,如若你的C語言功底還算扎實(shí),那可以下載《Linux2.6.28.6版本內(nèi)核源碼》解壓調(diào)試。
先講清楚接下來的分析思路,在后續(xù)分析IO多路復(fù)用函數(shù)時(shí),大體會(huì)以調(diào)用入口 → 函數(shù)定義 → 核心結(jié)構(gòu)體 → 核心源碼 → 函數(shù)缺陷這個(gè)思路進(jìn)行展開。
4.1、Java-select函數(shù)的JNI入口
? ?對(duì)于Open-JDK1.4、1.5的源碼,由于年代較久遠(yuǎn)了,實(shí)在沒有找到對(duì)應(yīng)的JDK源碼,所以在這里分析Linux-select函數(shù)時(shí),就以前面分析的Windows-select思路舉例說明,如下:
- ①
Java中通過調(diào)用選擇器的select()方法監(jiān)聽客戶端連接。 - ②線程執(zhí)行時(shí),會(huì)執(zhí)行到當(dāng)前平臺(tái)對(duì)應(yīng)的選擇器實(shí)現(xiàn)類的
doSelect()方法。 - ③接著會(huì)調(diào)用實(shí)現(xiàn)類對(duì)應(yīng)的
poll()輪詢方法,最終在該方法中會(huì)調(diào)用其native方法。 - ④當(dāng)線程需要執(zhí)行本地方法時(shí),觸發(fā)
JNI調(diào)用,會(huì)在本地方法庫(kù)中查找對(duì)應(yīng)的C實(shí)現(xiàn)。 - ⑤定位到
native本地方法對(duì)應(yīng)的C語言函數(shù),然后執(zhí)行對(duì)應(yīng)的C代碼。 - ⑥在
C代碼的函數(shù)中,最終會(huì)發(fā)起系統(tǒng)調(diào)用,那假設(shè)此時(shí)系統(tǒng)調(diào)用的函數(shù)為select()。
此時(shí),對(duì)于Java是如何調(diào)用底層操作系統(tǒng)內(nèi)核函數(shù)的過程就分析出來了,但是由于這里沒有下載到對(duì)應(yīng)版本的源碼,因此無法通過源碼進(jìn)行演示,但就算沒有對(duì)應(yīng)的源碼作為依據(jù)也無大礙,因?yàn)闊o論是什么類型的操作系統(tǒng),也無論調(diào)用的是哪個(gè)多路復(fù)用函數(shù),本質(zhì)上入口都是相同的,只是JNI調(diào)用時(shí)會(huì)存在些許差異。
4.2、內(nèi)核select函數(shù)的定義
? ?OK~,得知了Java-NIO執(zhí)行的前因后果后,現(xiàn)在來聊一聊最初NIO會(huì)調(diào)用的系統(tǒng)函數(shù):select,在Linux中的定義如下:
// 定義位于/sys/select.h文件中
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
select函數(shù)定義中,存在五個(gè)參數(shù),如下:
-
nfds:表示FDS中有效的FD數(shù)量,全部文件描述符的最大值+1。 -
readfds:表示需要監(jiān)控讀事件發(fā)生的文件描述符集合。 -
writefds:表示需要監(jiān)控寫事件發(fā)生的文件描述符集合。 -
exceptfds:表示需要監(jiān)控異常/錯(cuò)誤發(fā)生的文件描述符集合。 -
timeout:表示select在沒有事件觸發(fā)的情況下,會(huì)阻塞的時(shí)間。
4.3、select結(jié)構(gòu)體 - fd_set、timeval
? ?在上述中簡(jiǎn)單了解select的定義與參數(shù)后,大家可能會(huì)有些暈乎乎的,這是由于這五個(gè)參數(shù)中涉及到兩組類型的定義,分別為fd_set、timeval,先來看看它們是如何定義的:
// 相關(guān)定義位于linux/types.h、linux/posix_types.h文件中
// -------linux/types.h----------
// 這里定義了一個(gè)__kerenl_fd_set的類型,別名為fd_set。
typedef __kerenl_fd_set fd_set;
省略其他.....
// -------linux/posix_types.h----------
/*
unsigned long表示無符號(hào)長(zhǎng)整型,占4bytes/32bits
sizeof()函數(shù)是求字節(jié)的長(zhǎng)度,sizeof(unsigned long)=4
因此最終這里的__NFDBITS=(8 * 4)=32
*/
#undef __NFDBITS
#define __NFDBITS (8 * sizeof(unsigned long))
// 這里限制了最大長(zhǎng)度為1024(可修改,不推薦)
#undef __FD_SETSIZE
#define __FD_SETSIZE 1024
// 根據(jù)前面的__NFDBITS求出long數(shù)組的最大容量為:1024/32=32個(gè)
#undef __FD_SET_LONGS
#define __FD_SET_LONGS (__FD_SETSIZE/__NFDBITS)
// 這兩組定義則是用于置位、復(fù)位(清除置位)的
#undef __FDELT
#define __FDELT(d) ((d) / __NFDBITS)
#undef __FDMASK
#define __FDMASK(d) (1UL << (d) % __NFDBITS)
// 這里定義了__kerenl_fd_set類型,本質(zhì)上是一個(gè)long數(shù)組
typedef struct {
unsigned long fds_bits [__FDSET_LONGS];
} __kerenl_fd_set;
觀察上述源碼,其實(shí)你會(huì)發(fā)現(xiàn)fd_set的定義是__kerenl_fd_set類型的,而__kerenl_fd_set的定義本質(zhì)上就是一個(gè)long數(shù)組,同時(shí)在__kerenl_fd_set的定義中,也聲明了最大長(zhǎng)度為1024,相信了解過多路復(fù)用函數(shù)的小伙伴都知道select模型的最大缺陷之一就在于:最多只能監(jiān)聽1024個(gè)文件描述符,而對(duì)于具體是為什么,相信看到這個(gè)源碼大家就徹底清楚了。
PS:首先基于上述的知識(shí),已經(jīng)得知最大長(zhǎng)度為
1024,但這1024并非代表著:數(shù)組可以擁有1024個(gè)long元素,而是限制了這個(gè)long數(shù)組最多只能有1024個(gè)比特位的長(zhǎng)度,也就是數(shù)組中最多能擁有1024/32=32個(gè)元素。對(duì)于這點(diǎn),在源碼中也有定義,大家可參考源碼中的注釋。
OK~,那這個(gè)long類型的數(shù)組究竟有什么作用呢?簡(jiǎn)單來說明一下,在這個(gè)fd_set的數(shù)組中,其實(shí)每個(gè)位對(duì)應(yīng)著一個(gè)FD文件描述符的狀態(tài),0代表沒有事件發(fā)生,1則代表有事件觸發(fā),如下圖:

在這個(gè)數(shù)組中,所有的long元素,在計(jì)算機(jī)底層本質(zhì)上都會(huì)被轉(zhuǎn)換成bit存儲(chǔ),而每一個(gè)bit位都對(duì)應(yīng)著一個(gè)FD,所以這個(gè)數(shù)組本質(zhì)上就組成了一個(gè)位圖結(jié)構(gòu),同時(shí)為了方便操作這個(gè)位圖,在之前的sys/select.h文件中還提供了一組宏函數(shù),如下:
// 位于/sys/select.h文件中
// 將一個(gè)fd_set數(shù)組所有位都置零
int FD_ZERO(int fd, fd_set *fdset);
// 將指定的某個(gè)位復(fù)位(賦零)
int FD_CLR(int fd, fd_set *fdset);
// 將指定的某個(gè)位置位(賦一)
int FD_SET(int fd, fd_set *fd_set);
// 檢測(cè)指定的某個(gè)位是否被置位
int FD_ISSET(int fd, fd_set *fdset);
// 這里則是上述宏函數(shù)的實(shí)現(xiàn)(位操作過程)
# define __FD_ZERO(set) \
do { \
unsigned int __i; \
fd_set *__arr = (set); \
for (__i = 0; __i < sizeof (fd_set) / sizeof (__fd_mask); ++__i) \
__FDS_BITS (__arr)[__i] = 0; \
} while (0)
#define __FD_SET(d, set) \
((void) (__FDS_BITS (set)[__FD_ELT (d)] |= __FD_MASK (d)))
#define __FD_CLR(d, set) \
((void) (__FDS_BITS (set)[__FD_ELT (d)] &= ~__FD_MASK (d)))
#define __FD_ISSET(d, set) \
((__FDS_BITS (set)[__FD_ELT (d)] & __FD_MASK (d)) != 0)
對(duì)于定義的幾組宏函數(shù),可以參考上述注釋中的解釋,而對(duì)于這些函數(shù)是如何實(shí)現(xiàn)的,大家可以自行閱讀貼出的源碼。接下來再看看timeval結(jié)構(gòu)體是如何定義的:
struct timeval {
long tv_sec; /* 秒 */
long tv_usec; /* 毫秒 */
};
其實(shí)這個(gè)結(jié)構(gòu)體就是一個(gè)阻塞的時(shí)間,好比select傳入的timeout參數(shù)為3,則timeval.tv_sec=3、timeval.tv_usec=3000,代表調(diào)用select()沒有獲取到有效事件的情況下,在3s內(nèi)會(huì)不斷循環(huán)檢測(cè)。當(dāng)然,這個(gè)timeout的值會(huì)分為三種情況:
-
0:表示調(diào)用select()函數(shù)后不等待,沒有就緒事件時(shí)直接返回。 -
NULL:表示調(diào)用select()函數(shù)后無限等待,阻塞至出現(xiàn)中斷信號(hào)或觸發(fā)事件后返回。 - 正數(shù):表示調(diào)用
select()函數(shù)后,在指定的時(shí)間內(nèi)等待事件觸發(fā),超時(shí)則返回。
至此,對(duì)于
select()函數(shù)所需參數(shù)中,涉及到的兩個(gè)結(jié)構(gòu)體已經(jīng)弄明白了,那么再回來看看select()的五個(gè)參數(shù)。
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
調(diào)用select()時(shí),中間的三個(gè)參數(shù)要求傳入fd_set類型,它們分別對(duì)應(yīng)著:那些文件描述符需要監(jiān)聽讀事件發(fā)生、那些文件描述符需要監(jiān)聽寫事件發(fā)生、那些文件描述符需要監(jiān)聽異常錯(cuò)誤發(fā)生。當(dāng)調(diào)用select()函數(shù)后會(huì)陷入阻塞,直到有描述符的事件就緒(有數(shù)據(jù)可讀、可寫或出現(xiàn)異常錯(cuò)誤)或超時(shí)后才會(huì)返回。而select()函數(shù)返回也會(huì)存在三種狀態(tài):
-
0:當(dāng)描述符集合中沒有事件觸發(fā),并且超出設(shè)置的時(shí)間后,會(huì)返回0。 -
-1:當(dāng)select執(zhí)行過程中,出現(xiàn)異常/錯(cuò)誤時(shí)則會(huì)返回-1。 - 正數(shù):如果監(jiān)視的文件描述符集合中有事件發(fā)生(有數(shù)據(jù)),則會(huì)對(duì)應(yīng)的事件數(shù)量。
4.4、select()函數(shù)的使用案例
? ?在上述中已經(jīng)對(duì)于select()函數(shù)的一些基礎(chǔ)知識(shí)建立了認(rèn)知,接下來上個(gè)偽代碼感受一下select()函數(shù)的使用過程:
/* ----------①---------- */
// 創(chuàng)建服務(wù)端socket套接字,并監(jiān)聽客戶端連接
serverSockfd = socket(AF_INET,SOCK_STREAM,0);
// 省略.....
bind(serverSockfd,IP,Port);
listen(serverSockfd,numfds);
// 這里是已經(jīng)接收的客戶端連接集合
fds[numfds] = accept(serverSockfd,.....);
/* ----------②---------- */
// 將所有的客戶端連接,分別加入對(duì)應(yīng)的位圖中
FD_SET readfds, writefds, exceptfds;
int read_count = 0, write_count = 0, except_count = 0;
for (i = 0; i < numfds; i++) {
if (fds[i].events == 讀取事件){
// 加入readfds
}
if (fds[i].events == 寫入事件){
// 加入writefds
}
// 省略.....
}
/* ----------③---------- */
// 求出最大的fds值
maxfds = ....;
struct timeval timevalue, *tv;
// 省略.....
/* ----------④---------- */
while(1){
// 初始化位圖
FD_ZERO(readfds);
FD_ZERO(writefds);
FD_ZERO(exceptfds);
// 分別對(duì)每個(gè)位圖中需要監(jiān)聽的FD進(jìn)行置位
for (i = 0; i < numfds; i++) {
if (fds[i].events == 讀取事件){
FD_SET(fds[i],&readfds);
}
// 省略其他置位處理.....
}
// 調(diào)用select函數(shù)
int result = select(maxfds+1, &readfds, &writefds, &exceptfds, tv);
/* ----------⑤---------- */
if (result == 0){
// 處理超時(shí)并返回....
}
if (result < 0){
// 處理異常并返回....
}
/* ----------⑥、⑦---------- */
// 能執(zhí)行到這里,代表select()返回大于0
for (i = 0; i < numfds; i++) {
if(FD_ISSET(fds[i],&readfds)){
// 讀取被置位的socket.....
read(fds[i], buffer,0,MAXBUF);
}
// 省略其他......
}
}
上述的偽代碼雖然看著較多,但本質(zhì)上并不難,大體分為如下幾步:
- ①創(chuàng)建服務(wù)端的
Socket套接字并綁定相關(guān)的地址,建立監(jiān)聽,等待客戶端連接。 - ②將所有的客戶端連接,根據(jù)注冊(cè)的事件,分別將其加入到對(duì)應(yīng)的位圖中。
- ③求出文件描述符的最大值,并對(duì)于超時(shí)時(shí)間這個(gè)參數(shù)進(jìn)行初始化構(gòu)建。
- ④對(duì)位圖做置位,調(diào)用
select()函數(shù)并傳入的相關(guān)參數(shù),等待內(nèi)核處理完成。 - ⑤根據(jù)內(nèi)核的返回結(jié)果,進(jìn)行對(duì)應(yīng)處理,如超時(shí)處理、異常處理、事件處理等。
- ⑥如果沒有超時(shí)以及出現(xiàn)錯(cuò)誤,那么則遍歷判斷那個(gè)
FD有數(shù)據(jù)的(被置位)。 - ⑦對(duì)于有事件發(fā)生的
FD,根據(jù)其事件類型進(jìn)行對(duì)應(yīng)的處理(讀、寫數(shù)據(jù))。
對(duì)于這個(gè)偽代碼,其實(shí)也是調(diào)用select()函數(shù)的通用模型,以Java的JNI調(diào)用為例,其實(shí)大體的過程也是相同的,如下:

沒有下載到
JDK1.5的源碼,所以以Windows-select的調(diào)用為例。
4.5、內(nèi)核select函數(shù)核心源碼
? ?在上述過程中,我們調(diào)用了select()函數(shù)實(shí)現(xiàn)了IO多路復(fù)用,但調(diào)用之后select()的執(zhí)行過程,相對(duì)而言其實(shí)是未知,那么接著再來看看select()的核心源碼,剖析一下調(diào)用select后,內(nèi)核究竟會(huì)如何處理。
內(nèi)核源碼的執(zhí)行流程:
sys_select() → SYSCALL_DEFINE5() → core_sys_select() → do_select() → f_op->poll/tcp_poll()。
所有的系統(tǒng)調(diào)用,都可以在它的名字前加上“sys_”前綴,這就是它在內(nèi)核中對(duì)應(yīng)的函數(shù)。比如系統(tǒng)調(diào)用open、read、write、select,與之對(duì)應(yīng)的內(nèi)核函數(shù)為:sys_open、sys_read、sys_write、sys_select,因此上述的sys_select()其實(shí)就是select()函數(shù)再內(nèi)核中對(duì)應(yīng)的函數(shù)。
接著來看看SYSCALL_DEFINE5()、core_sys_select()函數(shù)的內(nèi)容:
// 位于fs/select.c文件中(sys_select函數(shù))
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
fd_set __user *, exp, struct timeval __user *, tvp)
{
struct timespec end_time, *to = NULL;
struct timeval tv;
int ret;
// 判斷是否傳入了超時(shí)時(shí)間
if (tvp) {
if (copy_from_user(&tv, tvp, sizeof(tv)))
return -EFAULT;
to = &end_time;
// 如果已經(jīng)到了超時(shí)時(shí)間,則中斷執(zhí)行并返回
if (poll_select_set_timeout(to,
tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
return -EINVAL;
}
// 未超時(shí)或沒有設(shè)置超時(shí)時(shí)間的情況下,調(diào)用core_sys_select
ret = core_sys_select(n, inp, outp, exp, to);
ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);
return ret;
}
// 位于fs/select.c文件中(core_sys_select函數(shù))
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timespec *end_time)
{
fd_set_bits fds;
void *bits;
int ret, max_fds;
unsigned int size;
struct fdtable *fdt;
/* 由于涉及到了用戶態(tài)和內(nèi)核態(tài)的切換,因此將位圖存儲(chǔ)在棧上,
(盡量提升狀態(tài)切換時(shí)的效率,這里采用棧的方式存儲(chǔ)) */
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
ret = -EINVAL;
if (n < 0)
goto out_nofds;
// 先計(jì)算出max_fds值
rcu_read_lock();
fdt = files_fdtable(current->files);
max_fds = fdt->max_fds;
rcu_read_unlock();
if (n > max_fds)
n = max_fds;
// 根據(jù)前面計(jì)算的max_fds值,判斷一下前面開??臻g是否足夠
// (在這里涉及到一個(gè)新的結(jié)構(gòu)體:fd_set_bits,稍后詳細(xì)分析)
size = FDS_BYTES(n);
bits = stack_fds;
if (size > sizeof(stack_fds) / 6) {
// 如果空間不夠則調(diào)用內(nèi)核的kmalloc為fd_set_bits分配更大的空間
ret = -ENOMEM;
bits = kmalloc(6 * size, GFP_KERNEL);
if (!bits)
goto out_nofds;
}
// 將fd_set_bits中六個(gè)位圖指針指向分配好的內(nèi)存位置
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;
// 將用戶空間提交的三個(gè)fd_set拷貝到內(nèi)核空間
if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
goto out;
zero_fd_set(n, fds.res_in);
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);
// 調(diào)用select模型的核心函數(shù)do_select()
ret = do_select(n, &fds, end_time);
if (ret < 0)
goto out;
// 檢測(cè)到有信號(hào)則系統(tǒng)調(diào)用退出,返回用戶空間執(zhí)行信號(hào)處理函數(shù)
if (!ret) {
ret = -ERESTARTNOHAND;
if (signal_pending(current))
goto out;
ret = 0;
}
if (set_fd_set(n, inp, fds.res_in) ||
set_fd_set(n, outp, fds.res_out) ||
set_fd_set(n, exp, fds.res_ex))
ret = -EFAULT;
// goto跳轉(zhuǎn)的對(duì)應(yīng)點(diǎn)
out:
if (bits != stack_fds)
kfree(bits);
out_nofds:
return ret;
}
源碼看過去,看起來有些多,對(duì)于C語言不太熟悉的小伙伴可能看的會(huì)一臉懵,但沒關(guān)系,我們不去講細(xì)了,重點(diǎn)理解其主干內(nèi)容,上述源碼分為如下幾步:
- ①先判斷調(diào)用
select()時(shí),是否設(shè)置了超時(shí)時(shí)間:- 是:記錄一下超時(shí)的時(shí)間點(diǎn),并判斷一下是否超時(shí),超時(shí)則中斷并返回。
- 否:沒有超時(shí)或沒設(shè)置超時(shí)時(shí)間,則調(diào)用
core_sys_select()函數(shù)。
- ②計(jì)算出最大的文件描述符,然后采用開棧方式存儲(chǔ)遞交的參數(shù)值。
- ③根據(jù)計(jì)算出的
max_fds值,判斷開??臻g能否可以存儲(chǔ)遞交的參數(shù)值:- 不能:調(diào)用內(nèi)核的
kmalloc分配器為fd_set_bits分配更大的空間(新分配的內(nèi)存是在堆)。 - 能:更改
fd_set_bits中的指針指向,然后將遞交的三個(gè)fd_set拷貝到內(nèi)核空間。
- 不能:調(diào)用內(nèi)核的
- ④上述工作全部已就緒后,調(diào)用
select()函數(shù)中的核心函數(shù):do_select()處理。
在上述過程中,理解起來并不復(fù)雜,唯一的疑惑點(diǎn)就在于多出了一個(gè)新的結(jié)構(gòu)體:fd_set_bits,那它究竟是什么意思呢?先來看看它的定義:
typedef struct {
unsigned long *in, *out, *ex;
unsigned long *res_in, *res_out, *res_ex;
} fd_set_bits;
很明顯,fd_set_bits是由六個(gè)元素組成的,這六個(gè)元素分別對(duì)應(yīng)著六個(gè)位圖,其中前三個(gè)則對(duì)應(yīng)調(diào)用select()函數(shù)時(shí)遞交的三個(gè)參數(shù):readfds、writefds、exceptfds,而后三個(gè)則對(duì)應(yīng)著select()執(zhí)行完成之后返回的位圖,為什么還需要有后面三個(gè)呢?
因?yàn)?code>select()在遍歷需要監(jiān)聽的文件描述符列表時(shí),也需要三個(gè)對(duì)應(yīng)的位圖來記錄哪些
FD中是有數(shù)據(jù)的,因此也需要有三個(gè)位圖對(duì)應(yīng)著傳入的三個(gè)位圖,在select()執(zhí)行完成后,如若有Socket中存在數(shù)據(jù)需要處理,那則會(huì)將這三個(gè)位圖中對(duì)應(yīng)的Socket位置進(jìn)行置位,然后從內(nèi)核空間再將其拷貝回用戶空間,以供程序處理。
OK~,了解fd_set_bits結(jié)構(gòu)后,對(duì)于core_sys_select函數(shù)中做的工作就自然理解了,一句話總結(jié)一下這個(gè)函數(shù)做的工作:
core_sys_select只不過是在為后面要調(diào)用的do_select()函數(shù)做準(zhǔn)備工作而已。
當(dāng)然,在上述的core_sys_select函數(shù)中還涉及到兩個(gè)函數(shù):get_fd_set()、set_fd_set(),其實(shí)現(xiàn)如下:
// 調(diào)用了copy_from_user()函數(shù),也就是從用戶空間拷貝數(shù)據(jù)到內(nèi)核空間
static inline
int get_fd_set(unsigned long nr, void __user *ufdset, unsigned long *fdset)
{
nr = FDS_BYTES(nr);
if (ufdset)
return copy_from_user(fdset, ufdset, nr) ? -EFAULT : 0;
memset(fdset, 0, nr);
return 0;
}
// 調(diào)用了__copy_to_user()函數(shù),也就是將數(shù)據(jù)從內(nèi)核空間拷貝回用戶空間
static inline unsigned long __must_check
set_fd_set(unsigned long nr, void __user *ufdset, unsigned long *fdset)
{
if (ufdset)
return __copy_to_user(ufdset, fdset, FDS_BYTES(nr));
return 0;
}
從最終調(diào)用的copy_from_user()、copy_to_user()兩個(gè)函數(shù)中就能得知,這就是用于用戶空間與內(nèi)核空間之間數(shù)據(jù)拷貝的函數(shù)而已。
那么再來看看select()的核心函數(shù)do_select()吧,先上源碼:
int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
ktime_t expire, *to = NULL;
// -------- 核心結(jié)構(gòu):poll_wqueues -------------
struct poll_wqueues table;
poll_table *wait;
int retval, i, timed_out = 0;
unsigned long slack = 0;
// 先獲取一下最大的文件描述符
rcu_read_lock();
retval = max_select_fd(n, fds);
rcu_read_unlock();
// 如果獲取到的值為負(fù)數(shù),則返回select()執(zhí)行過程中錯(cuò)誤
if (retval < 0)
return retval;
n = retval;
// 初始化poll_wqueues結(jié)構(gòu)體中的poll_table,并更改__pollwait的指針指向
poll_initwait(&table);
wait = &table.pt;
// 如果系統(tǒng)調(diào)用select()函數(shù)時(shí),設(shè)置的超時(shí)時(shí)間為0,
// 那么賦值timed_out = 1,表示未獲取到事件的情況下不阻塞,直接返回。
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
wait = NULL;
timed_out = 1;
}
// 如果設(shè)置了超時(shí)時(shí)間,則預(yù)估一下還剩下多少時(shí)間
if (end_time && !timed_out)
slack = estimate_accuracy(end_time);
retval = 0; // 這個(gè)是最終返回的值
// 開啟輪詢,這里是核心!?。? for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
// 對(duì)于每個(gè)需要監(jiān)聽的fd,向其等待隊(duì)列中注冊(cè)后一個(gè)entry
set_current_state(TASK_INTERRUPTIBLE);
// 準(zhǔn)備工作
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
const struct file_operations *f_op = NULL;
struct file *file = NULL;
// 做一次位或操作,對(duì)于并集為0的FD直接忽略
// (在前面分析過,只有置位=1的,才代表這個(gè)FD需要被監(jiān)聽事件)
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
if (all_bits == 0) {
i += __NFDBITS;
continue;
}
// 內(nèi)層循環(huán):開始對(duì)需要監(jiān)聽的FD進(jìn)行掃描(核心中的核心?。。? for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
int fput_needed;
if (i >= n)
break;
if (!(bit & all_bits))
continue;
file = fget_light(i, &fput_needed);
// 這里是重點(diǎn):主要做了f_op->poll這個(gè)操作(具體含義后面細(xì)聊)
if (file) {
f_op = file->f_op;
mask = DEFAULT_POLLMASK;
// 檢測(cè)對(duì)應(yīng)的FD是否能夠進(jìn)行IO操作
if (f_op && f_op->poll)
// 會(huì)調(diào)用具體設(shè)備的poll()方法
mask = (*f_op->poll)(file, retval ? NULL : wait);
fput_light(file, fput_needed);
// 判斷對(duì)應(yīng)的文件描述符目前的狀態(tài)
// 如果是可讀狀態(tài),則將其res_in集合對(duì)應(yīng)的坑位置1
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
}
// 如果是可寫狀態(tài),則將其res_out集合.......
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
}
}
}
// 對(duì)于監(jiān)聽到有數(shù)據(jù)的FD,賦值給之前要返回的位圖中
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
cond_resched();
}
// 如果掃描到了活躍FD、或出現(xiàn)超時(shí)、出現(xiàn)喚醒信號(hào)以及指向碰到錯(cuò)誤
// 中斷循環(huán)掃描,返回到之前的core_sys_select()函數(shù)中
// 如若是被喚醒或超時(shí)了,則會(huì)重新掃描一次所有FD
wait = NULL;
if (retval || timed_out || signal_pending(current))
break;
if (table.error) {
retval = table.error;
break;
}
// 第一次循環(huán)時(shí),如果設(shè)置了超時(shí)時(shí)間,那么則將時(shí)間賦值給to指針
if (end_time && !to) {
expire = timespec_to_ktime(*end_time);
to = &expire;
}
/* 未掃描到活躍的FD,則調(diào)用schedule_hrtimeout_range函數(shù),
函數(shù)作用:讓當(dāng)前程序進(jìn)入睡眠,讓出CPU資源,避免無效掃描浪費(fèi)CPU,
調(diào)用時(shí)傳入了to,這是調(diào)用時(shí)指定的阻塞時(shí)間,超時(shí)則返回0,
如果在睡眠過程中,被socket喚醒則返回-EINTR */
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
timed_out = 1; // 睡眠超時(shí)后置1,方便后面退出循環(huán)返回到上層
}
__set_current_state(TASK_RUNNING);
// 清理各個(gè)驅(qū)動(dòng)程序的等待隊(duì)列頭,
// 同時(shí)釋放所有空出來的poll_table_page頁(包含的poll_table_entry)
poll_freewait(&table);
// 返回掃描到的活躍FD數(shù)量
return retval;
}
對(duì)于源碼的執(zhí)行過程,在上面都已給出了相關(guān)注釋,但看起來有些費(fèi)力,我們稍后再去總結(jié)一遍,但在此之前我們需要先理解兩個(gè)內(nèi)容:活躍FD數(shù)、poll_wqueues結(jié)構(gòu)體。
活躍
FD數(shù):表示有事件發(fā)生的文件描述符,比如一個(gè)網(wǎng)絡(luò)套接字中有數(shù)據(jù)可讀,那么這個(gè)Socket對(duì)應(yīng)的FD則可記為一次活躍數(shù)。如果一個(gè)FD同時(shí)觸發(fā)了兩個(gè)事件,那么則會(huì)計(jì)算兩次活躍數(shù)。
poll_wqueues結(jié)構(gòu)體則屬于do_select()函數(shù)中的一個(gè)核心結(jié)構(gòu),定義如下:
// 位于include/linux/poll.h文件中
struct poll_wqueues {
// 驅(qū)動(dòng)注冊(cè),回調(diào)函數(shù)__pollwait的指針
poll_table pt;
// 如果下面的inline_entries不夠 就會(huì)需要
struct poll_table_page * table;
int error;
// 記錄下面的table使用過的下標(biāo)
int inline_index;
// 對(duì)應(yīng)下述的poll_table_entry結(jié)構(gòu)
struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};
// 加入等待隊(duì)列的節(jié)點(diǎn)
struct poll_table_entry {
struct file * filp;
wait_queue_t wait;
wait_queue_head_t * wait_address;
};
// 回調(diào)函數(shù)的指針
typedef struct poll_table_struct {
poll_queue_proc qproc;
} poll_table;
對(duì)于這個(gè)結(jié)構(gòu)體而言,核心就在于其中的pt成員,它是poll_table類型的,不過想要理解它,那首先必須明白一個(gè)知識(shí)點(diǎn):
當(dāng)某個(gè)進(jìn)程需要對(duì)一個(gè)
IO設(shè)備(例如socket)進(jìn)行讀寫時(shí),如果發(fā)現(xiàn)此設(shè)備的數(shù)據(jù)暫且還未就緒,所以不能進(jìn)行讀寫操作,當(dāng)前進(jìn)程就需要阻塞等待。為了實(shí)現(xiàn)阻塞進(jìn)程,那每個(gè)socket/IO設(shè)備都有個(gè)等待隊(duì)列,當(dāng)進(jìn)程需要阻塞等待數(shù)據(jù)時(shí),就可以將該進(jìn)程添加到對(duì)應(yīng)的等待隊(duì)列中進(jìn)行休眠,當(dāng)socket數(shù)據(jù)就緒后,再喚醒隊(duì)列中的進(jìn)程。
而poll_table結(jié)構(gòu)就是為了將進(jìn)程添加到等待隊(duì)列中而創(chuàng)造的,在上述源碼中調(diào)用poll_initwait()函數(shù)后,就會(huì)將poll_wqueues中的poll_table成員的poll_queue.proc設(shè)置為__pollwait()回調(diào)函數(shù),當(dāng)后續(xù)執(zhí)行到f_op->poll()時(shí)會(huì)調(diào)用poll_wait()函數(shù),最終就會(huì)執(zhí)行到這里設(shè)置的__pollwait()回調(diào),這兩個(gè)函數(shù)實(shí)現(xiàn)如下:
// 將當(dāng)前進(jìn)程添加到wait參數(shù)指定的等待列表(poll_table)中
poll_wait(struct file *filp, wait_queue_head_t *queue, poll_table *wait)
{
if (p && wait_address)
p->qproc(filp, wait_address, p);
}
// 設(shè)置喚醒回調(diào)函數(shù)為pollwake函數(shù),并將poll_table_entry.wait加入等待隊(duì)列
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
poll_table *p)
{
struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
struct poll_table_entry *entry = poll_get_entry(p);
if (!entry)
return;
get_file(filp);
entry->filp = filp;
// 設(shè)置等待隊(duì)列頭
entry->wait_address = wait_address;
// 設(shè)置關(guān)注的事件
entry->key = p->key;
// 設(shè)置等待隊(duì)列節(jié)點(diǎn)的回調(diào)函數(shù)為pollwake()
init_waitqueue_func_entry(&entry->wait, pollwake);
// 私有數(shù)據(jù) poll_wqueues
entry->wait.private = pwq;
// 將 poll_table_entry 添加到對(duì)應(yīng)的等待隊(duì)列上
add_wait_queue(wait_address, &entry->wait);
}
OK~,到這里看的可能會(huì)有些懵,因?yàn)檫@是跟后續(xù)的喚醒動(dòng)作有關(guān)的,待會(huì)兒結(jié)合具體的設(shè)備驅(qū)動(dòng)一起來理解,現(xiàn)在咱們重點(diǎn)先分析一下do_select()函數(shù)的核心過程:
- ①準(zhǔn)備階段:獲取最大文件描述符值、設(shè)置阻塞回調(diào)、處理超時(shí)時(shí)間等。
- ②開啟輪詢,將不需要監(jiān)聽的
FD忽略,需要監(jiān)聽的FD都向其等待隊(duì)列注冊(cè)一個(gè)entry。 - ③開啟循環(huán)將所有需要監(jiān)聽的
FD全部掃描一遍,判斷FD對(duì)應(yīng)的設(shè)備是否有數(shù)據(jù)可讀寫:- 有:直接跳到步驟⑤。
- 沒有:內(nèi)核調(diào)用
schedule讓當(dāng)前進(jìn)程睡眠xx秒,讓出cpu進(jìn)入阻塞。
- ④如果有
FD主動(dòng)喚醒了當(dāng)前進(jìn)程,或xx秒后自己醒了,再次跳回步驟③。 - ⑤如果從文件描述符集合中掃描到了有數(shù)據(jù)可讀寫的
FD,記錄相應(yīng)的活躍個(gè)數(shù)。 - ⑥將就緒事件結(jié)果保存在
fds的res_in、res_out、res_ex集合中,然后調(diào)用poll_freewait()函數(shù)移除各個(gè)驅(qū)動(dòng)程序的等待隊(duì)列頭,最后返回對(duì)應(yīng)的活躍FD數(shù)。
do_select()函數(shù)的核心流程總結(jié)給出來了,其實(shí)粗略理解起來也不難,唯一有些繞的估計(jì)就是進(jìn)程阻塞/喚醒這塊的內(nèi)容,下面重點(diǎn)來說一下這塊。
在
do_select()中,掃描FD時(shí)有一個(gè)核心操作:
mask = (*f_op->poll)(file, retval ? NULL : wait);
在這步操作中,會(huì)調(diào)用文件描述符對(duì)應(yīng)設(shè)備的poll檢測(cè)當(dāng)前是否能夠進(jìn)行IO操作,那么對(duì)于網(wǎng)絡(luò)Socket套接字而言,調(diào)用poll之后,對(duì)應(yīng)的接口就是sock_poll(),其定義位于net/ipv4/,如下:
static unsigned int sock_poll(struct file *file, poll_table * wait)
{
struct socket *sock;
sock = socki_lookup(file->f_dentry->d_inode);
return sock->ops->poll(file, sock, wait);
}
實(shí)現(xiàn)很簡(jiǎn)單,首先會(huì)通過socki_lookup()函數(shù)將文件描述符轉(zhuǎn)換為具體的Socket套接字,然后會(huì)調(diào)用該socket.poll()函數(shù),例如這里的套接字是TCP類型的,那么對(duì)應(yīng)的實(shí)現(xiàn)就是tcp_poll()函數(shù):
// 位于net/ipv4/tcp/目錄下
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
unsigned int mask;
struct sock *sk = sock->sk;
struct tcp_sock *tp = tcp_sk(sk);
poll_wait(file, sk->sk_sleep, wait);
if (sk->sk_state == TCP_LISTEN)
return inet_csk_listen_poll(sk);
// 用mask來記錄socket數(shù)據(jù)是否可被讀寫
mask = 0;
// 開始進(jìn)行判斷
if (sk->sk_err)
mask = POLLERR;
if (sk->sk_shutdown == SHUTDOWN_MASK || sk->sk_state == TCP_CLOSE)
mask |= POLLHUP;
if (sk->sk_shutdown & RCV_SHUTDOWN)
mask |= POLLIN | POLLRDNORM | POLLRDHUP;
if ((1 << sk->sk_state) & ~(TCPF_SYN_SENT | TCPF_SYN_RECV)) {
int target = sock_rcvlowat(sk, 0, INT_MAX);
if (tp->urg_seq == tp->copied_seq &&
!sock_flag(sk, SOCK_URGINLINE) &&
tp->urg_data)
target--;
if (tp->rcv_nxt - tp->copied_seq >= target)
mask |= POLLIN | POLLRDNORM;
if (!(sk->sk_shutdown & SEND_SHUTDOWN)) {
if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
mask |= POLLOUT | POLLWRNORM;
} else { /* send SIGIO later */
set_bit(SOCK_ASYNC_NOSPACE,
&sk->sk_socket->flags);
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
/* Race breaker. If space is freed after
* wspace test but before the flags are set,
* IO signal will be lost.
*/
if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
mask |= POLLOUT | POLLWRNORM;
}
}
if (tp->urg_data & TCP_URG_VALID)
mask |= POLLPRI;
}
// 最終返回當(dāng)前socket是否可被讀寫
return mask;
}
在這個(gè)函數(shù)中,首先會(huì)調(diào)用poll_wait()函數(shù)將當(dāng)前進(jìn)程添加到wait等待列表中,然后檢測(cè)socket目前數(shù)據(jù)是否可以被讀寫,最終通過mask變量來記錄當(dāng)前套接字的數(shù)據(jù)是否可被讀寫,如果可讀寫會(huì)將對(duì)應(yīng)的FD記錄為活躍狀態(tài)。如若不可讀寫則會(huì)先返回,然后等當(dāng)前進(jìn)程遍歷完所有FD后,所有的FD都不能進(jìn)行I/O操作的情況下,當(dāng)前進(jìn)程則會(huì)進(jìn)入休眠阻塞狀態(tài)。
如果進(jìn)程陷入休眠阻塞狀態(tài)后,它被再次喚醒只有兩種情況:
①為進(jìn)程設(shè)置的休眠時(shí)間到了自己醒來。
②由對(duì)應(yīng)的驅(qū)動(dòng)設(shè)備主動(dòng)喚醒。
第一種情況都懂就不聊了,重點(diǎn)來說說第二種,這種喚醒則是由I/O設(shè)備決定的,之前分析__pollwait函數(shù)時(shí),在最后調(diào)用了add_wait_queue(wait_address, &entry->wait)函數(shù),在對(duì)應(yīng)的等待隊(duì)列上插入了一個(gè)entry,那當(dāng)I/O設(shè)備的數(shù)據(jù)就緒后,就會(huì)去遍歷等待隊(duì)列找到這個(gè)entry,然后會(huì)調(diào)用設(shè)置好的pollwake()回調(diào)函數(shù)喚醒對(duì)應(yīng)的進(jìn)程。此時(shí)由于數(shù)據(jù)已經(jīng)準(zhǔn)備好了,所以當(dāng)select被喚醒后,自然就能掃描到對(duì)應(yīng)的FD變?yōu)榱丝勺x寫狀態(tài),然后返回給用戶態(tài)的程序。
當(dāng)然,對(duì)于喚醒這塊的具體實(shí)現(xiàn)位于
/sys/wait.h、wait.c文件中,感興趣的可自行研究。
至此,select()函數(shù)被調(diào)用后,在內(nèi)核具體是如何工作的,整個(gè)源碼流程也就大致分析清楚了,現(xiàn)在咱們會(huì)簡(jiǎn)單總結(jié)一下,梳理清楚完整流程。
4.6、select底層原理小結(jié)
? ?在經(jīng)過上述一系列分析后,我們大致摸透了select()運(yùn)行的底層原理,但估摸著大家看下來都有一點(diǎn)云里霧里的感覺,因此再簡(jiǎn)單的寫一個(gè)完整流程的總結(jié):
- ①外部調(diào)用
select()函數(shù),傳入最大文件描述符值、三個(gè)FD集合以及超時(shí)時(shí)間。 - ②用六個(gè)位圖組成的
fd_set_bits結(jié)構(gòu)存儲(chǔ)傳入的FD集合,用kmalloc為其分配??臻g。 - ③將用戶態(tài)傳遞的
fd_set拷貝到內(nèi)核空間,緊接著調(diào)用do_select()函數(shù)。 - ④獲取傳入的最大文件描述符值、設(shè)置阻塞回調(diào)函數(shù)、處理超時(shí)時(shí)間等。
- ⑤開啟輪詢,將不需要監(jiān)聽的
FD忽略,需要監(jiān)聽的FD都向其等待隊(duì)列注冊(cè)一個(gè)entry。 - ⑥開啟循環(huán)將所有需要監(jiān)聽的
FD全部掃描一遍,判斷FD對(duì)應(yīng)的設(shè)備是否有數(shù)據(jù)可讀寫:- 有:直接跳到步驟⑧。
- 沒有:內(nèi)核調(diào)用
schedule讓當(dāng)前進(jìn)程睡眠xx秒,讓出cpu進(jìn)入阻塞。
- ⑦如果有
FD主動(dòng)喚醒了當(dāng)前進(jìn)程,或xx秒后自己醒了,再次跳回步驟⑥。 - ⑧如果從文件描述符集合中掃描到了有數(shù)據(jù)可讀寫的
FD,記錄相應(yīng)的活躍個(gè)數(shù)。 - ⑨將就緒事件結(jié)果保存在
fds的res_in、res_out、res_ex集合中,然后調(diào)用poll_freewait()函數(shù)移除各個(gè)驅(qū)動(dòng)程序的等待隊(duì)列頭,最后返回對(duì)應(yīng)的活躍FD數(shù)。 - ⑩將掃描到的
FD從內(nèi)核拷貝會(huì)用戶態(tài)空間,同時(shí)向程序返回已觸發(fā)的事件數(shù)。
其實(shí)整個(gè)流程下來,select分析的內(nèi)容頗多,這是因?yàn)樗彩呛罄m(xù)兩個(gè)函數(shù)的基礎(chǔ),把它的過程弄明白了,在分析后面的函數(shù)時(shí),過程也是換湯不換藥的,步驟都大致相同。
4.7、select的缺點(diǎn)分析與思考
? ?詳細(xì)了解了select()函數(shù)后,再來想想它有哪些不足的地方呢?
①由
32個(gè)long元素組成的fd_set,最大只能表示1024位,因此最多只能監(jiān)聽1024個(gè)socket,所以對(duì)于高并發(fā)的I/O場(chǎng)景很難提供支持。
②因?yàn)楸O(jiān)聽
FD的工作是內(nèi)核完成的,所以每次調(diào)用select()時(shí),都需將FD集合從用戶態(tài)拷貝到內(nèi)核態(tài)空間,這個(gè)過程開銷會(huì)較大。
③當(dāng)監(jiān)聽的
FD集合中,某個(gè)Socket上有數(shù)據(jù)可讀寫后,會(huì)喚醒陷入睡眠的select,但select醒來后也不知道那個(gè)FD有數(shù)據(jù),因此會(huì)重新將整個(gè)集合遍歷一次,造成了很大程度上的浪費(fèi)。
④每次調(diào)用
select函數(shù)時(shí),由于需要監(jiān)聽的文件描述符不同,所以需要構(gòu)建新的fd_set集合,也就是上一次使用過的fd_set不可被重用,造成較大的資源開銷。
上述四點(diǎn),則是select多路復(fù)用模型的四個(gè)致命缺陷,由于這些原因?qū)е滤⒉贿m合于一些高性能的場(chǎng)景,因此才有后續(xù)的poll、epoll等模型出現(xiàn)。
但在分析其他兩個(gè)函數(shù)之前,再思考一個(gè)問題,假設(shè)此時(shí)CPU正在處理一個(gè)IO數(shù)據(jù),但此刻另外一個(gè)Socket上也來了數(shù)據(jù),那么這個(gè)數(shù)據(jù)會(huì)被丟棄嗎?
答案是不會(huì)的,因?yàn)橛袑iT用于處理
I/O數(shù)據(jù)的硬件:DMA控制器以及網(wǎng)卡,在網(wǎng)絡(luò)連接到來時(shí),如果CPU正在處理另外一條網(wǎng)絡(luò)連接的數(shù)據(jù),新連接的網(wǎng)絡(luò)數(shù)據(jù)并不會(huì)被丟棄,而是會(huì)由網(wǎng)卡將數(shù)據(jù)接收并放入內(nèi)核緩沖區(qū)。同理,如果是本地IO,則會(huì)由DMA控制器處理。