GCD的dispatch_group_t的原理及使用

什么是dispatch_group_t

dispatch_group_t是提交給隊列用于異步調(diào)用的一組block??梢詫㈥犃械娜蝿誦lock進行分組,把隊列任務關聯(lián)到當前的組里面,然后還可以監(jiān)聽已關聯(lián)的任務是否全部執(zhí)行完成。dispatch_group_t可以通過dispatch_group_create函數(shù)創(chuàng)建:

dispatch_group_t group = dispatch_group_create();

dispatch_group_t的使用

dispatch_group_t的使用有兩種方式,一種dispatch_group_enter() 和dispatch_group_leave()配合的方式;另一種是使用dispatch_group_async函數(shù)。不管哪種方式,都可以通過dispatch_group_notify函數(shù)來監(jiān)聽group里面的任務是否都已經(jīng)執(zhí)行完畢,或者可以通過dispatch_group_wait函數(shù)等待所有任務執(zhí)行完畢。dispatch_group_wait和dispatch_group_notify的不同是,dispatch_group_notify可以指定所有任務完成之后執(zhí)行的隊列,它是異步的,不會阻塞當前線程;而dispatch_group_wait是同步的,會阻塞當前線程,但是dispatch_group_wait可以設置超時時間。下面就來看看dispatch_group_t使用示例。

dispatch_group_enter()&dispatch_group_leave()

