
OSSpinLock
OSSpinLock自旋鎖,因?yàn)樽孕i一直busy-waiting忙等待占用cpu,且不會(huì)像互斥鎖、信號(hào)量一樣會(huì)導(dǎo)致線程休眠,進(jìn)而引發(fā)上下文切換,因此短時(shí)間持有自旋鎖性能最高;但適合多線程處理器及持有時(shí)間較短(對(duì)于單線程處理器會(huì)降低cpu效率),并且存在優(yōu)先級(jí)反轉(zhuǎn)問題(不再安全的 OSSpinLock,對(duì)于ios線程存在多個(gè)優(yōu)先級(jí)且高優(yōu)先級(jí)不會(huì)被低優(yōu)先級(jí)搶占,導(dǎo)致低優(yōu)先級(jí)占用自旋鎖后,高優(yōu)先級(jí)被執(zhí)行獲取自旋鎖,導(dǎo)致死鎖等待,見下)
如果一個(gè)低優(yōu)先級(jí)的線程獲得鎖并訪問共享資源,這時(shí)一個(gè)高優(yōu)先級(jí)的線程也嘗試獲得這個(gè)鎖,它會(huì)處于 spin lock 的忙等狀態(tài)從而占用大量 CPU。此時(shí)低優(yōu)先級(jí)線程無法與高優(yōu)先級(jí)線程爭(zhēng)奪 CPU 時(shí)間,從而導(dǎo)致任務(wù)遲遲完不成、無法釋放 lock。這并不只是理論上的問題,libobjc 已經(jīng)遇到了很多次這個(gè)問題了,于是蘋果的工程師停用了 OSSpinLock。
對(duì)于ios跟蹤OSSpinLock匯編,其實(shí)現(xiàn)就是while循環(huán),具體如下:
0x104ba98b1 <+12>: cmpl $-0x1, %eax
0x104ba98b4 <+15>: jne 0x104ba98d1 ; <+44>
0x104ba98b6 <+17>: testl %ecx, %ecx
0x104ba98b8 <+19>: je 0x104bab0f9 ; _OSSpinLockLockYield
0x104ba98be <+25>: pause
0x104ba98c0 <+27>: incl %ecx
0x104ba98c2 <+29>: movl (%rdi), %eax
0x104ba98c4 <+31>: testl %eax, %eax
0x104ba98c6 <+33>: jne 0x104ba98b1 ; <+12>
0x104ba98c8 <+35>: xorl %eax, %eax
0x104ba98ca <+37>: lock
0x104ba98cb <+38>: cmpxchgl %edx, (%rdi)
0x104ba98ce <+41>: jne 0x104ba98b1 ; <+12>
對(duì)于linux實(shí)現(xiàn)(linux2.6內(nèi)核,)如下:
#define spin_lock(lock) _spin_lock(lock) //lock數(shù)據(jù)類型為*spinlock_t,雖然沒有用到-_-。
#define _spin_lock(lock) __LOCK(lock)
#define __LOCK(lock) \
do { preempt_disable(); __acquire(lock); (void)(lock); } while (0)
linux實(shí)現(xiàn)是通過關(guān)搶占來避免其他任務(wù)搶占來保證原子性(也存在關(guān)閉中斷搶占),此時(shí)若持有自旋鎖的線程休眠,引發(fā)上下文切換,另一個(gè)線程也獲取該自旋鎖而導(dǎo)致死鎖(因關(guān)搶占除非主動(dòng)調(diào)度,因此無法釋放自旋鎖);
spinlock與linux內(nèi)核調(diào)度的關(guān)系
使用如下:
OSSpinLock lock = OS_SPINLOCK_INIT;//初始化
OSSpinLockLock(&lock);//上鎖
OSSpinLockTry(&lock);//嘗試上鎖
OSSpinLockUnlock(&lock);//解鎖
ios10.0+ mac10.12+已廢棄OSSpinLock,替換為os_unfair_lock來解決優(yōu)先級(jí)反轉(zhuǎn)問題;
源碼地址:
SpinLocksLoadStoreEx.c
os_unfair_lock
os_unfair_lock用于取代不安全的OSSpinLock,從iOS10 macos10.12開始才支持 從底層調(diào)用看,等待os_unfair_lock鎖的線程會(huì)處于休眠狀態(tài),并非忙等,需要導(dǎo)入頭文件#import <os/lock.h>
//初始化
os_unfair_lock lock = OS_UNFAIR_LOCK_INIT;
//加鎖
os_unfair_lock_lock(&lock);
//解鎖
os_unfair_lock_unlock(&lock);
跟蹤匯編實(shí)現(xiàn)如下:
libsystem_kernel.dylib`__ulock_wait:
0x10701f360 <+0>: movl $0x2000203, %eax ; imm = 0x2000203
0x10701f365 <+5>: movq %rcx, %r10
0x10701f368 <+8>: syscall
0x10701f36a <+10>: jae 0x10701f374 ; <+20>
0x10701f36c <+12>: movq %rax, %rdi
0x10701f36f <+15>: jmp 0x10701ce67 ; cerror_nocancel
0x10701f374 <+20>: retq
0x10701f375 <+21>: nop
0x10701f376 <+22>: nop
0x10701f377 <+23>: nop
其中syscall執(zhí)行后線程休眠,執(zhí)行路徑:os_unfair_lock_lock -> _os_unfair_lock_lock_slow -> __ulock_wait
pthread_mutex_t pthread_cond_t
互斥鎖及條件變量為POSIX接口,見《unix進(jìn)程間通信.md》
NSLock NSRecursiveLock
NSLock是對(duì)pthread_mutex普通鎖的封裝。pthread_mutex_init(mutex, NULL);默認(rèn)屬性為PTHREAD_MUTEX_NORMAL
NSLock 遵循 NSLocking 協(xié)議,使用方法與pthread_mutex類似,如下:
@protocol NSLocking
- (void)lock;
- (void)unlock;
@end
@interface NSLock : NSObject <NSLocking>
- (BOOL)tryLock;//tryLock 是嘗試加鎖,如果失敗的話返回 NO
- (BOOL)lockBeforeDate:(NSDate *)limit;//是在指定Date之前嘗試加鎖,如果在指定時(shí)間之前都不能加鎖,則返回NO
@end
具體NSLock實(shí)現(xiàn)可參考GNUSetup, 搜索NSLock.m,找到 initialize 方法
GNUstep是GNU計(jì)劃的項(xiàng)目之一,它將Cocoa的OC庫(kù)重新開源實(shí)現(xiàn)了一遍,雖然GNUstep不是蘋果官方源碼,但還是具有一定的參考價(jià)值
(void) initialize
{
static BOOL beenHere = NO;
if (beenHere == NO)
{
beenHere = YES;
/* Initialise attributes for the different types of mutex.
* We do it once, since attributes can be shared between multiple
* mutexes.
* If we had a pthread_mutexattr_t instance for each mutex, we would
* either have to store it as an ivar of our NSLock (or similar), or
* we would potentially leak instances as we couldn't destroy them
* when destroying the NSLock. I don't know if any implementation
* of pthreads actually allocates memory when you call the
* pthread_mutexattr_init function, but they are allowed to do so
* (and deallocate the memory in pthread_mutexattr_destroy).
*/
pthread_mutexattr_init(&attr_normal);
pthread_mutexattr_settype(&attr_normal, PTHREAD_MUTEX_NORMAL);
pthread_mutexattr_init(&attr_reporting);
pthread_mutexattr_settype(&attr_reporting, PTHREAD_MUTEX_ERRORCHECK);
pthread_mutexattr_init(&attr_recursive);
pthread_mutexattr_settype(&attr_recursive, PTHREAD_MUTEX_RECURSIVE);
/* To emulate OSX behavior, we need to be able both to detect deadlocks
* (so we can log them), and also hang the thread when one occurs.
* the simple way to do that is to set up a locked mutex we can
* force a deadlock on.
*/
pthread_mutex_init(&deadlock, &attr_normal);
pthread_mutex_lock(&deadlock);
}
}
NSRecursiveLock是對(duì)pthread_mutex遞歸屬性下的封裝,API與NSLock一致;
NSCondition
NSCondtion是對(duì)pthread_mutex和pthread_cond的封裝,具體類源碼如下:
+ (void) initialize
{
[NSLock class]; // Ensure mutex attributes are set up.
}
- (id) init
{
if (nil != (self = [super init]))
{
if (0 != pthread_cond_init(&_condition, NULL))
{
DESTROY(self);
}
else if (0 != pthread_mutex_init(&_mutex, &attr_reporting))
{
pthread_cond_destroy(&_condition);
DESTROY(self);
}
}
return self;
}
- (void) signal
{
pthread_cond_signal(&_condition);
}
- (void) wait
{
pthread_cond_wait(&_condition, &_mutex);
}
- (void) broadcast
{
pthread_cond_broadcast(&_conditon);
}
具體使用如下:
@interface NSCondition : NSObject <NSLocking> {
- (void)wait;//等待條件
- (BOOL)waitUntilDate:(NSDate *)limit;//超時(shí)等待
- (void)signal;//發(fā)送信號(hào)
- (void)broadcast;//廣播信號(hào)
@end
需要注意NSCondition遵循NSLock協(xié)議,同pthread_cond_t,需要添加條件變量時(shí)加鎖,避免信號(hào)丟失及條件變量無法獲取鎖導(dǎo)致一直阻塞線程等待;
NSCondtionLock
NSConditionLock是對(duì)NScondition的進(jìn)一步封裝,具體如下:
@interface NSConditionLock : NSObject <NSLocking> {
- (instancetype)initWithCondition:(NSInteger)condition;//初始化Condition,并且設(shè)置狀態(tài)值
@property (readonly) NSInteger condition;
- (void)lockWhenCondition:(NSInteger)condition;//當(dāng)狀態(tài)值為condition且收到等待的條件變量信號(hào)時(shí)返回,否則一直阻塞等待條件變量信號(hào)
- (BOOL)tryLock;
- (BOOL)tryLockWhenCondition:(NSInteger)condition;
- (void)unlockWithCondition:(NSInteger)condition;//修改condition條件判斷值并廣播條件變量信號(hào)
- (BOOL)lockBeforeDate:(NSDate *)limit;
- (BOOL)lockWhenCondition:(NSInteger)condition beforeDate:(NSDate *)limit;
@property (nullable, copy) NSString *name;
@end
NSCondition同pthread_cond一樣,存在pthread_cond_wait被虛假喚醒的情況,一般都會(huì)while(condition != true)循環(huán)判斷是否條件為真,否則一直pthread_cond_wait阻塞;NSConditionLock就是提供了condition條件判斷的邏輯;
具體類實(shí)現(xiàn)如下:
- (id) init
{
return [self initWithCondition: 0];
}
- (id) initWithCondition: (NSInteger)value
{
if (nil != (self = [super init]))
{
if (nil == (_condition = [NSCondition new]))
{
DESTROY(self);
}
else
{
_condition_value = value;
}
}
return self;
}
- (void) lockWhenCondition: (NSInteger)value
{
[_condition lock];
while (value != _condition_value)
{
[_condition wait];
}
}
//條件判斷值改變并廣播信號(hào)解鎖
- (void) unlockWithCondition: (NSInteger)value
{
_condition_value = value;
[_condition broadcast];
[_condition unlock];
}
synchronized
@synchronized使用如下:
@synchronized(obj) {
//code
}
上述代碼轉(zhuǎn)化為c++,簡(jiǎn)化如下:
try {
objc_sync_enter(obj);
//code
} finally {
objc_sync_exit(obj);
}
對(duì)于objc_sync_enter及objc_sync_exit源碼如下:
// Begin synchronizing on 'obj'.
// Allocates recursive mutex associated with 'obj' if needed.
// Returns OBJC_SYNC_SUCCESS once lock is acquired.
int objc_sync_enter(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, ACQUIRE);
assert(data);
data->mutex.lock();
} else {
// @synchronized(nil) does nothing
if (DebugNilSync) {
_objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
}
objc_sync_nil();
}
return result;
}
// End synchronizing on 'obj'.
// Returns OBJC_SYNC_SUCCESS or OBJC_SYNC_NOT_OWNING_THREAD_ERROR
int objc_sync_exit(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, RELEASE);
if (!data) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
} else {
bool okay = data->mutex.tryUnlock();
if (!okay) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
}
}
} else {
// @synchronized(nil) does nothing
}
return result;
}
其實(shí)現(xiàn)使用了遞歸鎖來實(shí)現(xiàn)加鎖避免多次使用@synchronized導(dǎo)致死鎖,并使用緩存技術(shù)通過傳入的objc地址來查找對(duì)應(yīng)的鎖,其中緩存是使用了hash map來緩存;若傳入nil則鎖不起作用;若@synchronized中的block修改了objc地址也不影響鎖結(jié)構(gòu);并使用try finally來捕獲異常,避免鎖未釋放;
使用注意事項(xiàng)
慎用@synchronized(self)
使用self對(duì)于外部可以修改使用的對(duì)象地址,容易外部混合使用@synchronized及其他鎖導(dǎo)致死鎖問題,如:
//class A
@synchronized (self) {
[_sharedLock lock];
NSLog(@"code in class A");
[_sharedLock unlock];
}
//class B
[_sharedLock lock];
@synchronized (objectA) {
NSLog(@"code in class B");
}
[_sharedLock unlock];
對(duì)于類內(nèi)部數(shù)據(jù)同步,正確的做法是傳入一個(gè)類內(nèi)部維護(hù)的NSObject對(duì)象,而且這個(gè)對(duì)象是對(duì)外不可見的;
減小粗粒度
對(duì)于不同的數(shù)據(jù)鎖同步使用不同的objc對(duì)象來控制,避免無關(guān)的對(duì)象鎖,且block內(nèi)部盡量避免函數(shù)調(diào)用;
正確使用多線程同步鎖@synchronized
dispatch_queue(DISPATCH_QUEUE_SERIAL) GCD串行隊(duì)列
dispatch_queue_t queue = dispatch_queue_create("top.istones.moneyQueue", DISPATCH_QUEUE_SERIAL);
dispatch_sync(self.moneyQueue, ^{
// 任務(wù)
});
任務(wù)順序串行同步執(zhí)行;
dispatch_semaphore
dispatch_semaphore信號(hào)量,同unix sem概念相同,具體使用如下:
//表示最多開啟5個(gè)線程
dispatch_semaphore_create(5);
// 如果信號(hào)量的值 > 0,就讓信號(hào)量的值減1,然后繼續(xù)往下執(zhí)行代碼
// 如果信號(hào)量的值 <= 0,就會(huì)休眠等待,直到信號(hào)量的值變成>0,就讓信號(hào)量的值減1,然后繼續(xù)往下執(zhí)行代碼
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout);
// 讓信號(hào)量的值+1
dispatch_semaphore_signal(self.semaphore);
實(shí)際封裝的
系統(tǒng)庫(kù)semaphone_xxx,使用的是基于內(nèi)存的信號(hào)量形式;
但是在資源可用的情況下,使用GCD semaphore將會(huì)消耗較少的時(shí)間,因?yàn)樵谶@種情況下GCD不會(huì)調(diào)用內(nèi)核,只有在資源不可用的時(shí)候才會(huì)調(diào)用內(nèi)核,并且系統(tǒng)需要停在你的線程里,直到線程發(fā)出可用信號(hào)。
pthread_rwlock 讀寫鎖
pthread_rwlock經(jīng)常用于文件等數(shù)據(jù)的讀寫操作,需要導(dǎo)入頭文件#import <pthread.h>
atomic屬性
atomic用于保證屬性setter、getter的原子性操作,相當(dāng)于在getter和setter內(nèi)部加了線程同步的鎖(使用了自旋鎖);
可以參考源碼objc4的objc-accessors.mm;
它并不能保證使用屬性的過程是線程安全的(eg.一個(gè)屬性array,atomic的話只能保證在外面set和get的時(shí)候線程安全,但是不能保證array addObject、removeObject線程安全);
atomic屬性保證的屬性的值修改(包括數(shù)值類型及指針類型)線程安全,但不保證指針指向的內(nèi)存的安全及多部操作的原子性,如上array;見iOS多線程到底不安全在哪里?
image.png
對(duì)于property分為三類內(nèi)存模型:指針、指針指向的內(nèi)存區(qū)域及數(shù)值類型;對(duì)于指針及指向的內(nèi)存容易好理解,對(duì)于數(shù)值類型,64位系統(tǒng)小于等于8位的數(shù)值,一個(gè)指令周期就可以獲取,因此set/get操作atomic或noatomic都可以保證原子性,但對(duì)于數(shù)值操作,如self.count = self.count + 1;排除編譯器優(yōu)化的影響,分為讀取(load)、加1(add)、賦值(store)三步操作,store前可能存在多次store操作;
建議:盡量避免多線程設(shè)計(jì),若不可避免,則盡量不使用atomic屬性,而使用鎖機(jī)制,保證多條指令執(zhí)行的原子性;
源碼使用了自旋鎖來保證屬性指針修改或者值修改線程安全,源碼見下:
static inline void reallySetProperty(id self, SEL _cmd, id newValue, ptrdiff_t offset, bool atomic, bool copy, bool mutableCopy)
{
if (offset == 0) {
object_setClass(self, newValue);
return;
}
id oldValue;
id *slot = (id*) ((char*)self + offset);
if (copy) {
newValue = [newValue copyWithZone:nil];
} else if (mutableCopy) {
newValue = [newValue mutableCopyWithZone:nil];
} else {
if (*slot == newValue) return;
newValue = objc_retain(newValue);
}
if (!atomic) {
oldValue = *slot;
*slot = newValue;
} else {
spinlock_t& slotlock = PropertyLocks[slot];
slotlock.lock();
oldValue = *slot;
*slot = newValue;
slotlock.unlock();
}
objc_release(oldValue);
}
void objc_setProperty(id self, SEL _cmd, ptrdiff_t offset, id newValue, BOOL atomic, signed char shouldCopy)
{
bool copy = (shouldCopy && shouldCopy != MUTABLE_COPY);
bool mutableCopy = (shouldCopy == MUTABLE_COPY);
reallySetProperty(self, _cmd, newValue, offset, atomic, copy, mutableCopy);
}
void objc_setProperty_atomic(id self, SEL _cmd, id newValue, ptrdiff_t offset)
{
reallySetProperty(self, _cmd, newValue, offset, true, false, false);
}
關(guān)鍵代碼如下:
//設(shè)置noatomic
if (!atomic) {
oldValue = *slot;
*slot = newValue;
} else {//設(shè)置atomic
spinlock_t& slotlock = PropertyLocks[slot];//自旋鎖
slotlock.lock();//自旋鎖加鎖
oldValue = *slot;//保留舊值
*slot = newValue;//修改新值的指針地址
slotlock.unlock();//自旋鎖釋放鎖
}
dispatch_barrier_async dispatch_barrier_sync
這個(gè)函數(shù)傳入的必須是自己通過dispatch_queue_cretate創(chuàng)建的DISPATCH_QUEUE_CONCURRENT并發(fā)隊(duì)列,如果傳入的是一個(gè)串行或是一個(gè)全局的并發(fā)隊(duì)列,那這個(gè)函數(shù)便等同于dispatch_async函數(shù)的效果;
作用是類似柵欄,但相比柵欄能控制匯合的點(diǎn),即dispatch_barrier_asyn或者dispatch_barrier_sync,在這之前添加到隊(duì)列的所有block執(zhí)行完成,才去執(zhí)行這兩個(gè)函數(shù)中的block,再去執(zhí)行后續(xù)添加的block;
主要用于并發(fā)讀寫控制;
dispatch_queue_t queue = dispatch_queue_create("top.istones.rwQueue", DISPATCH_QUEUE_CONCURRENT);
// 讀
dispatch_async(queue, ^{
});
// 寫
dispatch_barrier_async(queue, ^{
});
兩者的相同:
- 等待前面的任務(wù)都執(zhí)行完成才去執(zhí)行后面的函數(shù)添加的任務(wù);
不同:
-
dispatch_barrier_sync會(huì)阻塞調(diào)用線程此函數(shù)后面的執(zhí)行(因?yàn)橥綀?zhí)行),而dispatch_barrier_async不會(huì)阻塞當(dāng)前調(diào)用線程后面的代碼執(zhí)行;
image.png
image.png
dispatch_barrier_sync、dispatch_barrier_async的使用
參考資料
Demo
https://github.com/FengyunSky/notes/blob/master/local/code/threadlock.tar


