為什么要線程同步
- 我們在使用多線程的時候,可能會遇到多個線程同時訪問同一個數(shù)據(jù)導(dǎo)致數(shù)據(jù)錯亂和數(shù)據(jù)不安全的問題,所以就需要使用線程同步
常用的線程同步方法
- 常用線程同步的方法就是加鎖,以保證同一時間只有一個線程在訪問該數(shù)據(jù)
鎖的分類
- 自旋鎖 等待鎖的線程會處于忙等(busy-wait)狀態(tài),一直占用著CPU資源,通常在下面情況下使用
- 預(yù)計線程等待鎖的時間很短
- 加鎖的代碼(臨界區(qū))經(jīng)常被調(diào)用,但競爭情況很少發(fā)生
- CPU資源不緊張
- 多核處理器
- 互斥鎖 等待鎖的線程會處于休眠狀態(tài)
- 預(yù)計線程等待鎖的時間較長
- 臨界區(qū)代碼復(fù)雜或者循環(huán)量大
- 臨界區(qū)有IO操作
- 單核處理器
- 臨界區(qū)競爭非常激烈
- 常用的鎖
-
OSSpinLock(自旋鎖),已經(jīng)棄用了
會出現(xiàn)優(yōu)先級翻轉(zhuǎn)的情況.比如線程1優(yōu)先級比較高,線程2優(yōu)先級比較低,然后在某一時刻是線程2先獲取到鎖,所以先是線程2加鎖,這時候,線程1就在while(目標(biāo)鎖還未釋放),這個狀態(tài),但因為線程1優(yōu)先級比較高,所以系統(tǒng)分配的時間比較多,有可能會沒有分配時間給線程2執(zhí)行后續(xù)的操作(需要做的任務(wù)和解鎖)了,這時候就會造成死鎖。
但如果是線程休眠的情況,在優(yōu)先級高的線程休眠后,優(yōu)先級比較低的線程會給系統(tǒng)調(diào)用,所以不會有死鎖的情況 os_unfair_lock被用來取代OSSpinLock,并且從iOS 10開始支持os_unfair_lock。等待鎖的線程會處于休眠狀態(tài)(不同于OSSpinLock的忙等狀態(tài)),不會占用CPU資源。因此,使用os_unfair_lock不會導(dǎo)致優(yōu)先級反轉(zhuǎn)的問題。
pthread_mutex
mutex叫做”互斥鎖”,等待鎖的線程會處于休眠狀態(tài)
pthread_mutex – 遞歸鎖
pthread_mutex – 條件NSLock、NSRecursiveLock
NSLock是對mutex普通鎖的封裝
NSRecursiveLock也是對mutex遞歸鎖的封裝,API跟NSLock基本一致NSCondition
NSCondition是對mutex和cond的封裝NSConditionLock
NSConditionLock是對NSCondition的進(jìn)一步封裝,可以設(shè)置具體的條件值dispatch_semaphore
semaphore叫做”信號量”
信號量的初始值,可以用來控制線程并發(fā)訪問的最大數(shù)量
信號量的初始值為1,代表同時只允許1條線程訪問資源,保證線程同步@synchronized
@synchronized是對mutex遞歸鎖的封裝
@synchronized(obj)內(nèi)部會生成obj對應(yīng)的遞歸鎖,然后進(jìn)行加鎖、解鎖操作
-
-
各種鎖的性能對比
鎖的性能
@synchronized
- @synchronized的研究,我們可以從2個方向入手
-
查看匯編
image.png -
clang
image.png
-
- 可以發(fā)現(xiàn)@synchronized的底層實現(xiàn)是通過
objc_sync_enter和objc_sync_exit,添加符號斷點發(fā)現(xiàn)objc_sync_enter是在libobjc.dylib中實現(xiàn)的
image.png
objc_sync_enter
- objc_sync_enter的代碼實現(xiàn)
int objc_sync_enter(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, ACQUIRE);
ASSERT(data);
data->mutex.lock();
} else {
// @synchronized(nil) does nothing
if (DebugNilSync) {
_objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
}
objc_sync_nil();
}
return result;
}
- 判斷obj是否為空, // @synchronized(nil) does nothing從注釋看到obj為空的時候沒有做任何事情,如果不為空,通過
id2data(obj, ACQUIRE)獲取到一個SyncData類型的data然后調(diào)用data屬性的lock - SyncData的結(jié)構(gòu)
typedef struct alignas(CacheLineSize) SyncData {
struct SyncData* nextData;
DisguisedPtr<objc_object> object;
int32_t threadCount; // number of THREADS using this block
recursive_mutex_t mutex;
} SyncData;
-
id2data的實現(xiàn)
static SyncData* id2data(id object, enum usage why)
{
spinlock_t *lockp = &LOCK_FOR_OBJ(object);
SyncData **listp = &LIST_FOR_OBJ(object);
SyncData* result = NULL;
#if SUPPORT_DIRECT_THREAD_KEYS
// Check per-thread single-entry fast cache for matching object
bool fastCacheOccupied = NO;
SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY);
if (data) {
fastCacheOccupied = YES;
if (data->object == object) {
// Found a match in fast cache.
uintptr_t lockCount;
result = data;
lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY);
if (result->threadCount <= 0 || lockCount <= 0) {
_objc_fatal("id2data fastcache is buggy");
}
switch(why) {
case ACQUIRE: {
lockCount++;
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
break;
}
case RELEASE:
lockCount--;
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
if (lockCount == 0) {
// remove from fast cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, NULL);
// atomic because may collide with concurrent ACQUIRE
OSAtomicDecrement32Barrier(&result->threadCount);
}
break;
case CHECK:
// do nothing
break;
}
return result;
}
}
#endif
// Check per-thread cache of already-owned locks for matching object
SyncCache *cache = fetch_cache(NO);
if (cache) {
unsigned int i;
for (i = 0; i < cache->used; i++) {
SyncCacheItem *item = &cache->list[i];
if (item->data->object != object) continue;
// Found a match.
result = item->data;
if (result->threadCount <= 0 || item->lockCount <= 0) {
_objc_fatal("id2data cache is buggy");
}
switch(why) {
case ACQUIRE:
item->lockCount++;
break;
case RELEASE:
item->lockCount--;
if (item->lockCount == 0) {
// remove from per-thread cache
cache->list[i] = cache->list[--cache->used];
// atomic because may collide with concurrent ACQUIRE
OSAtomicDecrement32Barrier(&result->threadCount);
}
break;
case CHECK:
// do nothing
break;
}
return result;
}
}
// Thread cache didn't find anything.
// Walk in-use list looking for matching object
// Spinlock prevents multiple threads from creating multiple
// locks for the same new object.
// We could keep the nodes in some hash table if we find that there are
// more than 20 or so distinct locks active, but we don't do that now.
lockp->lock();
{
SyncData* p;
SyncData* firstUnused = NULL;
for (p = *listp; p != NULL; p = p->nextData) {
if ( p->object == object ) {
result = p;
// atomic because may collide with concurrent RELEASE
OSAtomicIncrement32Barrier(&result->threadCount);
goto done;
}
if ( (firstUnused == NULL) && (p->threadCount == 0) )
firstUnused = p;
}
// no SyncData currently associated with object
if ( (why == RELEASE) || (why == CHECK) )
goto done;
// an unused one was found, use it
if ( firstUnused != NULL ) {
result = firstUnused;
result->object = (objc_object *)object;
result->threadCount = 1;
goto done;
}
}
// Allocate a new SyncData and add to list.
// XXX allocating memory with a global lock held is bad practice,
// might be worth releasing the lock, allocating, and searching again.
// But since we never free these guys we won't be stuck in allocation very often.
posix_memalign((void **)&result, alignof(SyncData), sizeof(SyncData));
result->object = (objc_object *)object;
result->threadCount = 1;
new (&result->mutex) recursive_mutex_t(fork_unsafe_lock);
result->nextData = *listp;
*listp = result;
done:
lockp->unlock();
if (result) {
// Only new ACQUIRE should get here.
// All RELEASE and CHECK and recursive ACQUIRE are
// handled by the per-thread caches above.
if (why == RELEASE) {
// Probably some thread is incorrectly exiting
// while the object is held by another thread.
return nil;
}
if (why != ACQUIRE) _objc_fatal("id2data is buggy");
if (result->object != object) _objc_fatal("id2data is buggy");
#if SUPPORT_DIRECT_THREAD_KEYS
if (!fastCacheOccupied) {
// Save in fast thread cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, result);
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1);
} else
#endif
{
// Save in thread cache
if (!cache) cache = fetch_cache(YES);
cache->list[cache->used].data = result;
cache->list[cache->used].lockCount = 1;
cache->used++;
}
}
return result;
}
- 通過代碼調(diào)試發(fā)現(xiàn)第一次進(jìn)來的時候會執(zhí)行下面流程
posix_memalign((void **)&result, alignof(SyncData), sizeof(SyncData));
result->object = (objc_object *)object;
result->threadCount = 1;
new (&result->mutex) recursive_mutex_t(fork_unsafe_lock);
result->nextData = *listp;
*listp = result;
if (result) {
// Only new ACQUIRE should get here.
// All RELEASE and CHECK and recursive ACQUIRE are
// handled by the per-thread caches above.
if (why == RELEASE) {
// Probably some thread is incorrectly exiting
// while the object is held by another thread.
return nil;
}
if (why != ACQUIRE) _objc_fatal("id2data is buggy");
if (result->object != object) _objc_fatal("id2data is buggy");
#if SUPPORT_DIRECT_THREAD_KEYS
if (!fastCacheOccupied) {
// Save in fast thread cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, result);
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1);
}
return result;
- 第二次之后進(jìn)來會執(zhí)行下面流程
{
SyncData* p;
SyncData* firstUnused = NULL;
for (p = *listp; p != NULL; p = p->nextData) {
if ( p->object == object ) {
result = p;
// atomic because may collide with concurrent RELEASE
OSAtomicIncrement32Barrier(&result->threadCount);
goto done;// 從這里跳轉(zhuǎn)到done
}
}
if (!fastCacheOccupied) {
// Save in fast thread cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, result);
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1);
} else
#endif
return result;
objc_sync_exit
-
objc_sync_exit和objc_sync_enter差不多,只是在id2data傳遞的參數(shù)不一樣,所以在id2data里面的執(zhí)行流程也不一樣
int objc_sync_exit(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, RELEASE);
if (!data) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
} else {
bool okay = data->mutex.tryUnlock();
if (!okay) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
}
}
} else {
// @synchronized(nil) does nothing
}
return result;
}
-
objc_sync_exit在id2data的執(zhí)行流程
bool fastCacheOccupied = NO;
SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY);
if (data) {
fastCacheOccupied = YES;
if (data->object == object) {
// Found a match in fast cache.
uintptr_t lockCount;
result = data;
lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY);
if (result->threadCount <= 0 || lockCount <= 0) {
_objc_fatal("id2data fastcache is buggy");
}
switch(why) {
case ACQUIRE: {
lockCount++;
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
break;
}
case RELEASE:
lockCount--;
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
if (lockCount == 0) {
// remove from fast cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, NULL);
// atomic because may collide with concurrent ACQUIRE
OSAtomicDecrement32Barrier(&result->threadCount);
}
break;
case CHECK:
// do nothing
break;
}
return result;
}
}
- 個人理解就是
@synchronized會通過全局哈希表**listp根據(jù)不同的對象存儲SyncData,第二次進(jìn)來之后就會返回對應(yīng)的SyncData,再對threadCount+1,因為@synchronized實現(xiàn)的是一個同步鎖,所以任務(wù)執(zhí)行完畢之后會通過tls_get_direct獲取SyncData的lockCount進(jìn)行--,當(dāng)結(jié)果為0的時候就通過tls_set_direct將該key設(shè)置為NULL回收控件
NSLock、NSRecursiveLock
- NSLock的底層其實就是對pthread_ mutex的封裝,通過lldb調(diào)試發(fā)現(xiàn)NSLock是基于Foundation實現(xiàn)的,而Foundation是不開源的,我們可以通過查看swift的實現(xiàn)
// 初始化方法
public override init() {
#if os(Windows)
InitializeSRWLock(mutex)
InitializeConditionVariable(timeoutCond)
InitializeSRWLock(timeoutMutex)
#else
pthread_mutex_init(mutex, nil)
#if os(macOS) || os(iOS)
pthread_cond_init(timeoutCond, nil)
pthread_mutex_init(timeoutMutex, nil)
#endif
#endif
}
open func lock() {
#if os(Windows)
AcquireSRWLockExclusive(mutex)
#else
pthread_mutex_lock(mutex)
#endif
}
open func unlock() {
#if os(Windows)
ReleaseSRWLockExclusive(mutex)
AcquireSRWLockExclusive(timeoutMutex)
WakeAllConditionVariable(timeoutCond)
ReleaseSRWLockExclusive(timeoutMutex)
#else
pthread_mutex_unlock(mutex)
#if os(macOS) || os(iOS)
// Wakeup any threads waiting in lock(before:)
pthread_mutex_lock(timeoutMutex)
pthread_cond_broadcast(timeoutCond)
pthread_mutex_unlock(timeoutMutex)
#endif
#endif
}
- NSRecursiveLock和NSLock差不多,只不過是在基礎(chǔ)鎖的基礎(chǔ)上加了遞歸屬性
public override init() {
super.init()
#if os(Windows)
InitializeCriticalSection(mutex)
InitializeConditionVariable(timeoutCond)
InitializeSRWLock(timeoutMutex)
#else
#if CYGWIN
var attrib : pthread_mutexattr_t? = nil
#else
var attrib = pthread_mutexattr_t()
#endif
withUnsafeMutablePointer(to: &attrib) { attrs in
pthread_mutexattr_init(attrs)
pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE))
pthread_mutex_init(mutex, attrs)
}
#if os(macOS) || os(iOS)
pthread_cond_init(timeoutCond, nil)
pthread_mutex_init(timeoutMutex, nil)
#endif
#endif
}
NSConditionLock、NSCondition
- NSCondition
public override init() {
#if os(Windows)
InitializeSRWLock(mutex)
InitializeConditionVariable(cond)
#else
pthread_mutex_init(mutex, nil)
pthread_cond_init(cond, nil)
#endif
}
- NSConditionLock的初始化
public convenience override init() {
self.init(condition: 0)
}
public init(condition: Int) {
_value = condition
}
- NSConditionLock不同于NSLock的是它的lock方法進(jìn)行了處理
open func lock() {
let _ = lock(before: Date.distantFuture)
}
open func lock(whenCondition condition: Int) {
let _ = lock(whenCondition: condition, before: Date.distantFuture)
}
open func lock(whenCondition condition: Int, before limit: Date) -> Bool {
_cond.lock()
while _thread != nil || _value != condition {
if !_cond.wait(until: limit) {
_cond.unlock()
return false
}
}
#if os(Windows)
_thread = GetCurrentThread()
#else
_thread = pthread_self()
#endif
_cond.unlock()
return true
}
- NSConditionLock調(diào)用lock的時候會判斷下條件,如果條件不成立就會一直等待,直到date到了



