
前言:
本文承接自上篇:iOS即時通訊進階 - CocoaAsyncSocket源碼解析(Connect篇)
注:文中涉及代碼比較多,建議大家結(jié)合源碼一起閱讀比較容易能加深理解。這里有樓主標(biāo)注好注釋的源碼,有需要的可以作為參照:CocoaAsyncSocket源碼注釋
如果對該框架用法不熟悉的話,可以參考樓主之前這篇文章:iOS即時通訊,從入門到“放棄”?,或者自行查閱。
上文我們提到了GCDAsyncSocket的初始化,以及最終connect之前的準(zhǔn)備工作,包括一些錯誤檢查;本機地址創(chuàng)建以及socket創(chuàng)建;服務(wù)端地址的創(chuàng)建;還有一些本機socket可選項的配置,例如禁止網(wǎng)絡(luò)出錯導(dǎo)致進程關(guān)閉的信號等。
言歸正傳,繼續(xù)上文往下講
上文講到了本文方法八--創(chuàng)建Socket,其中有這么一行代碼:
//和connectInterface綁定
if (![self bindSocket:socketFD toInterface:connectInterface error:errPtr])
{
//綁定失敗,直接關(guān)閉返回
[self closeSocket:socketFD];
return SOCKET_NULL;
}
我們?nèi)ビ弥皠?chuàng)建的本機地址去做socket綁定,接著會調(diào)用到如下方法中:
本文方法九--給Socket綁定本機地址
//綁定一個Socket的本地地址
- (BOOL)bindSocket:(int)socketFD toInterface:(NSData *)connectInterface error:(NSError **)errPtr
{
// Bind the socket to the desired interface (if needed)
//無接口就不綁定,connect會自動綁定到一個不沖突的端口上去。
if (connectInterface)
{
LogVerbose(@"Binding socket...");
//判斷當(dāng)前地址的Port是不是大于0
if ([[self class] portFromAddress:connectInterface] > 0)
{
// Since we're going to be binding to a specific port,
// we should turn on reuseaddr to allow us to override sockets in time_wait.
int reuseOn = 1;
//設(shè)置調(diào)用close(socket)后,仍可繼續(xù)重用該socket。調(diào)用close(socket)一般不會立即關(guān)閉socket,而經(jīng)歷TIME_WAIT的過程。
setsockopt(socketFD, SOL_SOCKET, SO_REUSEADDR, &reuseOn, sizeof(reuseOn));
}
//拿到地址
const struct sockaddr *interfaceAddr = (const struct sockaddr *)[connectInterface bytes];
//綁定這個地址
int result = bind(socketFD, interfaceAddr, (socklen_t)[connectInterface length]);
//綁定出錯,返回NO
if (result != 0)
{
if (errPtr)
*errPtr = [self errnoErrorWithReason:@"Error in bind() function"];
return NO;
}
}
//成功
return YES;
}
這個方法也非常簡單,如果沒有connectInterface則直接返回YES,當(dāng)socket進行連接的時候,會自動綁定一個端口,進行連接。
如果有值,則我們開始綁定到我們一開始指定的地址上。
這里調(diào)用了兩個和scoket相關(guān)的函數(shù):
第一個是我們之前提到的配置scoket參數(shù)的函數(shù):
setsockopt(socketFD, SOL_SOCKET, SO_REUSEADDR, &reuseOn, sizeof(reuseOn));
這里調(diào)用這個函數(shù)的主要目的是為了調(diào)用close的時候,不立即去關(guān)閉socket連接,而是經(jīng)歷一個TIME_WAIT過程。在這個過程中,socket是可以被復(fù)用的。我們注意到之前的connect流程并沒有看到復(fù)用socket的代碼。注意,我們現(xiàn)在走的連接流程是客戶端的流程,等我們講到服務(wù)端accept進行連接的時候,我們就能看到這個復(fù)用的作用了。
第二個是bind函數(shù)
int result = bind(socketFD, interfaceAddr, (socklen_t)[connectInterface length]);
這個函數(shù)倒是很簡單,就3個參數(shù),socket、需要綁定的地址、地址大小。這樣就把socket和這個地址(其實就是端口)捆綁在一起了。
這樣我們就做完了最終連接前所有準(zhǔn)備工作,本機socket有了,服務(wù)端的地址也有了。接著我們就可以開始進行最終連接了:
本文方法十 -- 建立連接的最終方法
//連接最終方法 3 finnal。。。
- (void)connectSocket:(int)socketFD address:(NSData *)address stateIndex:(int)aStateIndex
{
// If there already is a socket connected, we close socketFD and return
//已連接,關(guān)閉連接返回
if (self.isConnected)
{
[self closeSocket:socketFD];
return;
}
// Start the connection process in a background queue
//開始連接過程,在后臺queue中
__weak GCDAsyncSocket *weakSelf = self;
//獲取到全局Queue
dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
//新線程
dispatch_async(globalConcurrentQueue, ^{
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"
//調(diào)用connect方法,該函數(shù)阻塞線程,所以要異步新線程
//客戶端向特定網(wǎng)絡(luò)地址的服務(wù)器發(fā)送連接請求,連接成功返回0,失敗返回 -1。
int result = connect(socketFD, (const struct sockaddr *)[address bytes], (socklen_t)[address length]);
//老樣子,安全判斷
__strong GCDAsyncSocket *strongSelf = weakSelf;
if (strongSelf == nil) return_from_block;
//在socketQueue中,開辟線程
dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
//如果狀態(tài)為已經(jīng)連接,關(guān)閉連接返回
if (strongSelf.isConnected)
{
[strongSelf closeSocket:socketFD];
return_from_block;
}
//說明連接成功
if (result == 0)
{
//關(guān)閉掉另一個沒用的socket
[self closeUnusedSocket:socketFD];
//調(diào)用didConnect,生成stream,改變狀態(tài)等等!
[strongSelf didConnect:aStateIndex];
}
//連接失敗
else
{
//關(guān)閉當(dāng)前socket
[strongSelf closeSocket:socketFD];
// If there are no more sockets trying to connect, we inform the error to the delegate
//返回連接錯誤的error
if (strongSelf.socket4FD == SOCKET_NULL && strongSelf.socket6FD == SOCKET_NULL)
{
NSError *error = [strongSelf errnoErrorWithReason:@"Error in connect() function"];
[strongSelf didNotConnect:aStateIndex error:error];
}
}
}});
#pragma clang diagnostic pop
});
//輸出正在連接中
LogVerbose(@"Connecting...");
}
這個方法主要就是做了一件事,調(diào)用下面一個函數(shù)進行連接:
int result = connect(socketFD, (const struct sockaddr *)[address bytes], (socklen_t)[address length]);
這里需要注意的是這個函數(shù)是阻塞,直到結(jié)果返回之前,線程會一直停在這行。所以這里用的是全局并發(fā)隊列,開辟了一個新的線程進行連接,在得到結(jié)果之后,又調(diào)回socketQueue中進行后續(xù)操作。
如果result為0,說明連接成功,我們會關(guān)閉掉另外一個沒有用到的socket(如果有的話)。然后調(diào)用另外一個方法做一些連接成功的初始化操作。
否則連接失敗,我們會關(guān)閉socket,填充錯誤并且返回。
我們接著來看看連接成功后,初始化的方法:
本文方法十一 -- 連接成功后的初始化
//連接成功后調(diào)用,設(shè)置一些連接成功的狀態(tài)
- (void)didConnect:(int)aStateIndex
{
LogTrace();
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
//狀態(tài)不同
if (aStateIndex != stateIndex)
{
LogInfo(@"Ignoring didConnect, already disconnected");
// The connect operation has been cancelled.
// That is, socket was disconnected, or connection has already timed out.
return;
}
//kConnected合并到當(dāng)前flag中
flags |= kConnected;
//停止連接超時
[self endConnectTimeout];
#if TARGET_OS_IPHONE
// The endConnectTimeout method executed above incremented the stateIndex.
//上面的endConnectTimeout,會導(dǎo)致stateIndex增加,所以需要重新賦值
aStateIndex = stateIndex;
#endif
// Setup read/write streams (as workaround for specific shortcomings in the iOS platform)
//
// Note:
// There may be configuration options that must be set by the delegate before opening the streams.
//打開stream之前必須用相關(guān)配置設(shè)置代理
// The primary example is the kCFStreamNetworkServiceTypeVoIP flag, which only works on an unopened stream.
//主要的例子是kCFStreamNetworkServiceTypeVoIP標(biāo)記,只能工作在未打開的stream中?
//
// Thus we wait until after the socket:didConnectToHost:port: delegate method has completed.
//所以我們要等待,連接完成的代理調(diào)用完
// This gives the delegate time to properly configure the streams if needed.
//這些給了代理時間,去正確的配置Stream,如果是必要的話
//創(chuàng)建個Block來初始化Stream
dispatch_block_t SetupStreamsPart1 = ^{
NSLog(@"hello~");
#if TARGET_OS_IPHONE
//創(chuàng)建讀寫stream失敗,則關(guān)閉并報對應(yīng)錯誤
if (![self createReadAndWriteStream])
{
[self closeWithError:[self otherError:@"Error creating CFStreams"]];
return;
}
//參數(shù)是給NO的,就是有可讀bytes的時候,不會調(diào)用回調(diào)函數(shù)
if (![self registerForStreamCallbacksIncludingReadWrite:NO])
{
[self closeWithError:[self otherError:@"Error in CFStreamSetClient"]];
return;
}
#endif
};
//part2設(shè)置stream
dispatch_block_t SetupStreamsPart2 = ^{
#if TARGET_OS_IPHONE
//狀態(tài)不一樣直接返回
if (aStateIndex != stateIndex)
{
// The socket has been disconnected.
return;
}
//如果加到runloop上失敗
if (![self addStreamsToRunLoop])
{
//錯誤返回
[self closeWithError:[self otherError:@"Error in CFStreamScheduleWithRunLoop"]];
return;
}
//讀寫stream open
if (![self openStreams])
{
//開啟錯誤返回
[self closeWithError:[self otherError:@"Error creating CFStreams"]];
return;
}
#endif
};
// Notify delegate
//通知代理
//拿到server端的host port
NSString *host = [self connectedHost];
uint16_t port = [self connectedPort];
//拿到unix域的 url
NSURL *url = [self connectedUrl];
//拿到代理
__strong id theDelegate = delegate;
//代理隊列 和 Host不為nil 且響應(yīng)didConnectToHost代理方法
if (delegateQueue && host != nil && [theDelegate respondsToSelector:@selector(socket:didConnectToHost:port:)])
{
//調(diào)用初始化stream1
SetupStreamsPart1();
dispatch_async(delegateQueue, ^{ @autoreleasepool {
//到代理隊列調(diào)用連接成功的代理方法
[theDelegate socket:self didConnectToHost:host port:port];
//然后回到socketQueue中去執(zhí)行初始化stream2
dispatch_async(socketQueue, ^{ @autoreleasepool {
SetupStreamsPart2();
}});
}});
}
//這個是unix domain 請求回調(diào)
else if (delegateQueue && url != nil && [theDelegate respondsToSelector:@selector(socket:didConnectToUrl:)])
{
SetupStreamsPart1();
dispatch_async(delegateQueue, ^{ @autoreleasepool {
[theDelegate socket:self didConnectToUrl:url];
dispatch_async(socketQueue, ^{ @autoreleasepool {
SetupStreamsPart2();
}});
}});
}
//否則只初始化stream
else
{
SetupStreamsPart1();
SetupStreamsPart2();
}
// Get the connected socket
int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
//fcntl,功能描述:根據(jù)文件描述詞來操作文件的特性。http://blog.csdn.net/pbymw8iwm/article/details/7974789
// Enable non-blocking IO on the socket
//使socket支持非阻塞IO
int result = fcntl(socketFD, F_SETFL, O_NONBLOCK);
if (result == -1)
{
//失敗 ,報錯
NSString *errMsg = @"Error enabling non-blocking IO on socket (fcntl)";
[self closeWithError:[self otherError:errMsg]];
return;
}
// Setup our read/write sources
//初始化讀寫source
[self setupReadAndWriteSourcesForNewlyConnectedSocket:socketFD];
// Dequeue any pending read/write requests
//開始下一個任務(wù)
[self maybeDequeueRead];
[self maybeDequeueWrite];
}
這個方法很長一大串,其實做的東西也很簡單,主要做了下面幾件事:
- 把當(dāng)前狀態(tài)flags加上已連接,并且關(guān)閉掉我們一開始連接開啟的,連接超時的定時器。
- 初始化了兩個
Block:SetupStreamsPart1、SetupStreamsPart2,這兩個Block做的事都和讀寫流有關(guān)。SetupStreamsPart1用來創(chuàng)建讀寫流,并且注冊回調(diào)。另一個SetupStreamsPart2用來把流添加到當(dāng)前線程的runloop上,并且打開流。 - 判斷是否有代理
queue、host或者url這些參數(shù)是否為空、是否代理響應(yīng)didConnectToHost或didConnectToUrl代理,這兩種分別對應(yīng)了普通socket連接和unix domin socket連接。如果實現(xiàn)了對應(yīng)的代理,則調(diào)用連接成功的代理。 - 在調(diào)用代理的同時,調(diào)用了我們之前初始化的兩個讀寫流相關(guān)的
Block。這里值得說下的是這兩個Block和代理之間的調(diào)用順序:
-
先執(zhí)行
SetupStreamsPart1后執(zhí)行SetupStreamsPart2,沒什么好說的,問題是代理的執(zhí)行時間,想想如果我們放在SetupStreamsPart2后面是不是會導(dǎo)致個問題,就是用戶收到消息了,但是連接成功的代理還沒有被調(diào)用,這顯然是不合理的。所以我們的調(diào)用順序是SetupStreamsPart1->代理->SetupStreamsPart2所以出現(xiàn)了如下代碼:
//調(diào)用初始化stream1
SetupStreamsPart1();
dispatch_async(delegateQueue, ^{ @autoreleasepool {
//到代理隊列調(diào)用連接成功的代理方法
[theDelegate socket:self didConnectToHost:host port:port];
//然后回到socketQueue中去執(zhí)行初始化stream2
dispatch_async(socketQueue, ^{ @autoreleasepool {
SetupStreamsPart2();
}});
}});
原因是為了線程安全和socket相關(guān)的操作必須在socketQueue中進行。而代理必須在我們設(shè)置的代理queue中被回調(diào)。
- 拿到當(dāng)前的本機socket,調(diào)用如下函數(shù):
int result = fcntl(socketFD, F_SETFL, O_NONBLOCK);
簡單來說,這個函數(shù)類似我們之前提到的一個函數(shù)setsockopt(),都是給socket設(shè)置一些參數(shù),以實現(xiàn)一些功能。而這個函數(shù),能實現(xiàn)的功能更多。大家可以看看這篇文章參考參考:fcntl函數(shù)詳解
而在這里,就是為了把socket的IO模式設(shè)置為非阻塞。很多小伙伴又要疑惑什么是非阻塞了,先別急,關(guān)于這個我們下文會詳細(xì)的來談。
我們初始化了讀寫source(很重要,所有的消息都是由這個source來觸發(fā)的,我們之后會詳細(xì)分析這個方法)。
我們做完了
stream和source的初始化處理,則開始做一次讀寫任務(wù)(這兩個方法暫時不講,會放到之后的Read和Write篇中去講)。
我們接著來講講這個方法中對其他方法的調(diào)用,按照順序來,先從第2條,兩個Block中對stream的處理開始。和stream相關(guān)的函數(shù)一共有6個:
Stream相關(guān)方法一 -- 創(chuàng)建讀寫stream
//創(chuàng)建讀寫stream
- (BOOL)createReadAndWriteStream
{
LogTrace();
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
//如果有一個有值,就返回
if (readStream || writeStream)
{
// Streams already created
return YES;
}
//拿到socket,首選是socket4FD,其次socket6FD,都沒有才是socketUN,socketUN應(yīng)該是Unix的socket結(jié)構(gòu)體
int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
//如果都為空,返回NO
if (socketFD == SOCKET_NULL)
{
// Cannot create streams without a file descriptor
return NO;
}
//如果非連接,返回NO
if (![self isConnected])
{
// Cannot create streams until file descriptor is connected
return NO;
}
LogVerbose(@"Creating read and write stream...");
#pragma mark - 綁定Socket和CFStream
//下面的接口用于創(chuàng)建一對 socket stream,一個用于讀取,一個用于寫入:
CFStreamCreatePairWithSocket(NULL, (CFSocketNativeHandle)socketFD, &readStream, &writeStream);
// The kCFStreamPropertyShouldCloseNativeSocket property should be false by default (for our case).
// But let's not take any chances.
//讀寫stream都設(shè)置成不會隨著綁定的socket一起close,release。 kCFBooleanFalse不一起,kCFBooleanTrue一起
if (readStream)
CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
if (writeStream)
CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
//如果有一個為空
if ((readStream == NULL) || (writeStream == NULL))
{
LogWarn(@"Unable to create read and write stream...");
//關(guān)閉對應(yīng)的stream
if (readStream)
{
CFReadStreamClose(readStream);
CFRelease(readStream);
readStream = NULL;
}
if (writeStream)
{
CFWriteStreamClose(writeStream);
CFRelease(writeStream);
writeStream = NULL;
}
//返回創(chuàng)建失敗
return NO;
}
//創(chuàng)建成功
return YES;
}
這個方法基本上很簡單,就是關(guān)于兩個stream函數(shù)的調(diào)用:
- 創(chuàng)建stream的函數(shù):
CFStreamCreatePairWithSocket(NULL, (CFSocketNativeHandle)socketFD, &readStream, &writeStream);
這個函數(shù)創(chuàng)建了一對讀寫stream,并且把stream與這個scoket做了綁定。
- 設(shè)置stream屬性:
CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
這個函數(shù)可以給stream設(shè)置一個屬性,這里是設(shè)置stream不會隨著socket的生命周期(close,release)而變化。
接著調(diào)用了registerForStreamCallbacksIncludingReadWrite來給stream注冊讀寫回調(diào)。
Stream相關(guān)方法二 -- 讀寫回調(diào)的注冊:
//注冊Stream的回調(diào)
- (BOOL)registerForStreamCallbacksIncludingReadWrite:(BOOL)includeReadWrite
{
LogVerbose(@"%@ %@", THIS_METHOD, (includeReadWrite ? @"YES" : @"NO"));
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
//判斷讀寫stream是不是都為空
NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");
//客戶端stream上下文對象
streamContext.version = 0;
streamContext.info = (__bridge void *)(self);
streamContext.retain = nil;
streamContext.release = nil;
streamContext.copyDescription = nil;
// The open has completed successfully.
// The stream has bytes to be read.
// The stream can accept bytes for writing.
// An error has occurred on the stream.
// The end of the stream has been reached.
//設(shè)置一個CF的flag 兩種,一種是錯誤發(fā)生的時候,一種是stream事件結(jié)束
CFOptionFlags readStreamEvents = kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered ;
//如果包含讀寫
if (includeReadWrite)
//仍然有Bytes要讀的時候 The stream has bytes to be read.
readStreamEvents |= kCFStreamEventHasBytesAvailable;
//給讀stream設(shè)置客戶端,會在之前設(shè)置的那些標(biāo)記下回調(diào)函數(shù) CFReadStreamCallback。設(shè)置失敗的話直接返回NO
if (!CFReadStreamSetClient(readStream, readStreamEvents, &CFReadStreamCallback, &streamContext))
{
return NO;
}
//寫的flag,也一樣
CFOptionFlags writeStreamEvents = kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered;
if (includeReadWrite)
writeStreamEvents |= kCFStreamEventCanAcceptBytes;
if (!CFWriteStreamSetClient(writeStream, writeStreamEvents, &CFWriteStreamCallback, &streamContext))
{
return NO;
}
//走到最后說明讀寫都設(shè)置回調(diào)成功,返回YES
return YES;
}
相信用過CFStream的朋友,應(yīng)該會覺得很簡單,這個方法就是調(diào)用了一些CFStream相關(guān)函數(shù),其中最主要的這個設(shè)置讀寫回調(diào)函數(shù):
Boolean CFReadStreamSetClient(CFReadStreamRef stream, CFOptionFlags streamEvents, CFReadStreamClientCallBack clientCB, CFStreamClientContext *clientContext);
Boolean CFWriteStreamSetClient(CFWriteStreamRef stream, CFOptionFlags streamEvents, CFWriteStreamClientCallBack clientCB, CFStreamClientContext *clientContext);
這個函數(shù)共4個參數(shù):
第1個為我們需要設(shè)置的stream;
第2個為需要監(jiān)聽的事件選項,包括以下事件:
typedef CF_OPTIONS(CFOptionFlags, CFStreamEventType) {
kCFStreamEventNone = 0, //沒有事件發(fā)生
kCFStreamEventOpenCompleted = 1, //成功打開流
kCFStreamEventHasBytesAvailable = 2, //流中有數(shù)據(jù)可讀
kCFStreamEventCanAcceptBytes = 4, //流中可以接受數(shù)據(jù)去寫
kCFStreamEventErrorOccurred = 8, //流發(fā)生錯誤
kCFStreamEventEndEncountered = 16 //到達(dá)流的結(jié)尾
};
其中具體用法,大家可以自行去試試,這里作者只監(jiān)聽了了兩種事件kCFStreamEventErrorOccurred和kCFStreamEventEndEncountered,再根據(jù)傳過來的參數(shù)去決定是否監(jiān)聽kCFStreamEventCanAcceptBytes:
//如果包含讀寫
if (includeReadWrite)
//仍然有Bytes要讀的時候 The stream has bytes to be read.
readStreamEvents |= kCFStreamEventHasBytesAvailable;
而這里我們傳過來的參數(shù)為NO,導(dǎo)致它并不監(jiān)聽可讀數(shù)據(jù)。顯然,我們正常的連接,當(dāng)有消息發(fā)送過來,并不是由stream回調(diào)來觸發(fā)的。這個框架中,如果是TLS傳輸?shù)?code>socket是用stream來觸發(fā)的,這個我們后續(xù)文章會講到。
那么有數(shù)據(jù)的時候,到底是什么來觸發(fā)我們的讀寫呢,答案就是讀寫source,我們接下來就會去創(chuàng)建初始化它。
這里綁定了兩個函數(shù),分別對應(yīng)讀和寫的回調(diào),分別為:
//讀的回調(diào)
static void CFReadStreamCallback (CFReadStreamRef stream, CFStreamEventType type, void *pInfo)
//寫的回調(diào)
static void CFWriteStreamCallback (CFWriteStreamRef stream, CFStreamEventType type, void *pInfo)
關(guān)于這兩個函數(shù),同樣這里暫時不做討論,等后續(xù)文章再來分析。
還有一點需要說一下的是streamContext這個屬性,它是一個結(jié)構(gòu)體,包含流的上下文信息,其結(jié)構(gòu)如下:
typedef struct {
CFIndex version;
void *info;
void *(*retain)(void *info);
void (*release)(void *info);
CFStringRef (*copyDescription)(void *info);
} CFStreamClientContext;
這個流的上下文中info指針,其實就是前面所對應(yīng)的讀寫回調(diào)函數(shù)中的pInfo指針,每次回調(diào)都會傳過去。其它的version就是流的版本標(biāo)識,之外的3個都需要的是一個函數(shù)指針,對應(yīng)我們傳遞的pInfo的持有以及釋放還有復(fù)制的描述信息,這里我們都賦值給nil。
接著我們來到流處理的第三步:addStreamsToRunLoop-添加到runloop上。
Stream相關(guān)方法三 -- 加到當(dāng)前線程的runloop上:
//把stream添加到runloop上
- (BOOL)addStreamsToRunLoop
{
LogTrace();
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");
//判斷flag里是否包含kAddedStreamsToRunLoop,沒添加過則添加。
if (!(flags & kAddedStreamsToRunLoop))
{
LogVerbose(@"Adding streams to runloop...");
[[self class] startCFStreamThreadIfNeeded];
//在開啟的線程中去執(zhí)行,阻塞式的
[[self class] performSelector:@selector(scheduleCFStreams:)
onThread:cfstreamThread
withObject:self
waitUntilDone:YES];
//添加標(biāo)識
flags |= kAddedStreamsToRunLoop;
}
return YES;
}
這里方法做了兩件事:
- 開啟了一條用于
CFStream讀寫回調(diào)的常駐線程,其中調(diào)用了好幾個函數(shù):
+ (void)startCFStreamThreadIfNeeded;
+ (void)cfstreamThread;
在這兩個函數(shù)中,添加了一個runloop,并且綁定了一個定時器事件,讓它run起來,使得線程常駐。大家可以結(jié)合著github中demo的注釋,自行查看這幾個方法。如果有任何疑問可以看看樓主這篇文章:基于runloop的線程?;睢N毀與通信,或者本文下評論,會一一解答。
- 在這個常駐線程中去調(diào)用注冊方法:
//注冊CFStream
+ (void)scheduleCFStreams:(GCDAsyncSocket *)asyncSocket
{
LogTrace();
//斷言當(dāng)前線程是cfstreamThread,不是則報錯
NSAssert([NSThread currentThread] == cfstreamThread, @"Invoked on wrong thread");
//獲取到runloop
CFRunLoopRef runLoop = CFRunLoopGetCurrent();
//如果有readStream
if (asyncSocket->readStream)
//注冊readStream在runloop的kCFRunLoopDefaultMode上
CFReadStreamScheduleWithRunLoop(asyncSocket->readStream, runLoop, kCFRunLoopDefaultMode);
//一樣
if (asyncSocket->writeStream)
CFWriteStreamScheduleWithRunLoop(asyncSocket->writeStream, runLoop, kCFRunLoopDefaultMode);
}
這里可以看到,我們流的回調(diào)都是在這條流的常駐線程中,至于為什么要這么做,相信大家樓主看過AFNetworking系列文章的會明白。我們之后文章也會就這個框架線程的問題詳細(xì)討論的,這里就暫時不詳細(xì)說明了。
這里主要用了CFReadStreamScheduleWithRunLoop函數(shù)完成了runloop的注冊:
CFReadStreamScheduleWithRunLoop(asyncSocket->readStream, runLoop, kCFRunLoopDefaultMode);
CFWriteStreamScheduleWithRunLoop(asyncSocket->writeStream, runLoop, kCFRunLoopDefaultMode);
這樣,如果stream中有我們監(jiān)聽的事件發(fā)生了,就會在這個runloop中觸發(fā)我們之前設(shè)置的讀寫回調(diào)函數(shù)。
我們完成了注冊,接下來我們就需要打開stream了:
Stream相關(guān)方法四 -- 打開stream:
//打開stream
- (BOOL)openStreams
{
LogTrace();
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
//斷言讀寫stream都不會空
NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");
//返回stream的狀態(tài)
CFStreamStatus readStatus = CFReadStreamGetStatus(readStream);
CFStreamStatus writeStatus = CFWriteStreamGetStatus(writeStream);
//如果有任意一個沒有開啟
if ((readStatus == kCFStreamStatusNotOpen) || (writeStatus == kCFStreamStatusNotOpen))
{
LogVerbose(@"Opening read and write stream...");
//開啟
BOOL r1 = CFReadStreamOpen(readStream);
BOOL r2 = CFWriteStreamOpen(writeStream);
//有一個開啟失敗
if (!r1 || !r2)
{
LogError(@"Error in CFStreamOpen");
return NO;
}
}
return YES;
}
方法也很簡單,通過CFReadStreamGetStatus函數(shù),獲取到當(dāng)前stream的狀態(tài),判斷沒開啟則調(diào)用CFReadStreamOpen函數(shù)去開啟,如果開啟失敗,錯誤返回。
到這里stream初始化相關(guān)的工作就做完了,接著我們還是回到本文方法十一 -- 連接成功后的初始化中:
其中第5條,我們談到了設(shè)置socket的I/O模式為非阻塞,相信很多朋友對socket的I/O:同步、異步、阻塞、非阻塞。這四個概念有所混淆。
簡單的來說,同步、異步是對于客戶端而言的。比如我發(fā)起一個調(diào)用一個函數(shù),我如果直接去調(diào)用,那么就是同步的,否則新開辟一個線程去做,那么對于當(dāng)前線程而言就是異步的。
而阻塞和非阻塞是對于服務(wù)端而言。當(dāng)服務(wù)端被客戶端調(diào)用后,我如果立刻返回調(diào)用的結(jié)果(無論數(shù)據(jù)是否處理完)那么就是非阻塞的,又或者等待數(shù)據(jù)拿到并且處理完(總之一系列邏輯)再返回,那么這種情況就是阻塞的。
好了,有了這個概念,我們接下來看看Linux下的5種I/O模型:
1)阻塞I/O(blocking I/O)
2)非阻塞I/O (nonblocking I/O)
- I/O復(fù)用(select 和poll) (I/O multiplexing)
4)信號驅(qū)動I/O (signal driven I/O (SIGIO))
5)異步I/O (asynchronous I/O (the POSIX aio_functions))
我們來簡單談?wù)勥@5種模型:
1)阻塞I/O:
簡單舉個例子,比如我們調(diào)用read()去讀取消息,如果是在阻塞模式下,我們會一直等待,直到有消息到來為止。
很多小伙伴可能又要說了,這有什么不可以,我們新開辟一條線程,讓它等著不就行了,看起來確實沒什么不可以。
那是因為你僅僅是站在客戶端的角度上來看。試想如果我們服務(wù)端也這么做,那豈不是有多少個socket連接,我們得開辟多少個線程去做阻塞IO?
2)非阻塞I/O
于是就有了非阻塞的概念,當(dāng)我們?nèi)?code>read()的時候,直接返回結(jié)果,這樣在很大概率下,是并沒有消息給我們讀的。這時候函數(shù)就會錯誤返回-1,并將errno設(shè)置為 EWOULDBLOCK,意為IO并沒有數(shù)據(jù)。
這時候就需要我們自己有一個機制,能知道什么時候有數(shù)據(jù),在去調(diào)用read()。有一個很傻的方式就是不停的循環(huán)去調(diào)用這個函數(shù),這樣有數(shù)據(jù)來,我們第一時間就讀到了。
3)I/O復(fù)用模式
I/O復(fù)用模式是阻塞I/O的改進版,它在read之前,會先去調(diào)用select去遍歷所有的socket,看哪一個有消息。當(dāng)然這個過程是阻塞的,直到有消息返回為止。然后在去調(diào)用read,阻塞的方式去讀取從系統(tǒng)內(nèi)核中去讀取這條消息到進程中來。
4)信號驅(qū)動I/O
信號驅(qū)動I/O是一個半異步的I/O模式,它首先會調(diào)用一個系統(tǒng)sginal相關(guān)的函數(shù),把socket和信號綁定起來,然后不管有沒有消息直接返回(這一步非阻塞)。這時候系統(tǒng)內(nèi)核會去檢查socket是否有可用數(shù)據(jù)。有的話則發(fā)送該信號給進程,然后進程在去調(diào)用read阻塞式的從系統(tǒng)內(nèi)核讀取數(shù)據(jù)到進程中來(這一步阻塞)。
5)可能聰明的你已經(jīng)想到了更好的解決方式,這就對了,這就是我們第5種IO模式:異步I/O ,它和第4步一樣,也是調(diào)用sginal相關(guān)函數(shù),把socket和信號綁定起來,同時綁定起來的還有一塊數(shù)據(jù)緩沖區(qū)buffer。然后無論有沒有數(shù)據(jù)直接返回(非阻塞)。而系統(tǒng)內(nèi)核會去檢查是否有可用數(shù)據(jù),一旦有可用數(shù)據(jù),則觸發(fā)信號,并且把數(shù)據(jù)填充到我們之前提供的數(shù)據(jù)緩沖區(qū)buffer中。這樣我們進程被信號觸發(fā),并且直接能從buffer中讀取到數(shù)據(jù),整個過程沒有任何阻塞。
很顯然,我們CocoaAyncSocket框架用的就是第5種I/O模式。
如果大家對I/O模式仍然感到疑惑,可以看看這篇文章:
socket阻塞與非阻塞,同步與異步、I/O模型
接著我們繼續(xù)看本文方法十一 -- 連接成功后的初始化中第6條,讀寫source的初始化方法:
本文方法十二 -- 初始化讀寫source:
//初始化讀寫source
- (void)setupReadAndWriteSourcesForNewlyConnectedSocket:(int)socketFD
{
//GCD source DISPATCH_SOURCE_TYPE_READ 會一直監(jiān)視著 socketFD,直到有數(shù)據(jù)可讀
readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socketFD, 0, socketQueue);
//_dispatch_source_type_write :監(jiān)視著 socketFD,直到寫數(shù)據(jù)了
writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, socketFD, 0, socketQueue);
// Setup event handlers
__weak GCDAsyncSocket *weakSelf = self;
#pragma mark readSource的回調(diào)
//GCD事件句柄 讀,當(dāng)socket中有數(shù)據(jù)流出現(xiàn),就會觸發(fā)這個句柄,全自動,不需要手動觸發(fā)
dispatch_source_set_event_handler(readSource, ^{ @autoreleasepool {
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"
__strong GCDAsyncSocket *strongSelf = weakSelf;
if (strongSelf == nil) return_from_block;
LogVerbose(@"readEventBlock");
//從readSource中,獲取到數(shù)據(jù)長度,
strongSelf->socketFDBytesAvailable = dispatch_source_get_data(strongSelf->readSource);
LogVerbose(@"socketFDBytesAvailable: %lu", strongSelf->socketFDBytesAvailable);
//如果長度大于0,開始讀數(shù)據(jù)
if (strongSelf->socketFDBytesAvailable > 0)
[strongSelf doReadData];
else
//因為觸發(fā)了,但是卻沒有可讀數(shù)據(jù),說明讀到當(dāng)前包邊界了。做邊界處理
[strongSelf doReadEOF];
#pragma clang diagnostic pop
}});
//寫事件句柄
dispatch_source_set_event_handler(writeSource, ^{ @autoreleasepool {
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"
__strong GCDAsyncSocket *strongSelf = weakSelf;
if (strongSelf == nil) return_from_block;
LogVerbose(@"writeEventBlock");
//標(biāo)記為接受數(shù)據(jù)
strongSelf->flags |= kSocketCanAcceptBytes;
//開始寫
[strongSelf doWriteData];
#pragma clang diagnostic pop
}});
// Setup cancel handlers
__block int socketFDRefCount = 2;
#if !OS_OBJECT_USE_OBJC
dispatch_source_t theReadSource = readSource;
dispatch_source_t theWriteSource = writeSource;
#endif
//讀寫取消的句柄
dispatch_source_set_cancel_handler(readSource, ^{
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"
LogVerbose(@"readCancelBlock");
#if !OS_OBJECT_USE_OBJC
LogVerbose(@"dispatch_release(readSource)");
dispatch_release(theReadSource);
#endif
if (--socketFDRefCount == 0)
{
LogVerbose(@"close(socketFD)");
//關(guān)閉socket
close(socketFD);
}
#pragma clang diagnostic pop
});
dispatch_source_set_cancel_handler(writeSource, ^{
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"
LogVerbose(@"writeCancelBlock");
#if !OS_OBJECT_USE_OBJC
LogVerbose(@"dispatch_release(writeSource)");
dispatch_release(theWriteSource);
#endif
if (--socketFDRefCount == 0)
{
LogVerbose(@"close(socketFD)");
//關(guān)閉socket
close(socketFD);
}
#pragma clang diagnostic pop
});
// We will not be able to read until data arrives.
// But we should be able to write immediately.
//設(shè)置未讀數(shù)量為0
socketFDBytesAvailable = 0;
//把讀掛起的狀態(tài)移除
flags &= ~kReadSourceSuspended;
LogVerbose(@"dispatch_resume(readSource)");
//開啟讀source
dispatch_resume(readSource);
//標(biāo)記為當(dāng)前可接受數(shù)據(jù)
flags |= kSocketCanAcceptBytes;
//先把寫source標(biāo)記為掛起
flags |= kWriteSourceSuspended;
}
這個方法初始化了讀寫source,這個方法主要是GCD source運用,如果有對這部分知識有所疑問,可以看看宜龍大神這篇:GCD高級用法。
這里GCD Source相關(guān)的主要是下面這3個函數(shù):
//創(chuàng)建source
dispatch_source_create(dispatch_source_type_t type,
uintptr_t handle,
unsigned long mask,
dispatch_queue_t _Nullable queue);
//為source設(shè)置事件句柄
dispatch_source_set_event_handler(dispatch_source_t source,
dispatch_block_t _Nullable handler);
//為source設(shè)置取消句柄
dispatch_source_set_cancel_handler(dispatch_source_t source,
dispatch_block_t _Nullable handler);
相信大家用至少用過GCD定時器,接觸過這3個函數(shù),這里創(chuàng)建source的函數(shù),根據(jù)參數(shù)type的不同,可以處理不同的事件:
這里我們用的是DISPATCH_SOURCE_TYPE_READ和DISPATCH_SOURCE_TYPE_WRITE這兩個類型。標(biāo)識如果handle如果有可讀或者可寫數(shù)據(jù)時,會觸發(fā)我們的事件句柄。
- 而這里初始化的讀寫事件句柄內(nèi)容也很簡單,就是去讀寫數(shù)據(jù)。
- 而取消句柄也就是去關(guān)閉
socket。 - 初始化完成后,我們開啟了
readSource,一旦有數(shù)據(jù)過來就觸發(fā)了我們readSource事件句柄,就可以去監(jiān)聽的socket所分配的緩沖區(qū)中去讀取數(shù)據(jù)了,而wirteSource初始化完是掛起的。 - 除此之外我們還初始化了當(dāng)前
source的狀態(tài),用于我們后續(xù)的操作。
至此我們客戶端的整個Connect流程結(jié)束了,用一張圖來概括總結(jié)一下吧:

整個客戶端連接的流程大致如上圖,當(dāng)然遠(yuǎn)不及于此,這里我們對地址做了IPV4和IPV6的兼容處理,對一些使用socket而產(chǎn)生的網(wǎng)絡(luò)錯誤導(dǎo)致進程退出的容錯處理。以及在這個過程中,socketQueue、代理queue、全局并發(fā)queue和stream常駐線程的管理調(diào)度等等。
當(dāng)然其中絕大部分操作都是在socketQueue中進行的。而在socketQueue中,我們也分為兩種操作dispatch_sync和dispatch_async。
因為socketQueue本身就是一個串行queue,所以我們所有的操作都在這個queue中進行保證了線程安全,而需要阻塞后續(xù)行為的操作,我們用了sync的方式。其實這樣使用sync是及其容易死鎖的,但是作者每次在調(diào)用sync之前都調(diào)用了這么一行判斷:
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
判斷當(dāng)前隊列是否就是這個socketQueue隊列,如果是則直接調(diào)用,否則就用sync的方式提交到這個queue中去執(zhí)行。這種防死鎖的方式,你學(xué)到了么?
接著我們來講講服務(wù)端Accept流程:
整個流程還是相對Connect來說還是十分簡單的,因為這個方法很長,而且大多數(shù)是我們直接連接講到過得內(nèi)容,所以我省略了一部分的代碼,只把重要的展示出來,大家可以參照著源碼看。
//監(jiān)聽端口起點
- (BOOL)acceptOnPort:(uint16_t)port error:(NSError **)errPtr
{
return [self acceptOnInterface:nil port:port error:errPtr];
}
- (BOOL)acceptOnInterface:(NSString *)inInterface port:(uint16_t)port error:(NSError **)errPtr
{
LogTrace();
// Just in-case interface parameter is immutable.
//防止參數(shù)被修改
NSString *interface = [inInterface copy];
__block BOOL result = NO;
__block NSError *err = nil;
// CreateSocket Block
// This block will be invoked within the dispatch block below.
//創(chuàng)建socket的Block
int(^createSocket)(int, NSData*) = ^int (int domain, NSData *interfaceAddr) {
//創(chuàng)建TCP的socket
int socketFD = socket(domain, SOCK_STREAM, 0);
//一系列錯誤判斷
...
// Bind socket
//用本地地址去綁定
status = bind(socketFD, (const struct sockaddr *)[interfaceAddr bytes], (socklen_t)[interfaceAddr length]);
//監(jiān)聽這個socket
//第二個參數(shù)是這個端口下維護的socket請求隊列,最多容納的用戶請求數(shù)。
status = listen(socketFD, 1024);
return socketFD;
};
// Create dispatch block and run on socketQueue
dispatch_block_t block = ^{ @autoreleasepool {
//一系列錯誤判斷
...
//判斷ipv4 ipv6是否支持
...
//得到本機的IPV4 IPV6的地址
[self getInterfaceAddress4:&interface4 address6:&interface6 fromDescription:interface port:port];
...
//判斷可以用IPV4還是6進行請求
...
// Create accept sources
//創(chuàng)建接受連接被觸發(fā)的source
if (enableIPv4)
{
//接受連接的source
accept4Source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socket4FD, 0, socketQueue);
//事件句柄
dispatch_source_set_event_handler(accept4Source, ^{ @autoreleasepool {
//拿到數(shù)據(jù),連接數(shù)
unsigned long numPendingConnections = dispatch_source_get_data(acceptSource);
LogVerbose(@"numPendingConnections: %lu", numPendingConnections);
//循環(huán)去接受這些socket的事件(一次觸發(fā)可能有多個連接)
while ([strongSelf doAccept:socketFD] && (++i < numPendingConnections));
}});
//取消句柄
dispatch_source_set_cancel_handler(accept4Source, ^{
//...
//關(guān)閉socket
close(socketFD);
});
//開啟source
dispatch_resume(accept4Source);
}
//ipv6一樣
...
//在scoketQueue中同步做這些初始化。
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
block();
else
dispatch_sync(socketQueue, block);
//...錯誤判斷
//返回結(jié)果
return result;
}
這個方法省略完仍然有這么長,它主要做了這兩件事(篇幅原因,盡量精簡):
- 創(chuàng)建本機地址、創(chuàng)建socket、綁定端口、監(jiān)聽端口。
- 創(chuàng)建了一個
GCD Source,來監(jiān)聽這個socket讀source,這樣連接事件一發(fā)生,就會觸發(fā)我們的事件句柄。接著我們調(diào)用了doAccept:方法循環(huán)去接受所有的連接。
接著我們來看這個接受連接的方法(同樣省略了一部分不那么重要的代碼):
//連接接受的方法
- (BOOL)doAccept:(int)parentSocketFD
{
LogTrace();
int socketType;
int childSocketFD;
NSData *childSocketAddress;
//IPV4
if (parentSocketFD == socket4FD)
{
socketType = 0;
struct sockaddr_in addr;
socklen_t addrLen = sizeof(addr);
//調(diào)用接受,得到接受的子socket
childSocketFD = accept(parentSocketFD, (struct sockaddr *)&addr, &addrLen);
//NO說明沒有連接
if (childSocketFD == -1)
{
LogWarn(@"Accept failed with error: %@", [self errnoError]);
return NO;
}
//子socket的地址數(shù)據(jù)
childSocketAddress = [NSData dataWithBytes:&addr length:addrLen];
}
//一樣
else if (parentSocketFD == socket6FD)
{
...
}
//unix domin socket 一樣
else // if (parentSocketFD == socketUN)
{
...
}
//socket 配置項的設(shè)置... 和connect一樣
//響應(yīng)代理
if (delegateQueue)
{
__strong id theDelegate = delegate;
//代理隊列中調(diào)用
dispatch_async(delegateQueue, ^{ @autoreleasepool {
// Query delegate for custom socket queue
dispatch_queue_t childSocketQueue = NULL;
//判斷是否實現(xiàn)了為socket 生成一個新的SocketQueue,是的話拿到新queue
if ([theDelegate respondsToSelector:@selector(newSocketQueueForConnectionFromAddress:onSocket:)])
{
childSocketQueue = [theDelegate newSocketQueueForConnectionFromAddress:childSocketAddress
onSocket:self];
}
// Create GCDAsyncSocket instance for accepted socket
//新創(chuàng)建一個本類實例,給接受的socket
GCDAsyncSocket *acceptedSocket = [[[self class] alloc] initWithDelegate:theDelegate
delegateQueue:delegateQueue
socketQueue:childSocketQueue];
//IPV4 6 un
if (socketType == 0)
acceptedSocket->socket4FD = childSocketFD;
else if (socketType == 1)
acceptedSocket->socket6FD = childSocketFD;
else
acceptedSocket->socketUN = childSocketFD;
//標(biāo)記開始 并且已經(jīng)連接
acceptedSocket->flags = (kSocketStarted | kConnected);
// Setup read and write sources for accepted socket
//初始化讀寫source
dispatch_async(acceptedSocket->socketQueue, ^{ @autoreleasepool {
[acceptedSocket setupReadAndWriteSourcesForNewlyConnectedSocket:childSocketFD];
}});
//判斷代理是否實現(xiàn)了didAcceptNewSocket方法,把我們新創(chuàng)建的socket返回出去
if ([theDelegate respondsToSelector:@selector(socket:didAcceptNewSocket:)])
{
[theDelegate socket:self didAcceptNewSocket:acceptedSocket];
}
}});
}
return YES;
}
- 這個方法很簡單,核心就是調(diào)用下面這個函數(shù),去接受連接,并且拿到一個新的
socket
childSocketFD = accept(parentSocketFD, (struct sockaddr *)&addr, &addrLen);
- 然后調(diào)用了
newSocketQueueForConnectionFromAddress:onSocket:這個代理,可以為新的socket重新設(shè)置一個socketQueue。 - 接著我們用這個
Socket重新創(chuàng)建了一個GCDAsyncSocket實例,然后調(diào)用我們的代理didAcceptNewSocket方法,把這個實例給傳出去了。 - 這里需要注意的是,我們調(diào)用
didAcceptNewSocket代理方法傳出去的實例我們需要自己保留,不然就會被釋放掉,那么這個與客戶端的連接也就斷開了。 - 同時我們還初始化了這個新
socket的讀寫source,這一步完全和connect中一樣,調(diào)用同一個方法,這樣如果有讀寫數(shù)據(jù),就會觸發(fā)這個新的socket的source了。
建立連接之后的無數(shù)個新的socket,都是獨立的,它們處理讀寫連接斷開的邏輯就和客戶端socket完全一樣了。
而我們監(jiān)聽本機端口的那個socket始終只有一個,這個用來監(jiān)聽觸發(fā)socket連接,并返回創(chuàng)建我們這無數(shù)個新的socket實例。
作為服務(wù)端的Accept流程就這么結(jié)束了,因為篇幅原因,所以盡量精簡了一些細(xì)節(jié)的處理,不過這些處理在Connect中也是反復(fù)出現(xiàn)的,所以基本無傷大雅。如果大家會感到困惑,建議下載github中的源碼注釋,對照著再看一遍,相信會有幫助的。
接著我們來講講Unix Domin Socket建立本地進程通信流程:
基本上這個流程,比上述任何流程還要簡單,簡單的到即使不簡化代碼,也沒多少行(當(dāng)然這是建立在客戶端Connect流程已經(jīng)實現(xiàn)了很多公用方法的基礎(chǔ)上)。
接著進入正題,我們來看看它發(fā)起連接的方法:
//連接本機的url上,IPC,進程間通信
- (BOOL)connectToUrl:(NSURL *)url withTimeout:(NSTimeInterval)timeout error:(NSError **)errPtr;
{
LogTrace();
__block BOOL result = NO;
__block NSError *err = nil;
dispatch_block_t block = ^{ @autoreleasepool {
//判斷長度
if ([url.path length] == 0)
{
NSString *msg = @"Invalid unix domain socket url.";
err = [self badParamError:msg];
return_from_block;
}
// Run through standard pre-connect checks
//前置的檢查
if (![self preConnectWithUrl:url error:&err])
{
return_from_block;
}
// We've made it past all the checks.
// It's time to start the connection process.
flags |= kSocketStarted;
// Start the normal connection process
NSError *connectError = nil;
//調(diào)用另一個方法去連接
if (![self connectWithAddressUN:connectInterfaceUN error:&connectError])
{
[self closeWithError:connectError];
return_from_block;
}
[self startConnectTimeout:timeout];
result = YES;
}};
//在socketQueue中同步執(zhí)行
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
block();
else
dispatch_sync(socketQueue, block);
if (result == NO)
{
if (errPtr)
*errPtr = err;
}
return result;
}
連接方法非常簡單,就只是做了一些錯誤的處理,然后調(diào)用了其他的方法,包括一個前置檢查,這檢查中會去判斷各種參數(shù)是否正常,如果正常會返回YES,并且把url轉(zhuǎn)換成Uinix domin socket地址的結(jié)構(gòu)體,賦值給我們的屬性connectInterfaceUN。
接著調(diào)用了connectWithAddressUN方法去發(fā)起連接。
我們接著來看看這個方法:
//連接Unix域服務(wù)器
- (BOOL)connectWithAddressUN:(NSData *)address error:(NSError **)errPtr
{
LogTrace();
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
// Create the socket
int socketFD;
LogVerbose(@"Creating unix domain socket");
//創(chuàng)建本機socket
socketUN = socket(AF_UNIX, SOCK_STREAM, 0);
socketFD = socketUN;
if (socketFD == SOCKET_NULL)
{
if (errPtr)
*errPtr = [self errnoErrorWithReason:@"Error in socket() function"];
return NO;
}
// Bind the socket to the desired interface (if needed)
LogVerbose(@"Binding socket...");
int reuseOn = 1;
//設(shè)置可復(fù)用
setsockopt(socketFD, SOL_SOCKET, SO_REUSEADDR, &reuseOn, sizeof(reuseOn));
// Prevent SIGPIPE signals
int nosigpipe = 1;
//進程終止錯誤信號禁止
setsockopt(socketFD, SOL_SOCKET, SO_NOSIGPIPE, &nosigpipe, sizeof(nosigpipe));
// Start the connection process in a background queue
int aStateIndex = stateIndex;
dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_async(globalConcurrentQueue, ^{
const struct sockaddr *addr = (const struct sockaddr *)[address bytes];
//并行隊列調(diào)用連接
int result = connect(socketFD, addr, addr->sa_len);
if (result == 0)
{
dispatch_async(socketQueue, ^{ @autoreleasepool {
//連接成功的一些狀態(tài)初始化
[self didConnect:aStateIndex];
}});
}
else
{
// 失敗的處理
perror("connect");
NSError *error = [self errnoErrorWithReason:@"Error in connect() function"];
dispatch_async(socketQueue, ^{ @autoreleasepool {
[self didNotConnect:aStateIndex error:error];
}});
}
});
LogVerbose(@"Connecting...");
return YES;
}
主要部分基本和客戶端連接相同,并且簡化了很多,調(diào)用了這一行完成了連接:
int result = connect(socketFD, addr, addr->sa_len);
同樣也和客戶端一樣,在連接成功之后去調(diào)用下面這個方法完成了一些資源的初始化:
[self didConnect:aStateIndex];
基本上連接就這么兩個方法了(當(dāng)然我們省略了一些細(xì)節(jié)),看完客戶端的連接之后,到這就變得非常簡單了。
接著我們來看看uinix domin socket作為服務(wù)端Accept。
這個Accpet,基本和我們普通Socket服務(wù)端的Accept相同。
//接受一個Url,uniex domin socket 做為服務(wù)端
- (BOOL)acceptOnUrl:(NSURL *)url error:(NSError **)errPtr;
{
LogTrace();
__block BOOL result = NO;
__block NSError *err = nil;
//基本和正常的socket accept一模一樣
// CreateSocket Block
// This block will be invoked within the dispatch block below.
//生成一個創(chuàng)建socket的block,創(chuàng)建、綁定、監(jiān)聽
int(^createSocket)(int, NSData*) = ^int (int domain, NSData *interfaceAddr) {
//creat socket
...
// Set socket options
...
// Bind socket
...
// Listen
...
};
// Create dispatch block and run on socketQueue
//錯誤判斷
dispatch_block_t block = ^{ @autoreleasepool {
//錯誤判斷
...
//判斷是否有這個url路徑是否正確
...
//調(diào)用上面的Block創(chuàng)建socket,并且綁定監(jiān)聽。
...
//創(chuàng)建接受連接的source
acceptUNSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socketUN, 0, socketQueue);
int socketFD = socketUN;
dispatch_source_t acceptSource = acceptUNSource;
//事件句柄,和accpept一樣
dispatch_source_set_event_handler(acceptUNSource, ^{ @autoreleasepool {
//循環(huán)去接受所有的每一個連接
...
}});
//取消句柄
dispatch_source_set_cancel_handler(acceptUNSource, ^{
//關(guān)閉socket
close(socketFD);
});
LogVerbose(@"dispatch_resume(accept4Source)");
dispatch_resume(acceptUNSource);
flags |= kSocketStarted;
result = YES;
}};
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
block();
else
dispatch_sync(socketQueue, block);
//填充錯誤
if (result == NO)
{
LogInfo(@"Error in accept: %@", err);
if (errPtr)
*errPtr = err;
}
return result;
}
因為代碼基本雷同,所以我們省略了大部分代碼,大家可以參照著之前的講解或者源碼去理解。這里和普通服務(wù)端socket唯一的區(qū)別就是,這里服務(wù)端綁定的地址是unix domin socket類型的地址,它是一個結(jié)構(gòu)體,里面包含的是我們進行進程通信的紐帶-一個本機文件路徑。
所以這里服務(wù)端簡單來說就是綁定的這個文件路徑,當(dāng)這個文件路徑有數(shù)據(jù)可讀(即有客戶端連接到達(dá))的時候,會觸發(fā)初始化的source事件句柄,我們會去循環(huán)的接受所有的連接,并且新生成一個socket實例,這里和普通的socket完全一樣。
就這樣我們所有的連接方式已經(jīng)講完了,后面這兩種方式,為了節(jié)省篇幅,確實講的比較粗略,但是核心的部分都有提到。
另外如果你有理解客戶端的Connect流程,那么理解起來應(yīng)該沒有什么問題,這兩個流程比前者可簡化太多了。
寫在結(jié)尾:
這個框架的Connect篇到此為止了,其實想一篇結(jié)束一塊內(nèi)容的,但是代碼量實在太多,如果講的太粗略,大家也很難去學(xué)習(xí)到真正的內(nèi)容。但是樓主也不想寫的太長,太瑣碎,相信大家都很難看下去,不過萬幸能兩篇內(nèi)總結(jié)完。
之后的內(nèi)容,等過完年會繼續(xù)寫。包括read篇和write篇等等,希望這個系列能讓大家能對Socket編程有個新的認(rèn)識和理解。以后也可以自己上手Socket,運用于項目中去。
轉(zhuǎn)眼過年了,回想這一年,許多地方都做的差強人意,希望2017有個更好的愿景吧。
紙上學(xué)來終覺淺,絕知此事要躬行。
