由于項(xiàng)目的需要,筆者最近在學(xué)習(xí)swift,為了更加高效的擼代碼,研究了RxSwift這個(gè)牛逼的庫(kù),對(duì)于它的核心流程,進(jìn)行了一番探索(實(shí)際上耗費(fèi)了畢生心血),對(duì)于下面的敘述,如果錯(cuò)誤,還望大家多多留言指正,謝謝了??????
核心代碼塊
override func viewDidLoad() {
super.viewDidLoad()
//1、創(chuàng)建序列
let ob = Observable<Any>.create { (obserber) -> Disposable in
//3、發(fā)送信號(hào)
obserber.onNext("佐籩")
obserber.onError(NSError.init(domain: "1234", code: 10086, userInfo: nil))
obserber.onCompleted()
return Disposables.create()
}
//2、訂閱信號(hào)
let _ = ob.subscribe(onNext: { (text) in
print("訂閱到:\(text)")
}, onError: { (error) in
print("錯(cuò)誤:\(error)")
}, onCompleted: {
print("完成")
}) {
print("銷毀")
}
}
運(yùn)行代碼結(jié)果如下:

核心代碼如上代碼所示,共三步
- 創(chuàng)建序列
- 訂閱信號(hào)
- 發(fā)送信號(hào)
那么對(duì)于上面的流程中提出一下幾個(gè)問(wèn)題:
- 序列是如何創(chuàng)建?
- 序列是如何接受到發(fā)送的信號(hào)的?
- 整個(gè)流程又是如何實(shí)現(xiàn)的?
下面筆者會(huì)跟著大家一起進(jìn)入源碼,一探究竟!
想要具體了解原理的小伙伴,最好帶著源碼進(jìn)行閱讀下面的內(nèi)容,便于你的理解
源碼解析
創(chuàng)建序列
序列的創(chuàng)建很簡(jiǎn)單,就是下面一行代碼
let ob = Observable<Any>.create { 閉包A }
進(jìn)入查看create,這里的create只能在create.swift文件里查看,不然跟蹤不到這個(gè)方法??
可是,為神馬會(huì)走到這里來(lái)呢?
進(jìn)入對(duì)象Observable查看,會(huì)得到下面的繼承關(guān)系

所以在ObservableType的擴(kuò)展中查看create方法,具體實(shí)現(xiàn)如下
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
可以看到,返回了一個(gè)AnonymousObservable對(duì)象,并且傳入了一個(gè)參數(shù)subscribe,通過(guò)創(chuàng)建的代碼可以知道,這里的subscribe就是外面我們定義的閉包,后面文章中筆者會(huì)稱為它為閉包A,也就是發(fā)送信號(hào)的部分。
進(jìn)入AnonymousObservable類的查看源碼,
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
這個(gè)類的實(shí)現(xiàn)很簡(jiǎn)單,通過(guò)init初始化方法可以看到,閉包A被保存在了_subscribeHandler屬性中。 同時(shí)這里還提供了一個(gè)非常重要run方法,后面會(huì)走到這里,所以這里先強(qiáng)調(diào)一下。
到此,序列的創(chuàng)建就結(jié)束了,下面在看訂閱信號(hào)的解析
訂閱信號(hào)
//2、訂閱信號(hào)
let _ = ob.subscribe(onNext: { (text) in
print("訂閱到:\(text)")
}, onError: { (error) in
print("錯(cuò)誤:\(error)")
}, onCompleted: {
print("完成")
}) {
print("銷毀")
}
從上面代碼可以知道,重點(diǎn)解析在于方法subscribe,進(jìn)入源碼,過(guò)濾掉其他無(wú)關(guān)代碼如下

嗯,不錯(cuò),就這兩行代碼,信號(hào)的訂閱到發(fā)送,就結(jié)束了。
此刻請(qǐng)?jiān)试S筆者通過(guò)下面內(nèi)容來(lái)表達(dá)一下此刻的內(nèi)心

