Observer(觀察者)、Observable(可觀察序列)
? 核心理解就是一個(gè)觀察者(Observer)訂閱一個(gè)可觀察序列(Observable),觀察者(Observer)對(duì)可觀察序列(Observable)發(fā)射的數(shù)據(jù)或數(shù)據(jù)序列作出響應(yīng)。
? 可觀察序列存在三種情況:發(fā)射數(shù)據(jù)(Next)、遇到問題(Error)、發(fā)射完成(Completed),也就是3個(gè)事件
enum Event<Element> {
case Next(Element) // 序列的下一個(gè)元素
case Error(ErrorType) // 序列因?yàn)槟承╁e(cuò)誤終止
case Completed // 正常的序列技術(shù)
}
觀察者 (Observer)
觀察者需要一個(gè)訂閱序列的功能,如下:
class Observable<Element> {
func subscribe(observer: Observer<Element>) -> Disposable
}
protocol ObserverType {
func on(event: Event<Element>)
}
通過這個(gè) subscribe 來訂閱序列。這里就涉及到序列的“冷”“熱”:
- 冷:只有當(dāng)有觀察者訂閱這個(gè)序列時(shí),序列才發(fā)射值;
- 熱:序列創(chuàng)建時(shí)就開始發(fā)射值。
AsyncSubject
let disposeBag = DisposeBag()
let subject = AsyncSubject<String>()
subject.subscribe{
print("subscription: 1 Event:", $0)
}.disposed(by: disposeBag)
subject.onNext("??")
subject.onNext("??")
subject.onNext("??")
subject.onNext("??")
subject.onCompleted()
打?。?br>
subscription: 1 Event: next(??)
subscription: 1 Event: completed
Subject
? 我們把 Subject 當(dāng)作一個(gè)橋梁(或者說是代理), Subject 既是 Observable 也是 Observer :
- 作為一個(gè) Observer ,它可以訂閱序列。
- 同時(shí)作為一個(gè) Observable ,它可以轉(zhuǎn)發(fā)或者發(fā)射數(shù)據(jù)。
PublishSubject
PublishSubject 只發(fā)射給觀察者訂閱后的數(shù)據(jù)。

let disposeBag = DisposeBag()
let subject = PublishSubject<String>()
subject.subscribe{
print("subscription: 1 Event:", $0)
}.disposed(by: disposeBag)
subject.onNext("??")
subject.onNext("??")
subject.subscribe{
print("subscription: 2 Event:", $0)
}.disposed(by: disposeBag)
subject.onNext("??")
subject.onNext("??")
打?。?br>
subscription: 1 Event: next(??)
subscription: 1 Event: next(??)
subscription: 1 Event: next(??)
subscription: 2 Event: next(??)
subscription: 1 Event: next(??)
subscription: 2 Event: next(??)
ReplaySubject
和 PushblishSubject 不同,不論觀察者什么時(shí)候訂閱, ReplaySubject 都會(huì)發(fā)射完整的數(shù)據(jù)給觀察者。

let disposeBag = DisposeBag()
let subject = ReplaySubject<String>.create(bufferSize: 1)
subject.subscribe{
print("Subscription: 1 Event:", $0)
}.disposed(by: disposeBag)
subject.onNext("??")
subject.onNext("??")
subject.subscribe{
print("Subscription: 2 Event:", $0)
}.disposed(by: disposeBag)
subject.onNext("1??")
subject.onNext("??")
打?。?br>
Subscription: 1 Event: next(??)
Subscription: 1 Event: next(??)
Subscription: 2 Event: next(??)
Subscription: 1 Event: next(1??)
Subscription: 2 Event: next(1??)
Subscription: 1 Event: next(??)
Subscription: 2 Event: next(??)
BehaviorSubject
? 當(dāng)觀察者對(duì) BehaviorSubject 進(jìn)行訂閱時(shí),它會(huì)將源 Observable 中最新的元素發(fā)送出來(如果不存在最新的元素,就發(fā)出默認(rèn)元素)。然后將隨后產(chǎn)生的元素發(fā)送出來。

