RxSwift之scheduler
在之前閱讀 RxSwift 源碼過程中,總是和 scheduler 偶遇,這次我們來正式認(rèn)識一下!
demo
Observable.of(1,2,3,4,5)
.observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName:"observeOnSerial"))
.subscribe{print("observeOn",$0,Thread.current)}
.disposed(by: disposeBag)
輸出:
observeOn next(1) <NSThread: 0x600001eb5580>{number = 4, name = (null)}
observeOn next(2) <NSThread: 0x600001eb5580>{number = 4, name = (null)}
observeOn next(3) <NSThread: 0x600001eb5580>{number = 4, name = (null)}
observeOn next(4) <NSThread: 0x600001eb5580>{number = 4, name = (null)}
observeOn next(5) <NSThread: 0x600001eb5580>{number = 4, name = (null)}
observeOn completed <NSThread: 0x600001eb5580>{number = 4, name = (null)}
of函數(shù)
public static func of(_ elements: Element ...,
scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
return ObservableSequence(elements: elements, scheduler: scheduler)
}
ObservableSequence源序列
of(elements:,scheduler:)->Observable函數(shù)通過給定的元素,創(chuàng)建了一個可觀察序列ObservableSequence,它是Producer的子類,還有個 scheduler 參數(shù)默認(rèn)值為當(dāng)前線程調(diào)度者。在 demo 中就是主線程調(diào)度者。正是本片的主角scheduler。這里默認(rèn)的 scheduler 是CurrentThreadScheduler實例,遵守ImmediateSchedulerType協(xié)議。
final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
fileprivate let _elements: Sequence
fileprivate let _scheduler: ImmediateSchedulerType
init(elements: Sequence, scheduler: ImmediateSchedulerType) {
self._elements = elements
self._scheduler = scheduler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
CurrentThreadScheduler當(dāng)前線程調(diào)度者
public class CurrentThreadScheduler : ImmediateSchedulerType {
/// The singleton instance of the current thread scheduler.
public static let instance = CurrentThreadScheduler()
}
- 創(chuàng)建
Producer的子類ObservableSequenceObservableSequence兩個屬性:
- _elements: Sequence(N個元素)
- _scheduler: ImmediateSchedulerType(當(dāng)前線程調(diào)度者
CurrentThreadScheduler)
observeOn函數(shù)
public func observeOn(_ scheduler: ImmediateSchedulerType) -> Observable<Element> {
if let scheduler = scheduler as? SerialDispatchQueueScheduler {
return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
} else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
}
ObservableSequence在調(diào)用observeOn時,根據(jù) scheduler 是串行還是并行去創(chuàng)建不同的類型的中間序列。我們先關(guān)注串行隊列來學(xué)習(xí)。這里是創(chuàng)建的ObserveOnSerialDispatchQueue中間序列。
ObserveOnSerialDispatchQueue中間序列
final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
let scheduler: SerialDispatchQueueScheduler
let source: Observable<Element>
init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
self.scheduler = scheduler
self.source = source
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler,
observer: observer,
cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
中間序列初始化時保存了源序列和串行隊列。
SerialDispatchQueueScheduler串行調(diào)度者
public class SerialDispatchQueueScheduler : SchedulerType {
let configuration: DispatchQueueConfiguration
init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
}
public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
serialQueueConfiguration?(queue)
self.init(serialQueue: queue, leeway: leeway)
}
}
這里貼出串行隊列調(diào)度者的代碼,可以和前面of函數(shù)部分相呼應(yīng)??梢郧宄目吹?,序列、調(diào)度者,還有訂閱后產(chǎn)生的觀察者、管道都是兩套的。
- 中間序列
ObserveOnSerialDispatchQueue兩個屬性:
- scheduler: SerialDispatchQueueScheduler(串行調(diào)度者,
SerialDispatchQueueScheduler)- source: Observable<Element>(源序列,
ObservableSequence)
subscribe函數(shù)
好戲開始!隨著 subscribe 的調(diào)用,在創(chuàng)建匿名觀察者后會走父類 Producer 的 subscribe 中:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
} else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
之前在學(xué)習(xí)RxSwift核心邏輯的時候,也經(jīng)常路過這部分流程,這次我們看看這個分支到底是什么?CurrentThreadScheduler.isScheduleRequired,當(dāng)前線程調(diào)度者是否必需調(diào)用 schedule ?
public class CurrentThreadScheduler : ImmediateSchedulerType {
private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
defer { key.deallocate() }
guard pthread_key_create(key, nil) == 0 else {
rxFatalError("isScheduleRequired key creation failed")
}
return key.pointee
}()
static var queue : ScheduleQueue? {
get {
return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
}
set {
Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
}
}
/// Gets a value that indicates whether the caller must call a `schedule` method.
public static fileprivate(set) var isScheduleRequired: Bool {
get {
return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
}
set(isScheduleRequired) {
if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
rxFatalError("pthread_setspecific failed")
}
}
}
public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
if CurrentThreadScheduler.isScheduleRequired {
CurrentThreadScheduler.isScheduleRequired = false
let disposable = action(state)
defer {
CurrentThreadScheduler.isScheduleRequired = true
CurrentThreadScheduler.queue = nil
}
guard let queue = CurrentThreadScheduler.queue else {
return disposable
}
while let latest = queue.value.dequeue() {
if latest.isDisposed {
continue
}
latest.invoke()
}
return disposable
}
let existingQueue = CurrentThreadScheduler.queue
let queue: RxMutableBox<Queue<ScheduledItemType>>
if let existingQueue = existingQueue {
queue = existingQueue
}
else {
queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
CurrentThreadScheduler.queue = queue
}
let scheduledItem = ScheduledItem(action: action, state: state)
queue.value.enqueue(scheduledItem)
return scheduledItem
}
}
CurrentThreadScheduler調(diào)用isScheduleRequiredKey的getter時,用線程中存儲的pthread_key_t變量來標(biāo)記是否需要調(diào)用 schedule 函數(shù)。pthread_key_t變量的設(shè)置也是在isScheduleRequiredKey的setter中,后面在調(diào)用 schedule 函數(shù)后,第一步就是把isScheduleRequiredKey設(shè)置為false,pthread_key_t也會被創(chuàng)建pthread_key_create。
現(xiàn)在subscribe中會走 else 部分,當(dāng)前線程調(diào)用schedule函數(shù),傳參一個元組和action閉包。
schedule函數(shù)中,第一個條件語句進(jìn)來后,馬上就置反了isScheduleRequiredKey。然后就開始回調(diào)action閉包。
中間序列ObserveOnSerialDispatchQueue開始run:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
ObserveOnSerialDispatchQueueSink初始化時,除了保存串行隊列調(diào)度者、匿名觀察者、銷毀者之外,還設(shè)置了一個匿名閉包:cachedScheduleLambda。
init(scheduler: SerialDispatchQueueScheduler, observer: Observer, cancel: Cancelable) {
self.scheduler = scheduler
self.observer = observer
self.cancel = cancel
super.init()
self.cachedScheduleLambda = { pair in
guard !cancel.isDisposed else { return Disposables.create() }
pair.sink.observer.on(pair.event)
if pair.event.isStopEvent {
pair.sink.dispose()
}
return Disposables.create()
}
}
創(chuàng)建完中間序列管道ObserveOnSerialDispatchQueueSink后,就是源序列開始訂閱,把中間序列管道傳入,最終作為源序列管道的觀察者。此時,因為之前CurrentThreadScheduler.isScheduleRequired已經(jīng)是false,所以走的是 Producer 的 subscribe 函數(shù)中第一個分支里面。然后源序列管道run,開始響應(yīng)訂閱消息。
func run() -> Disposable {
return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
var mutableIterator = iterator
if let next = mutableIterator.next() {
self.forwardOn(.next(next))
recurse(mutableIterator)
} else {
self.forwardOn(.completed)
self.dispose()
}
}
}
這里面的代碼確是讓源序列的調(diào)度者(當(dāng)前線程)去遞歸回調(diào)的。把源序列的元素迭代器(state)和響應(yīng)信號的閉包傳入。當(dāng)前線程調(diào)度者是遵守ImmediateSchedulerType協(xié)議的,我們在協(xié)議拓展中可以看到scheduleRecursive的實現(xiàn):
public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
recursiveScheduler.schedule(state)
return Disposables.create(with: recursiveScheduler.dispose)
}
重點是RecursiveImmediateScheduler的schedule:
func schedule(_ state: State) {
var scheduleState: ScheduleState = .initial
let d = self._scheduler.schedule(state) { state -> Disposable in
// best effort
if self._group.isDisposed {
return Disposables.create()
}
let action = self._lock.calculateLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self._group.remove(for: removeKey)
case .initial:
break
case .done:
break
}
scheduleState = .done
return self._action
}
if let action = action {
action(state, self.schedule)
}
return Disposables.create()
}
self._lock.performLocked {
switch scheduleState {
case .added:
rxFatalError("Invalid state")
case .initial:
if let removeKey = self._group.insert(d) {
scheduleState = .added(removeKey)
}
else {
scheduleState = .done
}
case .done:
break
}
}
}
看到這里真的懵了,之前不是就在CurrentThreadScheduler.instance.schedule里面么?怎么又回到原點了?鬼撞墻?兜還在伊邪那美里面砍鼬。。。
從中間序列 subscribe 開始,到第一次判斷是否必需執(zhí)行 schedule 函數(shù),條件語句中開始回調(diào)let disposable = action(state),這個action(state)執(zhí)行到源序列的調(diào)度者遞歸調(diào)用中又是self._scheduler.schedule。
靜心分析一下,只有第一次會走if語句里面的,這次也是直接走了if后面的。獲取到當(dāng)前線程的隊列,把state和action包裝成scheduledItem,然后入隊:queue.value.enqueue(scheduledItem)??梢钥吹角懊娴?code>if里還有出隊的代碼:queue.value.dequeue(),還沒有執(zhí)行到。這次調(diào)用 schedule 應(yīng)該是為了管理隊列。
代碼走完,我們繼續(xù)回到RecursiveImmediateScheduler的schedule下半部分,看起來像是對訂閱生命周期的管理。
此時此刻才執(zhí)行完了我們第一次CurrentThreadScheduler.instance.schedule中action(state)的回調(diào)。后面的主要就是scheduledItem的調(diào)用了:latest.invoke():
func invoke() {
self._disposable.setDisposable(self._action(self._state))
}
然后才是之前那些閉包的回調(diào):
latest.invoke()self._disposable.setDisposable(self._action(self._state))-
RecursiveImmediateScheduler的schedule里第二次調(diào)用 schedule:self._scheduler.schedule的閉包 action(state, self.schedule)- 源序列管道
run中的閉包 self.forwardOn(.next(next))self._observer.on(event)
前面我們記得源序列管道中的觀察者是中間序列管道,那么就會到中間序列管道的onCore里面。
override func onCore(_ event: Event<Element>) {
_ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}
又是它,又是 schedule 函數(shù)。不過這次的調(diào)用者是中間序列的串行調(diào)度者SerialDispatchQueueScheduler:
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
self.queue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
很清楚了,self.queue.async,用pthread_getspecific找到這個線程的隊列,然后異步執(zhí)行。action 就是之前設(shè)置的匿名閉包:cachedScheduleLambda :
self.cachedScheduleLambda = { pair in
guard !cancel.isDisposed else { return Disposables.create() }
pair.sink.observer.on(pair.event)
if pair.event.isStopEvent {
pair.sink.dispose()
}
return Disposables.create()
}
閉包里就是讓中間序列管道的觀察者去響應(yīng)外界的訂閱:pair.sink.observer.on(pair.event)。
這樣就完成了主線程中的序列發(fā)出的信號在子線程中響應(yīng)外界的訂閱。