嗯。。。耐下心來(lái),慢慢琢磨可以知道,想要突破,那么關(guān)鍵代碼就在下面這一行
self.asObservable().subscribe(observer)
通過(guò)斷點(diǎn)調(diào)試,可以知道這里的self表示著AnonymousObservable,或者通過(guò)我們寫的核心代碼塊也可以知道,首先創(chuàng)建了一個(gè)序列ob,它的類型上面說(shuō)過(guò)了是AnonymousObservable類型,接著訂閱信號(hào)是直接用ob調(diào)用的方法subscribe,所以這里的self表示著AnonymousObservable,它就是創(chuàng)建序列時(shí)返回的對(duì)象。
但是在上面對(duì)AnonymousObservable解析中,并沒(méi)有發(fā)現(xiàn)它擁有一個(gè)叫做subscribe的方法,再次回到AnonymousObservable類的源碼中,可以發(fā)現(xiàn),它是繼承自Producer類,不錯(cuò),在它的父類Producer中我們發(fā)現(xiàn)了subscribe方法。
這里停一下,因?yàn)榇藭r(shí)傳了一個(gè)參數(shù)observer,它也是一個(gè)AnonymousObserver對(duì)象,同時(shí)帶著一個(gè)尾隨閉包,暫且稱這個(gè)參數(shù)為閉包B,以便后面使用它的時(shí)候,我們可以容易想起來(lái)。
接著上面的思路,進(jìn)入Producer類中查看,在subscribe方法中,調(diào)用了一個(gè)run方法,本類中,并沒(méi)有對(duì)這個(gè)方法進(jìn)行具體實(shí)現(xiàn),而是分發(fā)給了它的子類去實(shí)現(xiàn),那么這里又需要回到AnonymousObservable類中,在上面的序列創(chuàng)建的內(nèi)容中,筆者也就和大家說(shuō)了這里run方法的重要,下面具體分析。
在此之前,需要注意的是,參數(shù)閉包B一直在傳遞。
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
創(chuàng)建了一個(gè)實(shí)例對(duì)象sink,然后調(diào)用了一個(gè)run方法。這里看不出來(lái)什么東西,進(jìn)入AnonymousObservableSink查看源碼。
可以發(fā)現(xiàn)下面幾點(diǎn)
- 第一,重寫了父類的
init方法,而傳進(jìn)來(lái)的閉包B,傳遞給了父類Sink,同時(shí)賦值給了父類的_observer屬性。 - 第二,方法
on,參數(shù)為event - 第三,方法
run,參數(shù)parent
OK,AnonymousObservableSink類的內(nèi)容,差不多就這么多,回到之前的代碼中
let subscription = sink.run(self)
這行代碼里的self表示類AnonymousObservable,我想大家應(yīng)該都沒(méi)有意見,那么具體看下這個(gè)方法的實(shí)現(xiàn)。
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
轉(zhuǎn)化一下代碼AnonymousObservable._subscribeHandler(AnyObserver(self)),這樣看的話,我相信大家應(yīng)該非常的熟悉了。
AnonymousObservable是什么? 它是在我們創(chuàng)建序列時(shí)的返回的那個(gè)對(duì)象。
_subscribeHandler是什么? 它是在我們創(chuàng)建序列時(shí)傳的參數(shù)閉包A。
那么這句話的意思就是執(zhí)行閉包A,即發(fā)送信號(hào)
發(fā)送信號(hào)
分兩步來(lái)分析AnonymousObservable._subscribeHandler(AnyObserver(self))這行代碼;
1、發(fā)送信號(hào)的閉包
回到發(fā)送信號(hào)的閉包中,
obserber.onNext("佐籩")
這里只分析onNext(),因?yàn)槠渌硪粯?,就不過(guò)多描述了??!
按住command,追蹤onNext()方法,可以跳轉(zhuǎn)到ObserverType擴(kuò)展中
public func onNext(_ element: Element) {
self.on(.next(element))
}
可以通過(guò)斷點(diǎn)調(diào)試知道,這里的self指的是AnyObserver類,或者可以回到create.swift文件中的create方法中查看

這里已經(jīng)把對(duì)應(yīng)的obserber的類型已經(jīng)描述的很清晰了。obserber為AnyObserver類型,且遵循ObserverType協(xié)議,調(diào)用的onNext()方法就是ObserverType協(xié)議里的方法(查看一下AnyObserver類的源碼便知)。
回到AnyObserver類中,查看on方法
public func on(_ event: Event<Element>) {
return self.observer(event)
}
可以斷點(diǎn)看一下,這里已經(jīng)把信號(hào)傳了進(jìn)來(lái)

那么這里的self.observer()具體是什么呢?請(qǐng)看下面的分析
2、解析 AnyObserver(self)
通過(guò)上面流程可以知道,關(guān)鍵代碼在于AnyObserver(self),通過(guò)進(jìn)入源碼查看
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}
在初始化代碼里有這么一個(gè)操作,observer.on賦值的操作,這里的observer是通過(guò)初始化構(gòu)造方法傳來(lái)的,回顧上面的流程可以知道,它是AnonymousObservableSink類。那么它執(zhí)行on方法,跟著代碼走,會(huì)調(diào)用方法forwardOn,最后會(huì)來(lái)到Sink類里的forwardOn方法,源碼如下
final func forwardOn(_ event: Event<Observer.Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
關(guān)鍵代碼在于self._observer.on(event),在訂閱信號(hào)的分析中,特別說(shuō)明了_observer是閉包B,如果這里有疑問(wèn)的小伙伴,可會(huì)回到前面看一下。
閉包B的具體結(jié)構(gòu)如下(部分代碼省略):
AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
那么對(duì)于self._observer.on(event),可以理解為類AnonymousObserver調(diào)用方法on,進(jìn)入源碼查看,AnonymousObserver類中并無(wú)該方法,然后通過(guò)它的父類ObserverBase可以找到該方法,具體實(shí)現(xiàn)如下:
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
緊接著,執(zhí)行self.onCore(event),同樣的,該方法,父類ObserverBase并沒(méi)有具體實(shí)現(xiàn),而是分發(fā)給子類去實(shí)現(xiàn),回到類AnonymousObserver,查看該方法的實(shí)現(xiàn)代碼
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
那么對(duì)于self._eventHandler()是個(gè)什么東西,可以具體分析一下,在當(dāng)前類AnonymousObserver中,
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
追溯到枚舉Event
public enum Event<Element> {
/// Next element is produced.
case next(Element)
/// Sequence terminated with an error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
到這里,我想大家應(yīng)該可以知道下面我們應(yīng)該走到哪里了,沒(méi)錯(cuò),就是剛剛說(shuō)的閉包B,根據(jù)event執(zhí)行不同的方法,比如這里所說(shuō)的onNext方法,那么就會(huì)打印輸出訂閱到:佐籩
總結(jié)
OK,到這里關(guān)于RxSwift的核心流程的源碼分析已經(jīng)結(jié)束了,了解了其中的原理,筆者相信,對(duì)于RxSwift的運(yùn)用會(huì)更加得心應(yīng)手。
好難,筆者在整理這份筆記的時(shí)候,確實(shí)花費(fèi)了不少時(shí)間,本想畫個(gè)流程圖來(lái)幫助大家去理解,奈何時(shí)間有限,后續(xù)筆者會(huì)整理一份思路較清晰的流程圖;
上述的分析流程,如果有什么錯(cuò)誤,希望大家給筆者留言指正一下,謝謝?。?!
持續(xù)學(xué)習(xí),希望大家持續(xù)關(guān)注?。?!