RACSubject *subject = [RACSubject subject];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"before:%@",x);
}];
[subject sendNext:@"test"];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"after:%@",x);
}];
打印結(jié)果:
ReactiveCocoa[5111:179158] before:test
上面的例子中,發(fā)送消息前的訂閱觸發(fā)了,發(fā)送消息后的訂閱沒(méi)有觸發(fā),這是為什么呢?我們來(lái)看看具體的源碼實(shí)現(xiàn):
+ (instancetype)subject {
return [[self alloc] init];
}
- (instancetype)init {
self = [super init];
if (self == nil) return nil;
_disposable = [RACCompoundDisposable compoundDisposable];
_subscribers = [[NSMutableArray alloc] initWithCapacity:1];
return self;
}
創(chuàng)建的源碼實(shí)現(xiàn)比較簡(jiǎn)單,但看到RACSubject內(nèi)部有個(gè)成員變量_subscribers,是可變數(shù)組。接下來(lái)看看訂閱的實(shí)現(xiàn):
// 訂閱時(shí)創(chuàng)建訂閱者對(duì)象RACSubscriber,并調(diào)用自己的subscribe方法
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}
//RACSubject有個(gè)subscribers屬性,用來(lái)存儲(chǔ)所有的訂閱者
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}
[disposable addDisposable:[RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
// Since newer subscribers are generally shorter-lived, search
// starting from the end of the list.
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];
//一次所有訂閱則
if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}]];
return disposable;
}
訂閱的主要實(shí)現(xiàn)就是創(chuàng)建訂閱者并加入到_subscribers中。RACSubject可以多次訂閱,每次訂閱都會(huì)創(chuàng)建一個(gè)訂閱者并添加到_subscribers數(shù)組中。
下面是發(fā)送消息的實(shí)現(xiàn):
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
//遍歷所有訂閱者發(fā)送消息
[subscriber sendNext:value];
}];
}
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}
for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}
發(fā)送消息時(shí),遍歷subscribers取出每一個(gè)subscriber發(fā)送消息。所以,必須先訂閱才能接收到消息。這就是為什么開(kāi)頭的例子中,只打印了 before:test。
RACSubject既能訂閱信號(hào)又能發(fā)送信號(hào),所以RACSubject是一個(gè)熱信號(hào)。上面有說(shuō)到,RACSubject必須在信號(hào)發(fā)送前訂閱才能觸發(fā),RAC還有另外一個(gè)類(lèi)RACReplaySubject可以解決這個(gè)問(wèn)題。
RACReplaySubject *subject = [RACReplaySubject subject];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"before:%@",x);
}];
[subject sendNext:@"test"];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"after:%@",x);
}];
打印結(jié)果:
ReactiveCocoa[5247:209617] before:test
ReactiveCocoa[5247:209617] after:test
還是同樣的例子,只是這次換成了RACReplaySubject,發(fā)送消息前后的訂閱都觸發(fā)了,這是怎么做到的呢?
RACReplaySubject是繼承自RACSubject。首先看RACReplaySubject初始化方法的實(shí)現(xiàn):
- (instancetype)init {
return [self initWithCapacity:RACReplaySubjectUnlimitedCapacity];
}
- (instancetype)initWithCapacity:(NSUInteger)capacity {
self = [super init];
_capacity = capacity;
_valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
return self;
}
RACReplaySubject也有個(gè)成員變量_valuesReceived是個(gè)可變數(shù)組。
RACReplaySubject的訂閱實(shí)現(xiàn)如下:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
for (id value in self.valuesReceived) {
if (compoundDisposable.disposed) return;
// 消息訂閱時(shí)發(fā)送消息
[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
}
if (compoundDisposable.disposed) return;
if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}];
[compoundDisposable addDisposable:schedulingDisposable];
return compoundDisposable;
}
從上面源碼能看到消息訂閱時(shí),如果檢測(cè)到self.valuesReceived中有值,那么當(dāng)前的訂閱者就會(huì)將值發(fā)送出去。
- (void)sendNext:(id)value {
@synchronized (self) {
//消息發(fā)送前先將value存儲(chǔ)到valuesReceived中,留給后面訂閱時(shí)發(fā)送
[self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
[super sendNext:value];
if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
[self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
}
}
}
能看到RACReplaySubject的sendNext實(shí)現(xiàn),是先將消息發(fā)送的值value存儲(chǔ)到valuesReceived中,然后調(diào)用父類(lèi)RACSubject的sendNext。RACReplaySubjec在每次訂閱時(shí),會(huì)遍歷_valuesReceived中存儲(chǔ)的值給訂閱者發(fā)送一遍。