一個(gè)信號,由
Signal類型表現(xiàn),是可以被持續(xù)監(jiān)視的一系列事件(events)。
信號一般用來表示“正在進(jìn)行中”的事件流,比如通知,用戶輸入等。隨著操作完成或者收到數(shù)據(jù),事件在信號上發(fā)送,信號將他們推向所有的監(jiān)聽器。所有的監(jiān)聽器會在同時(shí)看到事件。
用戶必須監(jiān)聽(
observe)一個(gè)信號來訪問它的事件。監(jiān)聽信號不會產(chǎn)生任何副作用(side effects)。換句話說,信號完全是生成器驅(qū)動(dòng)和基于推送的,而且在其生命周期內(nèi)消費(fèi)者(監(jiān)聽器)不能對它產(chǎn)生任何影響。監(jiān)聽一個(gè)信號的時(shí)候,用戶只能按照事件在信號上發(fā)送的順序處理事件。沒有方法可以隨機(jī)訪問信號的值。
可以通過在信號上施加原函數(shù)(primitives)來操作信號。典型的操作信號的原函數(shù)有
filter,map,reduce,以及一些同時(shí)操作多個(gè)信號的原函數(shù)(比如zip)。原函數(shù)僅在信號的Next事件上施加操作。
信號的生命周期由任意多個(gè)
Next事件,和一個(gè)緊隨其后的終結(jié)事件組成,終結(jié)事件可能是Failed,Completed,或者Interrupted(但不會是他們的組合)。終結(jié)事件不屬于信號的值,他們必須被特殊處理。
1. 信號什么樣
看看信號的定義,可以發(fā)現(xiàn)信號異常簡單,它僅僅持有了一個(gè)對監(jiān)聽器集合的引用而已:
public final class Signal<Value, Error: ErrorType> {
public typealias Observer = ReactiveCocoa.Observer<Value, Error>
private let atomicObservers: Atomic<Bag<Observer>?> = Atomic(Bag())
......
}
Bag是一個(gè)在ReactiveCocoa定義的數(shù)據(jù)結(jié)構(gòu),認(rèn)為它是一個(gè)數(shù)組就可以了。Atomic也是ReactiveCocoa自定的數(shù)據(jù)結(jié)構(gòu),給Atomic一個(gè)值以后,如果對這個(gè)值進(jìn)行操作,Atomic會保證線程安全。這些都是信號的內(nèi)部實(shí)現(xiàn)細(xì)節(jié),對信號的使用者來講是透明的。
關(guān)鍵在于信號的私有成員atomicObservers,它是正在監(jiān)聽這個(gè)信號的所有監(jiān)聽器的集合。通過typealias,信號的泛型定義決定了這些監(jiān)聽器能夠接受的事件的類型。也就是說,如果一個(gè)信號發(fā)出帶有String類型數(shù)據(jù)的Next事件,或者帶有SomeError類型的Failed事件,那么只有能夠處理這兩種事件的監(jiān)聽器才能監(jiān)聽它。
于是信號看起來就是這樣子的:

那么信號是怎么發(fā)送事件的?ReactiveCocoa的信號僅僅是一個(gè)媒介,它不管事件如何發(fā)生,只管把這些事件發(fā)送給監(jiān)聽自己的監(jiān)聽器們。打個(gè)比方,信號就是公路,而事件是公路上跑的車。事件怎么產(chǎn)生,需要用戶(也就是我們)在初始化信號時(shí)來告訴信號,下面就來看看信號是如何初始化的。
2. 創(chuàng)建信號
創(chuàng)建一個(gè)信號,可以通過信號的初始化方法init(_ generator: Observer -> Disposable?)。這個(gè)方法的外部參數(shù)名被取消了,不過從內(nèi)部參數(shù)名可以看出它的作用——事件源(generator)。剛才提到過,信號?除了誰在監(jiān)聽自己以外一無所知,其實(shí)事件源也不知道信號的存在,那么就必須有一個(gè)中間人將事件從事件源移動(dòng)到信號上。這個(gè)中間人,就是事件源接受的參數(shù)——一個(gè)Observer。他們之間的聯(lián)系如何建立,就要看看信號初始化方法的執(zhí)行過程:

- 先創(chuàng)建一個(gè)空的串行存根(
SerialDisposable) - 再創(chuàng)建一個(gè)監(jiān)聽器(
Observer),這個(gè)監(jiān)聽器持有上一步中創(chuàng)建的存根的引用,和對信號(Signal)本身的引用 - 將上一步創(chuàng)建的監(jiān)聽器交給事件源,讓事件源開始工作
- 將上一步返回的存根?(
Disposable)交給第一步的串行存根
上面第二步中的監(jiān)聽器可能是ReactiveCocoa中最重要的對象了(其實(shí),?把它叫做監(jiān)聽器讓人困惑,我覺得叫做事件分發(fā)器更合適,只不過使用Observer類來實(shí)現(xiàn)了而已),這個(gè)對象把ReactiveCocoa和ReactiveCocoa之外的世界(我們想做的App)聯(lián)系了起來。所以必須說明一下它的作用(就是上面圖中黃色圓角的action在做什么):
- 如果它從事件源收到任何事件,就在信號的監(jiān)聽器集合中循環(huán)迭代,將此事件原封不動(dòng)地?分發(fā)給每一個(gè)監(jiān)聽器。
- 如果它從事件源收到的事件是一個(gè)終結(jié)事件,除了分發(fā)這個(gè)事件外,它還會廢棄自己持有的存根對象。

事件源一開始發(fā)生事件,分發(fā)器就把這個(gè)事件分發(fā)給信號的所有監(jiān)聽器。一旦事件源發(fā)出了終結(jié)事件,分發(fā)器就廢棄自己持有的串行存根,這會進(jìn)而廢棄事件源返回的存根,釋放事件源占用的系統(tǒng)資源,事件源不再工作,信號就終結(jié)了。
值得注意的是:
- 其實(shí)在我們使用者看來,事件源,分發(fā)器,信號三者并沒有區(qū)分看待的必要,將他們整體看做信號就可以了。
- 信號一經(jīng)初始化,事件源就立即開始工作,發(fā)生事件(也就是所謂的“熱”信號)。
該看看我們的職責(zé)了——提供事件源。事件源是一個(gè)回調(diào)函數(shù),接受一個(gè)Observer參數(shù)(就是那個(gè)很重要的分發(fā)器),可以選擇性地返回一個(gè)Disposable。我們可以做任何想做的事,只要把想告知信號另一端的監(jiān)聽器的值用sendNext(value:)方法交給分發(fā)器就可以了;如果想要告訴對方我們做的事情失敗了,就用調(diào)用分發(fā)器的sendFailed(error:)方法;如果我們的操作正常結(jié)束就調(diào)用sendCompleted();如果我們被打斷了,就調(diào)用sendInterrupted()。
另外,如果我們的事件源要做一些很重的操作,需要占用系統(tǒng)資源要到操作完成才能釋放的話,我們可以把釋放資源的工作包裝到一個(gè)Disposable對象中,把它作為返回值傳回去。分發(fā)器會在收到我們的終結(jié)事件時(shí)幫我們調(diào)用這些清理和釋放的工作。當(dāng)然,要是沒有這個(gè)必要的話返回nil就可以了。
code somple
信號我們有了,那么如何監(jiān)聽信號呢?
3. 監(jiān)聽信號
相信你已經(jīng)有了答案,要監(jiān)聽一個(gè)信號,只要將一個(gè)類型正確的監(jiān)聽器加入到信號的監(jiān)聽器集合里就行了。為此,ReavtiveCocoa框架在Signal類中定義了observe(observer: Observer) -> Disposable?實(shí)例方法,把我們的監(jiān)聽器作為參數(shù)傳入就可以了。
signal.observe(Signal.Observer { event in
switch event {
case let .Next(next):
print("Next: \(next)")
case let .Failed(error):
print("Failed: \(error)")
case .Completed:
print("Completed")
case .Interrupted:
print("Interrupted")
}
})
值得一提的是這個(gè)方法的返回值,一個(gè)存根會交到我們手中,我們可以廢棄這個(gè)存根,這樣做僅僅會使我們的監(jiān)聽器被從信號的監(jiān)聽器集合中移除,從而停止接收信號發(fā)出的事件,但是對信號本身而言沒有任何影響。