let disposeBag = DisposeBag()
let subject = BehaviorSubject.init(value: "??")
subject.subscribe{
print("Subcription: 1 Event:", $0)
}.disposed(by: disposeBag)
subject.onNext("??")
subject.onNext("??")
subject.subscribe{
print("Subcription: 2 Event:", $0)
}.disposed(by: disposeBag)
subject.onNext("??")
subject.onNext("??")
subject.subscribe{
print("Subcription: 3 Event:", $0)
}.disposed(by: disposeBag)
subject.onNext("??")
subject.onNext("??")
打?。?br>
Subcription: 1 Event: next(??)
Subcription: 1 Event: next(??)
Subcription: 1 Event: next(??)
Subcription: 2 Event: next(??)
Subcription: 1 Event: next(??)
Subcription: 2 Event: next(??)
Subcription: 1 Event: next(??)
Subcription: 2 Event: next(??)
Subcription: 3 Event: next(??)
Subcription: 1 Event: next(??)
Subcription: 2 Event: next(??)
Subcription: 3 Event: next(??)
Subcription: 1 Event: next(??)
Subcription: 2 Event: next(??)
Subcription: 3 Event: next(??)
Variable
Variable 是 BehaviorSubject 的一個(gè)封裝。相比 BehaviorSubject ,它不會(huì)因?yàn)殄e(cuò)誤終止也不會(huì)正常終止,是一個(gè)無限序列。
(注意:由于 Variable 在之后版本中將被廢棄,建議使用 Varible 的地方都改用下面介紹的 BehaviorRelay 作為替代。)
func variableTest() {
let varible = BehaviorRelay<String>(value: "OnePice:最強(qiáng) Z")
varible.asObservable().subscribe { (event) in
print("Subscription: 1, event: \(event)")
}.disposed(by: disposeBag)
varible.accept("路飛")
varible.accept("索大")
varible.asObservable().subscribe { (event) in
print("----> Subscription: 2, event: \(event)")
}.disposed(by: disposeBag)
varible.accept("鳴人:風(fēng)遁螺旋手里?")
varible.accept("二柱子:天照")
}
打?。?br>
Subscription: 1, event: next(OnePice:最強(qiáng) Z)
Subscription: 1, event: next(路飛)
Subscription: 1, event: next(索大)
----> Subscription: 2, event: next(索大)
Subscription: 1, event: next(鳴人:風(fēng)遁螺旋手里?)
----> Subscription: 2, event: next(鳴人:風(fēng)遁螺旋手里?)
Subscription: 1, event: next(二柱子:天照)
----> Subscription: 2, event: next(二柱子:天照)
? 我們最常用的 Subject 應(yīng)該就是 Variable 。Variable 很適合做數(shù)據(jù)源,比如作為一個(gè) UITableView 的數(shù)據(jù)源,我們可以在這里保留一個(gè)完整的 Array 數(shù)據(jù),每一個(gè)訂閱者都可以獲得這個(gè) Array 。
Operator 操作符
? 操作符可以幫助大家創(chuàng)建新的序列,或者變化組合原有的序列,從而生成一個(gè)新的序列。
? 我們之前在輸入驗(yàn)證例子中就多次運(yùn)用到操作符。例如,通過 map 方法將輸入的用戶名,轉(zhuǎn)換為用戶名是否有效。然后用這個(gè)轉(zhuǎn)化后來的序列來控制紅色提示語是否隱藏。我們還通過 combineLatest 方法,將用戶名是否有效和密碼是否有效合并成兩者是否同時(shí)有效。然后用這個(gè)合成后來的序列來控制按鈕是否可點(diǎn)擊。
filter
一個(gè)序列的溫度列表,你可以用 filter 創(chuàng)建一個(gè)新的序列。這個(gè)序列只發(fā)出溫度大于 10 度的元素。
let disposeBag = DisposeBag()
Observable.of(2, 32, 22, 5, 60, 1).filter { (number) -> Bool in
number > 10
}.subscribe { (number) in
print("溫度: \(number)")
}.disposed(by: disposeBag)
溫度: next(32)
溫度: next(22)
溫度: next(60)
溫度: completed
map - 轉(zhuǎn)換
? 可以用 map 創(chuàng)建一個(gè)新的序列。這個(gè)序列將原有的 JSON 轉(zhuǎn)換成 Model 。這種轉(zhuǎn)換實(shí)際上就是解析 JSON 。
let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
.map { $0 * 10 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
10
20
30
zip - 組合
&emps; 通過一個(gè)函數(shù)將多個(gè) Observables 的元素(最多8個(gè))組合起來,然后將每一個(gè)組合的結(jié)果發(fā)出來
let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<String>()
Observable.zip(first, second) { $0 + $1 }
.subscribe(onNext: {
print($0)
}).disposed(by: disposeBag)
first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
second.onNext("C")
second.onNext("D")
first.onNext("3")
first.onNext("4")
打?。?br>
1A
2B
3C
4D
線程切換
Schedulers(調(diào)度器)
? Schedulers 是 Rx 實(shí)現(xiàn)多線程的核心模塊,它主要用于控制任務(wù)在哪個(gè)線程或隊(duì)列運(yùn)行。
切換線程
sequence1.observeOn(backgroundScheduler) // 切換到后臺(tái)線程
.map { n in
print("在 background scheduler 執(zhí)行")
}
.observeOn(MainScheduler.instance) // 切換到主線程
.map { n in
print("在 main scheduler")
}
線程的切換支持 GCD 和 NSOperation,主要使用兩個(gè)操作符:observeOn 和 subscribeOn ,常用的還是 observeOn 。
- 調(diào)用 observeOn 指定接下來的操作在哪個(gè)線程;
- 調(diào)用 subscribeOn 決定訂閱者的操作執(zhí)行在哪個(gè)線程。
若我們沒有明確調(diào)用這兩個(gè)操作,后面的操作都是在當(dāng)前線程執(zhí)行的。
? subscribeOn 來決定數(shù)據(jù)序列的構(gòu)建函數(shù)在哪個(gè) Scheduler 上運(yùn)行。網(wǎng)絡(luò)請(qǐng)求中,由于獲取 Data 需要花很長(zhǎng)的時(shí)間,所以用 subscribeOn 切換到 后臺(tái) Scheduler 來獲取 Data。這樣可以避免主線程被阻塞。
? observeOn 來決定在哪個(gè) Scheduler 監(jiān)聽這個(gè)數(shù)據(jù)序列。UI刷新數(shù)據(jù),通過使用 observeOn 方法切換到主線程來監(jiān)聽并且處理結(jié)果。
? 一個(gè)比較典型的例子就是,在后臺(tái)發(fā)起網(wǎng)絡(luò)請(qǐng)求,然后解析數(shù)據(jù),最后在主線程刷新頁面。你就可以先用 subscribeOn 切到后臺(tái)去發(fā)送請(qǐng)求并解析數(shù)據(jù),最后用 observeOn 切換到主線程更新頁面。
MainScheduler
?MainScheduler 代表主線程。如果你需要執(zhí)行一些和 UI 相關(guān)的任務(wù),就需要切換到該 Scheduler 運(yùn)行。
public class func ensureExecutingOnScheduler()
? 可以保證代碼一定執(zhí)行在主線程的地方調(diào)用 MainScheduler.ensureExecutingOnScheduler(),特別是在線程切換來切換去的情況下,或者是調(diào)用其他的庫,我們不確定當(dāng)前是否在執(zhí)行在主線程。畢竟 UI 的更新還是要在主線程執(zhí)行的。
SerialDispatchQueueScheduler
?SerialDispatchQueueScheduler(串行的調(diào)度器)抽象了串行 DispatchQueue, MainScheduler 就是繼承于它。如果你需要執(zhí)行一些串行任務(wù),可以切換到這個(gè) Scheduler 運(yùn)行。
ConcurrentDispatchQueueScheduler
?ConcurrentDispatchQueueScheduler 抽象了并行 DispatchQueue。如果你需要執(zhí)行一些并發(fā)任務(wù),可以切換到這個(gè) Scheduler 運(yùn)行。
OperationQueueScheduler
?OperationQueueScheduler 抽象了 NSOperationQueue。它具備 NSOperationQueue 的一些特點(diǎn),例如,你可以通過設(shè)置maxConcurrentOperationCount,來控制同時(shí)執(zhí)行并發(fā)任務(wù)的最大數(shù)量。
Error Handling - 錯(cuò)誤處理
? 一旦序列里面產(chǎn)出了一個(gè) error 事件,整個(gè)序列將被終止。RxSwift 主要有兩種錯(cuò)誤處理機(jī)制:
- retry - 重試
- catch - 恢復(fù)
retryWhen
? retryWhen操作符,這個(gè)操作符主要描述應(yīng)該在何時(shí)重試,并且通過閉包里面返回的 Observable 來控制重試的時(shí)機(jī)
catchError - 恢復(fù)
? catchError 可以在錯(cuò)誤產(chǎn)生時(shí),用一個(gè)備用元素或者一組備用元素將錯(cuò)誤替換掉
參考資料:
線程切換