什么是dispatch_group_t
dispatch_group_t是提交給隊列用于異步調(diào)用的一組block??梢詫㈥犃械娜蝿誦lock進行分組,把隊列任務關聯(lián)到當前的組里面,然后還可以監(jiān)聽已關聯(lián)的任務是否全部執(zhí)行完成。dispatch_group_t可以通過dispatch_group_create函數(shù)創(chuàng)建:
dispatch_group_t group = dispatch_group_create();
dispatch_group_t的使用
dispatch_group_t的使用有兩種方式,一種dispatch_group_enter() 和dispatch_group_leave()配合的方式;另一種是使用dispatch_group_async函數(shù)。不管哪種方式,都可以通過dispatch_group_notify函數(shù)來監(jiān)聽group里面的任務是否都已經(jīng)執(zhí)行完畢,或者可以通過dispatch_group_wait函數(shù)等待所有任務執(zhí)行完畢。dispatch_group_wait和dispatch_group_notify的不同是,dispatch_group_notify可以指定所有任務完成之后執(zhí)行的隊列,它是異步的,不會阻塞當前線程;而dispatch_group_wait是同步的,會阻塞當前線程,但是dispatch_group_wait可以設置超時時間。下面就來看看dispatch_group_t使用示例。
dispatch_group_enter()&dispatch_group_leave()
這兩個函數(shù)必須成對出現(xiàn),這很類似信號量。dispatch_group_enter在任務添加之前調(diào)用,相當于告訴group要添加任務,而dispatch_group_leave則在任務結(jié)束的時候調(diào)用。以下是demo示例:
dispatch_queue_t conQueue = dispatch_queue_create("com.hello-world.djx", DISPATCH_QUEUE_CONCURRENT);
dispatch_group_t group = dispatch_group_create();
dispatch_group_enter(group);
dispatch_async(conQueue, ^{
NSLog(@"1");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(conQueue, ^{
NSLog(@"2");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(conQueue, ^{
NSLog(@"3");
dispatch_group_leave(group);
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"任務完成,返回主線程");
});
使用dispatch_group_async
這個函數(shù)直接實現(xiàn)了dispatch_group_enter和dispatch_group_leave的作用,以下是demo示例:
dispatch_queue_t conQueue = dispatch_queue_create("com.hello-world.djx", DISPATCH_QUEUE_CONCURRENT);
dispatch_group_t group = dispatch_group_create();
dispatch_group_async(group, conQueue, ^{
NSLog(@"1");
});
dispatch_group_async(group, conQueue, ^{
NSLog(@"2");
});
dispatch_group_async(group, conQueue, ^{
NSLog(@"3");
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"任務完成,返回主線程");
});
到這里的時候我們可能發(fā)現(xiàn)它跟柵欄dispatch_barrier_async函數(shù)很像,可以等某個隊列的某幾個任務都執(zhí)行了在執(zhí)行我們想要的操作。但還是group跟柵欄函數(shù)是不一樣的,group不會阻塞當前隊列,而且一個group不止可以關聯(lián)一個隊列,可以同時關聯(lián)多個隊列,下面我們就來看看group關聯(lián)多個隊列的用法。
可以關聯(lián)多個隊列任務、兩種方式一起使用
dispatch_queue_t conQueue = dispatch_queue_create("com.hello-world.djx", DISPATCH_QUEUE_CONCURRENT);
dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_group_t group = dispatch_group_create();
dispatch_group_enter(group);
dispatch_async(conQueue, ^{
NSLog(@"1");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(globalQueue, ^{
NSLog(@"2");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(conQueue, ^{
NSLog(@"3");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(globalQueue, ^{
NSLog(@"4");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(conQueue, ^{
NSLog(@"5");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(globalQueue, ^{
NSLog(@"6");
dispatch_group_leave(group);
});
dispatch_group_async(group, conQueue, ^{
NSLog(@"7");
});
dispatch_group_async(group, globalQueue, ^{
NSLog(@"8");
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"任務完成,返回主線程");
});
NSLog(@"開始等待");
dispatch_time_t time = dispatch_walltime(NULL, 5* NSEC_PER_SEC);
dispatch_group_wait(group, time);
NSLog(@"等待結(jié)束")
從demo中可以看出,group可以關聯(lián)多個隊列conQueque和globalQueue,而且group的兩種用法dispatch_group_enter()&dispatch_group_leave()和dispatch_group_async是可以一起使用的,而且dispatch_group_notify和dispatch_group_wait并不是互斥的,他們也一樣可以同時使用。這個給了我們做業(yè)務有著很多選擇。
Group底層原理
這里先提出幾個問題:為什么dispatch_group_enter和dispatch_group_leave要成對出現(xiàn)?它們是怎么關聯(lián)的?以及dispatch_group_async為什么不需要enter和leave?dispatch_group_notify是如何知道所有任務都執(zhí)行完成了的?
接下來通過源碼分析來解釋這些問題。
dispatch_group_create源碼解析
dispatch_group_t
dispatch_group_create(void)
{
return _dispatch_group_create_with_count(0);
}
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
sizeof(struct dispatch_group_s));
dg->do_next = DISPATCH_OBJECT_LISTLESS;
dg->do_targetq = _dispatch_get_default_queue(false);
if (n) {
os_atomic_store2o(dg, dg_bits,
(uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
}
return dg;
}
DISPATCH_VTABLE能夠獲取group的類信息。由于初始值n為0,所以這里的os_atomic_store2o不會被調(diào)用,所以這會dg_bits為0。dg_bits用于記錄enter和leave的信息。
dispatch_group_enter源碼解析
dispatch_group_enter函數(shù):
void
dispatch_group_enter(dispatch_group_t dg)
{
// The value is decremented on a 32bits wide atomic so that the carry
// for the 0 -> -1 transition is not propagated to the upper 32bits.
uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
DISPATCH_GROUP_VALUE_INTERVAL, acquire);
uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
if (unlikely(old_value == 0)) {
_dispatch_retain(dg); // <rdar://problem/22318411>
}
if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
DISPATCH_CLIENT_CRASH(old_bits,
"Too many nested calls to dispatch_group_enter()");
}
}
os_atomic_sub_orig2o調(diào)用流程:os_atomic_sub_orig2o->os_atomic_sub_orig->_os_atomic_c11_op_orig((p), (v), m, sub, -)->atomic_fetch_sub_explicit
在enter函數(shù)里,首先會對dg->dg_bits進行減一的原子操作,用于統(tǒng)計enter次數(shù)。有前面創(chuàng)建group可知,dg->dg_bits初始值應該是為0的,所以減1之后正常情況下不會等于0,所以這里懶得分析_dispatch_retain函數(shù)了。
dispatch_group_leave源碼解析
dispatch_group_leave函數(shù):
void
dispatch_group_leave(dispatch_group_t dg)
{
// The value is incremented on a 64bits wide atomic so that the carry for
// the -1 -> 0 transition increments the generation atomically.
uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
DISPATCH_GROUP_VALUE_INTERVAL, release);
uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
old_state += DISPATCH_GROUP_VALUE_INTERVAL;
do {
new_state = old_state;
if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
} else {
// If the group was entered again since the atomic_add above,
// we can't clear the waiters bit anymore as we don't know for
// which generation the waiters are for
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
}
if (old_state == new_state) break;
} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
old_state, new_state, &old_state, relaxed)));
return _dispatch_group_wake(dg, old_state, true);
}
if (unlikely(old_value == 0)) {
DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
"Unbalanced call to dispatch_group_leave()");
}
}
os_atomic_add_orig2o調(diào)用流程: os_atomic_add_orig2o->os_atomic_add_orig->_os_atomic_c11_op_orig->_os_atomic_c11_op_orig->atomic_fetch_add_explicit
實際上這里是實際上就是對dg->dg_state的原子加1操作,并返回舊值old_value。用于統(tǒng)計leave次數(shù)。dg_state操作完成之后會就會判斷enter和leave是否已經(jīng)成對,如果是就會調(diào)用_dispatch_group_wake喚醒group(dispatch_group_wait),同時去異步執(zhí)行notify任務。如果old_value==0,說明leave次數(shù)調(diào)用過多,會導致崩潰。
static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
dispatch_continuation_t dc, next_dc, tail;
// Snapshot before anything is notified/woken <rdar://problem/8554546>
dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
do {
dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
_dispatch_continuation_async(dsn_queue, dc,
_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
_dispatch_release(dsn_queue);
} while ((dc = next_dc));
refs++;
}
if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
_dispatch_wake_by_address(&dg->dg_gen);
}
if (refs) _dispatch_release_n(dg, refs);
}
_dispatch_group_wake函數(shù)中,會循環(huán)遍歷當前group里是否有notify任務,如果有則交給_dispatch_continuation_async把block提交到隊列中異步調(diào)用這個block,同時會判斷有沒有正在等待的group任務:
if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
_dispatch_wake_by_address(&dg->dg_gen);
}
dispatch_group_notify源碼分析
void
dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dsn = _dispatch_continuation_alloc();
_dispatch_continuation_init(dsn, dq, db, 0, DC_FLAG_CONSUME);
_dispatch_group_notify(dg, dq, dsn);
}
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dsn)
{
uint64_t old_state, new_state;
dispatch_continuation_t prev;
dsn->dc_data = dq;
_dispatch_retain(dq);
prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
if (os_mpsc_push_was_empty(prev)) {
os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
if ((uint32_t)old_state == 0) {
os_atomic_rmw_loop_give_up({
return _dispatch_group_wake(dg, new_state, false);
});
}
});
}
}
在這里會首先把當前函數(shù)的任務(block)標記位notify,因為一會所有任務執(zhí)行完之后要調(diào)用。會循環(huán)(os_atomic_rmw_loop2o)監(jiān)聽狀態(tài),狀態(tài)滿足時old_state == 0執(zhí)行_dispatch_group_wake去執(zhí)行notify的block和喚醒group的wait。_dispatch_group_wake上面已經(jīng)有分析過了。
dispatch_group_wait源碼解析
dispatch_group_wait函數(shù)阻塞當前線程:
long
dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
{
uint64_t old_state, new_state;
os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, relaxed, {
if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
os_atomic_rmw_loop_give_up_with_fence(acquire, return 0);
}
if (unlikely(timeout == 0)) {
os_atomic_rmw_loop_give_up(return _DSEMA4_TIMEOUT());
}
new_state = old_state | DISPATCH_GROUP_HAS_WAITERS;
if (unlikely(old_state & DISPATCH_GROUP_HAS_WAITERS)) {
os_atomic_rmw_loop_give_up(break);
}
});
return _dispatch_group_wait_slow(dg, _dg_state_gen(new_state), timeout);
}
static long
_dispatch_group_wait_slow(dispatch_group_t dg, uint32_t gen,
dispatch_time_t timeout)
{
for (;;) {
int rc = _dispatch_wait_on_address(&dg->dg_gen, gen, timeout, 0);
if (likely(gen != os_atomic_load2o(dg, dg_gen, acquire))) {
return 0;
}
if (rc == ETIMEDOUT) {
return _DSEMA4_TIMEOUT();
}
}
}
dispatch_group_wait等待的原理是它開啟了一個循環(huán),檢查group任務相關的狀態(tài),如國狀態(tài)滿足就返回0,執(zhí)行dispatch_group_wait后面的代碼;如果超時就返回超時提醒,也會繼續(xù)執(zhí)行后面的代碼。
dispatch_group_async源碼解析
dispatch_group_async之所以能等同于dispatch_group_enter()和dispatch_group_leave()的操作,實際上它內(nèi)部實現(xiàn)也是依賴這兩個方法實現(xiàn)的:
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
_dispatch_continuation_group_async(dg, dq, dc, qos);
}
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dc, dispatch_qos_t qos)
{
dispatch_group_enter(dg);
dc->dc_data = dg;
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
這里是先創(chuàng)建一個dc,然后標記為group類型任務uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC。然后在把任務添加到隊列之前調(diào)用dispatch_group_enter(dg),到這里我們其實就可以發(fā)現(xiàn)了,dispatch_group_async實現(xiàn)原理實際上就是內(nèi)部也是依賴dispatch_group_enter和dispatch_group_leave函數(shù)來管理的。所以我們也很容易猜到在block任務被調(diào)用的時候,也會調(diào)用dispatch_group_leave函數(shù)來保持平衡的:
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
struct dispatch_object_s *dou = dc->dc_data;
unsigned long type = dx_type(dou);
if (type == DISPATCH_GROUP_TYPE) {
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_trace_item_complete(dc);
dispatch_group_leave((dispatch_group_t)dou);
} else {
DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
}
}