在swift 2中,協(xié)議的定義中可以提供方法的默認(rèn)實(shí)現(xiàn)。所有聲明要實(shí)現(xiàn)?該協(xié)議的對象,如果沒有提供自己的對于這些方法的實(shí)現(xiàn),都可以使用這些默認(rèn)實(shí)現(xiàn)。ReactiveCocoa里定義了一個(gè)SignalType協(xié)議,規(guī)定了一個(gè)對象能夠被稱為信號所需要滿足的接口。同時(shí),它還定義了一些便利的幫助方法:
extension SignalType {
public func observe(action: Signal<Value, Error>.Observer.Action) -> Disposable? {
return observe(Observer(action))
}
public func observeNext(next: Value -> ()) -> Disposable? {
return observe(Observer(next: next))
}
public func observeCompleted(completed: () -> ()) -> Disposable? {
return observe(Observer(completed: completed))
}
public func observeFailed(error: Error -> ()) -> Disposable? {
return observe(Observer(failed: error))
}
public func observeInterrupted(interrupted: () -> ()) -> Disposable? {
return observe(Observer(interrupted: interrupted))
}
......
}
Signal類實(shí)現(xiàn)了SignalType協(xié)議,繼承了這些默認(rèn)方法,所以就不必顯示調(diào)用監(jiān)聽器的初始化函數(shù)了,只要針對我們感興趣的事件提供處理方法,作為參數(shù)傳入就可以了:
signal.observeNext { next in
print("Next: \(next)")
}
signal.observeFailed { error in
print("Failed: \(error)")
}
signal.observeCompleted {
print("Completed")
}
signal.observeInterrupted {
print("Interrupted")
}
4. 管道(Pipes)
一個(gè)管道,由
Signal.pipe()方法創(chuàng)建,是一個(gè)可以手動(dòng)控制的信號(signal)。
這個(gè)方法返回一個(gè)信號(
signal)和一個(gè)監(jiān)聽器(observer)??梢酝ㄟ^向監(jiān)聽器發(fā)送事件來控制信號。這在將非RAC的代碼橋接到信號的世界時(shí)非常有用。
比如,不在回調(diào)中處理應(yīng)用程序邏輯,?而是在這個(gè)回調(diào)中簡單的向監(jiān)聽器發(fā)送事件。同時(shí),信號可以被返回,隱藏了回調(diào)的實(shí)現(xiàn)細(xì)節(jié)。
pipe是定義在Signal類上的一個(gè)類方法,是另一種創(chuàng)建信號的方法。和信號的初始化方法不同,它不需要我們提供事件源,而是在返回值的元組中把事件分發(fā)器的引用交給我們,如何發(fā)送事件和何時(shí)發(fā)送時(shí)間完全由我們的后續(xù)處理而定:
/// Creates a Signal that will be controlled by sending events to the given
/// observer.
///
/// The Signal will remain alive until a terminating event is sent to the
/// observer.
public static func pipe() -> (Signal, Observer) {
var observer: Observer!
let signal = self.init { innerObserver in
observer = innerObserver
return nil
}
return (signal, observer)
}
pipe方法調(diào)用了信號的初始化方法,作為參數(shù)的事件源中沒有任何產(chǎn)生事件的處理,而是將事件分發(fā)器(上面代碼中的innerObserver)直接賦值到閉包外面的變量中,最后用元組的形式將創(chuàng)建好的信號和事件分發(fā)器返回。我們可以操作并監(jiān)聽返回的信號,或者在分發(fā)器上手動(dòng)發(fā)送事件:
let (signal, observer) = Signal<String, NoError>.pipe()
signal
.map { string in string.uppercaseString }
.observeNext { next in print(next) }
observer.sendNext("a") // Prints A
observer.sendNext("b") // Prints B
observer.sendNext("c") // Prints C
事件可以產(chǎn)生了,信號把它們傳遞到了我們的監(jiān)聽器里,我們的監(jiān)聽器把事件中關(guān)聯(lián)的值拿來做了我們要做的事?,F(xiàn)在輪到ReactiveCocoa中最強(qiáng)大的部分登場了。
5. 信號的變形
假定有一只正在發(fā)出白光的手電筒,我們從它那里得到了白色的光。如果把它放到一塊藍(lán)色的玻璃后面,我們得到的光就變成了藍(lán)色——信號發(fā)生了變形。
?用ReactiveCocoa的概念做個(gè)類比,信號就是手電筒,事件就是發(fā)出的光,監(jiān)聽器就是我們的眼睛。如果需要在事件發(fā)送到我們的監(jiān)聽器之前發(fā)生對它們做一些改變,就必須要有一個(gè)辦法把我們的藍(lán)色玻璃插入到信號和監(jiān)聽器之間,而且還應(yīng)該可以插入任意多個(gè)任意顏色的玻璃。上面提到的SignalType協(xié)議就提供這些辦法。
SignalType協(xié)議里有三個(gè)信號變形方法的默認(rèn)實(shí)現(xiàn),這三個(gè)方法(尤其是map)是其他信號變形的基礎(chǔ):
map<U>(transform: Value -> U) -> Signal<U, Error>mapError<F>(transform: Error -> F) -> Signal<Value, F>filter(predicate: Value -> Bool) -> Signal<Value, Error>
正如這三個(gè)方法一樣,所有關(guān)于信號變形操作的返回值依然是一個(gè)信號,也就是說可以進(jìn)一步對這個(gè)新信號再次施加變形操作,從而形成一個(gè)變形操作的鏈條。除了map,mapError,filter以外,ReactiveCocoa提供了許多其他的變形操作(后述),將這些操作排列組合,可以讓信號發(fā)生無窮無盡的變化。如果ReactiveCocoa提供的變形操作不夠用,我們可以擴(kuò)展SignalType協(xié)議(使用extension)加入自定義的變形方法。
code sample
下面我們來分別看看它們在做什么:
1. 映射(map和mapError)
顧名思義,映射就是事件一對一的變形,我們來決定變形的具體過程,將這個(gè)過程作為參數(shù)傳遞給map方法即可。
extension SignalType {
......
/// Maps each value in the signal to a new value.
@warn_unused_result(message="Did you forget to call `observe` on the signal?")
public func map<U>(transform: Value -> U) -> Signal<U, Error> {
return Signal { observer in
return self.observe { event in
observer.action(event.map(transform))
}
}
}
/// Maps errors in the signal to a new error.
@warn_unused_result(message="Did you forget to call `observe` on the signal?")
public func mapError<F>(transform: Error -> F) -> Signal<Value, F> {
return Signal { observer in
return self.observe { event in
observer.action(event.mapError(transform))
}
}
}
......
}
map方法寥寥數(shù)語,但是所做事情比較復(fù)雜,有必要慢慢分解一下的話:
- 首先創(chuàng)建一個(gè)新的信號,這個(gè)過程和前面提到的信號初始化相同,一個(gè)事件分發(fā)器被傳遞到事件源中。
- 新信號的事件源使用得到的分發(fā)器創(chuàng)建一個(gè)監(jiān)聽器,這個(gè)監(jiān)聽對我們作為參數(shù)傳入的變形方法有一個(gè)引用,它對每一個(gè)收到的事件實(shí)施這個(gè)變形方法,然后交給新信號的分發(fā)器。
- 新信號的?事件源不發(fā)生任何事件,僅僅把?第二步創(chuàng)建的監(jiān)聽器用
observe方法加入到當(dāng)前信號的監(jiān)聽器集合中。 - 因?yàn)橛昧?code>observe方法,一個(gè)
ActionDisposable類型的存根會返回,交給新信號的串行存根。 - 將新的信號返回。