這兩個函數(shù)必須成對出現(xiàn),這很類似信號量。dispatch_group_enter在任務添加之前調(diào)用,相當于告訴group要添加任務,而dispatch_group_leave則在任務結(jié)束的時候調(diào)用。以下是demo示例:

    dispatch_queue_t conQueue = dispatch_queue_create("com.hello-world.djx", DISPATCH_QUEUE_CONCURRENT);
    dispatch_group_t group = dispatch_group_create();
    dispatch_group_enter(group);
    dispatch_async(conQueue, ^{
        NSLog(@"1");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(conQueue, ^{
        NSLog(@"2");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(conQueue, ^{
        NSLog(@"3");
        dispatch_group_leave(group);
    });

    dispatch_group_notify(group, dispatch_get_main_queue(), ^{
        NSLog(@"任務完成,返回主線程");
    });
使用dispatch_group_async

這個函數(shù)直接實現(xiàn)了dispatch_group_enter和dispatch_group_leave的作用,以下是demo示例:

    dispatch_queue_t conQueue = dispatch_queue_create("com.hello-world.djx", DISPATCH_QUEUE_CONCURRENT);
    dispatch_group_t group = dispatch_group_create();
    dispatch_group_async(group, conQueue, ^{
        NSLog(@"1");
    });
    dispatch_group_async(group, conQueue, ^{
        NSLog(@"2");
    });
    dispatch_group_async(group, conQueue, ^{
        NSLog(@"3");
    });
    dispatch_group_notify(group, dispatch_get_main_queue(), ^{
        NSLog(@"任務完成,返回主線程");
    });

到這里的時候我們可能發(fā)現(xiàn)它跟柵欄dispatch_barrier_async函數(shù)很像,可以等某個隊列的某幾個任務都執(zhí)行了在執(zhí)行我們想要的操作。但還是group跟柵欄函數(shù)是不一樣的,group不會阻塞當前隊列,而且一個group不止可以關聯(lián)一個隊列,可以同時關聯(lián)多個隊列,下面我們就來看看group關聯(lián)多個隊列的用法。

可以關聯(lián)多個隊列任務、兩種方式一起使用
    dispatch_queue_t conQueue = dispatch_queue_create("com.hello-world.djx", DISPATCH_QUEUE_CONCURRENT);
    dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    dispatch_group_t group = dispatch_group_create();

    dispatch_group_enter(group);
    dispatch_async(conQueue, ^{
        NSLog(@"1");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(globalQueue, ^{
        NSLog(@"2");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(conQueue, ^{
        NSLog(@"3");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(globalQueue, ^{
        NSLog(@"4");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(conQueue, ^{
        NSLog(@"5");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(globalQueue, ^{
        NSLog(@"6");
        dispatch_group_leave(group);
    });
    
    dispatch_group_async(group, conQueue, ^{
        NSLog(@"7");
    });

    dispatch_group_async(group, globalQueue, ^{
        NSLog(@"8");
    });
    
    dispatch_group_notify(group, dispatch_get_main_queue(), ^{
        NSLog(@"任務完成,返回主線程");
    });
    NSLog(@"開始等待");
    dispatch_time_t time = dispatch_walltime(NULL, 5* NSEC_PER_SEC);
    dispatch_group_wait(group, time);
    NSLog(@"等待結(jié)束")

從demo中可以看出,group可以關聯(lián)多個隊列conQueque和globalQueue,而且group的兩種用法dispatch_group_enter()&dispatch_group_leave()和dispatch_group_async是可以一起使用的,而且dispatch_group_notify和dispatch_group_wait并不是互斥的,他們也一樣可以同時使用。這個給了我們做業(yè)務有著很多選擇。

Group底層原理

這里先提出幾個問題:為什么dispatch_group_enter和dispatch_group_leave要成對出現(xiàn)?它們是怎么關聯(lián)的?以及dispatch_group_async為什么不需要enter和leave?dispatch_group_notify是如何知道所有任務都執(zhí)行完成了的?
接下來通過源碼分析來解釋這些問題。

dispatch_group_create源碼解析
dispatch_group_t
dispatch_group_create(void)
{
    return _dispatch_group_create_with_count(0);
}
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
    dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
            sizeof(struct dispatch_group_s));
    dg->do_next = DISPATCH_OBJECT_LISTLESS;
    dg->do_targetq = _dispatch_get_default_queue(false);
    if (n) {
        os_atomic_store2o(dg, dg_bits,
                (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
        os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
    }
    return dg;
}

DISPATCH_VTABLE能夠獲取group的類信息。由于初始值n為0,所以這里的os_atomic_store2o不會被調(diào)用,所以這會dg_bits為0。dg_bits用于記錄enter和leave的信息。

dispatch_group_enter源碼解析

dispatch_group_enter函數(shù):

void
dispatch_group_enter(dispatch_group_t dg)
{
    // The value is decremented on a 32bits wide atomic so that the carry
    // for the 0 -> -1 transition is not propagated to the upper 32bits.
    uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
            DISPATCH_GROUP_VALUE_INTERVAL, acquire);
    uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
    if (unlikely(old_value == 0)) {
        _dispatch_retain(dg); // <rdar://problem/22318411>
    }
    if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
        DISPATCH_CLIENT_CRASH(old_bits,
                "Too many nested calls to dispatch_group_enter()");
    }
}

os_atomic_sub_orig2o調(diào)用流程:os_atomic_sub_orig2o->os_atomic_sub_orig->_os_atomic_c11_op_orig((p), (v), m, sub, -)->atomic_fetch_sub_explicit
在enter函數(shù)里,首先會對dg->dg_bits進行減一的原子操作,用于統(tǒng)計enter次數(shù)。有前面創(chuàng)建group可知,dg->dg_bits初始值應該是為0的,所以減1之后正常情況下不會等于0,所以這里懶得分析_dispatch_retain函數(shù)了。

dispatch_group_leave源碼解析

dispatch_group_leave函數(shù):

void
dispatch_group_leave(dispatch_group_t dg)
{
    // The value is incremented on a 64bits wide atomic so that the carry for
    // the -1 -> 0 transition increments the generation atomically.
    uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
            DISPATCH_GROUP_VALUE_INTERVAL, release);
    uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);

    if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
        old_state += DISPATCH_GROUP_VALUE_INTERVAL;
        do {
            new_state = old_state;
            if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            } else {
                // If the group was entered again since the atomic_add above,
                // we can't clear the waiters bit anymore as we don't know for
                // which generation the waiters are for
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            }
            if (old_state == new_state) break;
        } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
                old_state, new_state, &old_state, relaxed)));
        return _dispatch_group_wake(dg, old_state, true);
    }

    if (unlikely(old_value == 0)) {
        DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
                "Unbalanced call to dispatch_group_leave()");
    }
}

os_atomic_add_orig2o調(diào)用流程: os_atomic_add_orig2o->os_atomic_add_orig->_os_atomic_c11_op_orig->_os_atomic_c11_op_orig->atomic_fetch_add_explicit
實際上這里是實際上就是對dg->dg_state的原子加1操作,并返回舊值old_value。用于統(tǒng)計leave次數(shù)。dg_state操作完成之后會就會判斷enter和leave是否已經(jīng)成對,如果是就會調(diào)用_dispatch_group_wake喚醒group(dispatch_group_wait),同時去異步執(zhí)行notify任務。如果old_value==0,說明leave次數(shù)調(diào)用過多,會導致崩潰。

static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
    uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>

    if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
        dispatch_continuation_t dc, next_dc, tail;

        // Snapshot before anything is notified/woken <rdar://problem/8554546>
        dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
        do {
            dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
            next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
            _dispatch_continuation_async(dsn_queue, dc,
                    _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
            _dispatch_release(dsn_queue);
        } while ((dc = next_dc));

        refs++;
    }

    if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
        _dispatch_wake_by_address(&dg->dg_gen);
    }

    if (refs) _dispatch_release_n(dg, refs);
}

_dispatch_group_wake函數(shù)中,會循環(huán)遍歷當前group里是否有notify任務,如果有則交給_dispatch_continuation_async把block提交到隊列中異步調(diào)用這個block,同時會判斷有沒有正在等待的group任務:

if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
        _dispatch_wake_by_address(&dg->dg_gen);
    }
dispatch_group_notify源碼分析
void
dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_block_t db)
{
    dispatch_continuation_t dsn = _dispatch_continuation_alloc();
    _dispatch_continuation_init(dsn, dq, db, 0, DC_FLAG_CONSUME);
    _dispatch_group_notify(dg, dq, dsn);
}
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dsn)
{
    uint64_t old_state, new_state;
    dispatch_continuation_t prev;

    dsn->dc_data = dq;
    _dispatch_retain(dq);

    prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
    os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) {
        os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
            new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
            if ((uint32_t)old_state == 0) {
                os_atomic_rmw_loop_give_up({
                    return _dispatch_group_wake(dg, new_state, false);
                });
            }
        });
    }
}

