RACSubject

RACSubject *subject = [RACSubject subject];
[subject subscribeNext:^(id  _Nullable x) {
    NSLog(@"before:%@",x);
}];
[subject sendNext:@"test"];

[subject subscribeNext:^(id  _Nullable x) {
    NSLog(@"after:%@",x);
}];

打印結(jié)果:
ReactiveCocoa[5111:179158] before:test

上面的例子中,發(fā)送消息前的訂閱觸發(fā)了,發(fā)送消息后的訂閱沒(méi)有觸發(fā),這是為什么呢?我們來(lái)看看具體的源碼實(shí)現(xiàn):

+ (instancetype)subject {
    return [[self alloc] init];
}
- (instancetype)init {
    self = [super init];
    if (self == nil) return nil;

    _disposable = [RACCompoundDisposable compoundDisposable];
    _subscribers = [[NSMutableArray alloc] initWithCapacity:1];
    
    return self;
}

創(chuàng)建的源碼實(shí)現(xiàn)比較簡(jiǎn)單,但看到RACSubject內(nèi)部有個(gè)成員變量_subscribers,是可變數(shù)組。接下來(lái)看看訂閱的實(shí)現(xiàn):

// 訂閱時(shí)創(chuàng)建訂閱者對(duì)象RACSubscriber,并調(diào)用自己的subscribe方法
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
    NSCParameterAssert(nextBlock != NULL);
    
    RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
    return [self subscribe:o];
}

//RACSubject有個(gè)subscribers屬性,用來(lái)存儲(chǔ)所有的訂閱者
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

    NSMutableArray *subscribers = self.subscribers;
    @synchronized (subscribers) {
        [subscribers addObject:subscriber];
    }
    
    [disposable addDisposable:[RACDisposable disposableWithBlock:^{
        @synchronized (subscribers) {  
            // Since newer subscribers are generally shorter-lived, search
            // starting from the end of the list.
            NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
                return obj == subscriber;
            }];
            //一次所有訂閱則
            if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
        }
    }]];

    return disposable;
}

訂閱的主要實(shí)現(xiàn)就是創(chuàng)建訂閱者并加入到_subscribers中。RACSubject可以多次訂閱,每次訂閱都會(huì)創(chuàng)建一個(gè)訂閱者并添加到_subscribers數(shù)組中。

下面是發(fā)送消息的實(shí)現(xiàn):

- (void)sendNext:(id)value {
    [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
        //遍歷所有訂閱者發(fā)送消息
        [subscriber sendNext:value];
    }];
}

- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
    NSArray *subscribers;
    @synchronized (self.subscribers) {
        subscribers = [self.subscribers copy];
    }

    for (id<RACSubscriber> subscriber in subscribers) {
        block(subscriber);
    }
}

發(fā)送消息時(shí),遍歷subscribers取出每一個(gè)subscriber發(fā)送消息。所以,必須先訂閱才能接收到消息。這就是為什么開(kāi)頭的例子中,只打印了 before:test。

RACSubject既能訂閱信號(hào)又能發(fā)送信號(hào),所以RACSubject是一個(gè)熱信號(hào)。上面有說(shuō)到,RACSubject必須在信號(hào)發(fā)送前訂閱才能觸發(fā),RAC還有另外一個(gè)類(lèi)RACReplaySubject可以解決這個(gè)問(wèn)題。

RACReplaySubject *subject = [RACReplaySubject subject];
    [subject subscribeNext:^(id  _Nullable x) {
        NSLog(@"before:%@",x);
    }];
    [subject sendNext:@"test"];

    [subject subscribeNext:^(id  _Nullable x) {
        NSLog(@"after:%@",x);
    }];
打印結(jié)果:
ReactiveCocoa[5247:209617] before:test
ReactiveCocoa[5247:209617] after:test

還是同樣的例子,只是這次換成了RACReplaySubject,發(fā)送消息前后的訂閱都觸發(fā)了,這是怎么做到的呢?

RACReplaySubject是繼承自RACSubject。首先看RACReplaySubject初始化方法的實(shí)現(xiàn):

- (instancetype)init {
    return [self initWithCapacity:RACReplaySubjectUnlimitedCapacity];
}

- (instancetype)initWithCapacity:(NSUInteger)capacity {
    self = [super init];
    
    _capacity = capacity;
    _valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
    
    return self;
}

RACReplaySubject也有個(gè)成員變量_valuesReceived是個(gè)可變數(shù)組。

RACReplaySubject的訂閱實(shí)現(xiàn)如下:

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

    RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
        @synchronized (self) {
            for (id value in self.valuesReceived) {
                if (compoundDisposable.disposed) return;
                // 消息訂閱時(shí)發(fā)送消息
                [subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
            }

            if (compoundDisposable.disposed) return;

            if (self.hasCompleted) {
                [subscriber sendCompleted];
            } else if (self.hasError) {
                [subscriber sendError:self.error];
            } else {
                RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
                [compoundDisposable addDisposable:subscriptionDisposable];
            }
        }
    }];

    [compoundDisposable addDisposable:schedulingDisposable];

    return compoundDisposable;
}

從上面源碼能看到消息訂閱時(shí),如果檢測(cè)到self.valuesReceived中有值,那么當(dāng)前的訂閱者就會(huì)將值發(fā)送出去。

- (void)sendNext:(id)value {
    @synchronized (self) {
        //消息發(fā)送前先將value存儲(chǔ)到valuesReceived中,留給后面訂閱時(shí)發(fā)送
        [self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
        [super sendNext:value];
        
        if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
            [self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
        }
    }
}

能看到RACReplaySubject的sendNext實(shí)現(xiàn),是先將消息發(fā)送的值value存儲(chǔ)到valuesReceived中,然后調(diào)用父類(lèi)RACSubject的sendNext。RACReplaySubjec在每次訂閱時(shí),會(huì)遍歷_valuesReceived中存儲(chǔ)的值給訂閱者發(fā)送一遍。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容