簡而言之,映射操作就是使用當(dāng)前的信號作為事件源制造了一個(gè)新的信號。沿用我們的類比,就是把手電筒和藍(lán)色的玻璃綁在一起,當(dāng)成一個(gè)新的手電筒。上面的過程中第二步中創(chuàng)建的監(jiān)聽器十分關(guān)鍵,它起到了連接新舊兩個(gè)信號的作用,我們定義的變形方法(也就是我們制造的一個(gè)有顏色的玻璃)?被包裝在這個(gè)監(jiān)聽器中。第三步,這個(gè)監(jiān)聽器加入到了當(dāng)前信號的監(jiān)聽器集合中(跟手電筒綁在一起),一旦當(dāng)前的信號有事件發(fā)生,這個(gè)監(jiān)聽器就會收到并立即調(diào)用變形方法,然后將新的事件交給新信號的分發(fā)器,于是新的信號的監(jiān)聽器們(我們的眼睛)就收到了變形后的事件(藍(lán)色的光)。就像這樣:

上面第四步返回的存根,和之前提到的監(jiān)聽信號時(shí)得到的存根一樣,可以用來將負(fù)責(zé)事件變形的監(jiān)聽器從當(dāng)前的信號上移除,而信號本身不會受任何影響(相當(dāng)于把藍(lán)色的玻璃拿掉,而手電筒不會有什么變化。)。
2. 過濾(filter)
有了上面?zhèn)€關(guān)于映射的討論,再來看過濾的話就不困難了。過濾不會改變信號上事件流的值或類型,而是把不滿足一定條件的事件攔截掉。攔截的方法,就是在連接新舊信號的監(jiān)聽器中規(guī)定,如果事件不滿足條件,就不要把該事件傳遞給新信號的分發(fā)器。
extension SignalType {
......
/// Preserves only the values of the signal that pass the given predicate.
@warn_unused_result(message="Did you forget to call `observe` on the signal?")
public func filter(predicate: Value -> Bool) -> Signal<Value, Error> {
return Signal { observer in
return self.observe { (event: Event<Value, Error>) -> () in
if case let .Next(value) = event {
if predicate(value) {
observer.sendNext(value)
}
} else {
observer.action(event)
}
}
}
}
}