在這里會首先把當前函數(shù)的任務(block)標記位notify,因為一會所有任務執(zhí)行完之后要調(diào)用。會循環(huán)(os_atomic_rmw_loop2o)監(jiān)聽狀態(tài),狀態(tài)滿足時old_state == 0執(zhí)行_dispatch_group_wake去執(zhí)行notify的block和喚醒group的wait。_dispatch_group_wake上面已經(jīng)有分析過了。

dispatch_group_wait源碼解析

dispatch_group_wait函數(shù)阻塞當前線程:

long
dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
{
    uint64_t old_state, new_state;

    os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, relaxed, {
        if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
            os_atomic_rmw_loop_give_up_with_fence(acquire, return 0);
        }
        if (unlikely(timeout == 0)) {
            os_atomic_rmw_loop_give_up(return _DSEMA4_TIMEOUT());
        }
        new_state = old_state | DISPATCH_GROUP_HAS_WAITERS;
        if (unlikely(old_state & DISPATCH_GROUP_HAS_WAITERS)) {
            os_atomic_rmw_loop_give_up(break);
        }
    });

    return _dispatch_group_wait_slow(dg, _dg_state_gen(new_state), timeout);
}
static long
_dispatch_group_wait_slow(dispatch_group_t dg, uint32_t gen,
        dispatch_time_t timeout)
{
    for (;;) {
        int rc = _dispatch_wait_on_address(&dg->dg_gen, gen, timeout, 0);
        if (likely(gen != os_atomic_load2o(dg, dg_gen, acquire))) {
            return 0;
        }
        if (rc == ETIMEDOUT) {
            return _DSEMA4_TIMEOUT();
        }
    }
}

dispatch_group_wait等待的原理是它開啟了一個循環(huán),檢查group任務相關的狀態(tài),如國狀態(tài)滿足就返回0,執(zhí)行dispatch_group_wait后面的代碼;如果超時就返回超時提醒,也會繼續(xù)執(zhí)行后面的代碼。

dispatch_group_async源碼解析

dispatch_group_async之所以能等同于dispatch_group_enter()和dispatch_group_leave()的操作,實際上它內(nèi)部實現(xiàn)也是依賴這兩個方法實現(xiàn)的:

void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_block_t db)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
    dispatch_qos_t qos;

    qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
    _dispatch_continuation_group_async(dg, dq, dc, qos);
}

static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dc, dispatch_qos_t qos)
{
    dispatch_group_enter(dg);
    dc->dc_data = dg;
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

這里是先創(chuàng)建一個dc,然后標記為group類型任務uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC。然后在把任務添加到隊列之前調(diào)用dispatch_group_enter(dg),到這里我們其實就可以發(fā)現(xiàn)了,dispatch_group_async實現(xiàn)原理實際上就是內(nèi)部也是依賴dispatch_group_enter和dispatch_group_leave函數(shù)來管理的。所以我們也很容易猜到在block任務被調(diào)用的時候,也會調(diào)用dispatch_group_leave函數(shù)來保持平衡的:

static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
    struct dispatch_object_s *dou = dc->dc_data;
    unsigned long type = dx_type(dou);
    if (type == DISPATCH_GROUP_TYPE) {
        _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
        _dispatch_trace_item_complete(dc);
        dispatch_group_leave((dispatch_group_t)dou);
    } else {
        DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容