3. 聚合(reduce和collect)
6. 信號的組合
1. 組合(combine)
2. 打包(zip)
7. 信號的扁平化(Flatten)
1. 混合
2. 連接
3. 最新
8. 其他種類的變形
1. ignoreNil
2. take

3. collect

4. observeOn

5. combineLatestWith

6. delay

7. skip

8. materialize

9. dematerialize

10. sampleOn

11. takeUntil

12. skipUntil

13. combinePrevious

14. reduce

15. scan
scan(initial:, combine:)將信號包裝為一個(gè)新信號,每當(dāng)源信號發(fā)出事件時(shí),事件的值都會被累積,然后再轉(zhuǎn)發(fā)給新信號。具體的累積方法,由scan方法的第二個(gè)參數(shù)規(guī)定,累積的結(jié)果的類型可以和源信號的值得類型不同。scan的第一個(gè)參數(shù)是累積用的初始值,它的類型必須和累積的結(jié)果類型一致。
scan方法在原信號的監(jiān)聽器集合中加入一個(gè)監(jiān)聽器,當(dāng)信號發(fā)出第一個(gè)事件后,事件的值會和initial的值累積后轉(zhuǎn)發(fā)給新信號,累積的結(jié)果會保存在新信號的一個(gè)變量中。之后源信號發(fā)出的每一個(gè)事件的值都會和前一次累積的結(jié)果再次累積,然后轉(zhuǎn)發(fā)給新信號。

16. skipRepeats

17. skipWhile

18. takeUntilReplacement

19. takeLast
takeLast(count:)操作將信號包裝為一個(gè)新信號,在源信號發(fā)出完成事件時(shí),將源信號的最后count個(gè)事件發(fā)送出來,之后緊隨一個(gè)完成事件。在源信號發(fā)出完成事件之前,新信號不發(fā)出任何事件。
takeLast方法在源信號的監(jiān)聽器集合中加入一個(gè)帶有緩沖的監(jiān)聽器,這個(gè)緩沖是一個(gè)原信號值類型的數(shù)組,數(shù)組長度由參count數(shù)而定。當(dāng)源信號發(fā)出Next事件時(shí),這個(gè)監(jiān)聽器并不將事件轉(zhuǎn)發(fā)給新信號的事件分發(fā)器,而是將事件存儲在緩沖的數(shù)組中。如果事件的數(shù)量超過了緩沖的容量,就將最早的事件從緩沖中移除以騰出空間。當(dāng)源信號發(fā)出Complete事件時(shí),這個(gè)監(jiān)聽器就循環(huán)迭代緩沖數(shù)組,將其中所有的事件發(fā)送出去,之后再發(fā)出一個(gè)Complete事件。
如果源信號發(fā)出了Failed或Interrupted事件,緩沖機(jī)制不會執(zhí)行,而是直接轉(zhuǎn)發(fā)給新信號。

20. takeWhile

22. zipWith
