RxSwift之scheduler

RxSwift之scheduler

在之前閱讀 RxSwift 源碼過程中,總是和 scheduler 偶遇,這次我們來正式認(rèn)識一下!

demo

Observable.of(1,2,3,4,5)
    .observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName:"observeOnSerial"))
    .subscribe{print("observeOn",$0,Thread.current)}
    .disposed(by: disposeBag)
    
輸出:
observeOn next(1) <NSThread: 0x600001eb5580>{number = 4, name = (null)}
observeOn next(2) <NSThread: 0x600001eb5580>{number = 4, name = (null)}
observeOn next(3) <NSThread: 0x600001eb5580>{number = 4, name = (null)}
observeOn next(4) <NSThread: 0x600001eb5580>{number = 4, name = (null)}
observeOn next(5) <NSThread: 0x600001eb5580>{number = 4, name = (null)}
observeOn completed <NSThread: 0x600001eb5580>{number = 4, name = (null)}

of函數(shù)

public static func of(_ elements: Element ..., 
                    scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
    return ObservableSequence(elements: elements, scheduler: scheduler)
}

ObservableSequence源序列

of(elements:,scheduler:)->Observable函數(shù)通過給定的元素,創(chuàng)建了一個可觀察序列ObservableSequence,它是Producer的子類,還有個 scheduler 參數(shù)默認(rèn)值為當(dāng)前線程調(diào)度者。在 demo 中就是主線程調(diào)度者。正是本片的主角scheduler。這里默認(rèn)的 scheduler 是CurrentThreadScheduler實例,遵守ImmediateSchedulerType協(xié)議。

final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
    fileprivate let _elements: Sequence
    fileprivate let _scheduler: ImmediateSchedulerType

    init(elements: Sequence, scheduler: ImmediateSchedulerType) {
        self._elements = elements
        self._scheduler = scheduler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

CurrentThreadScheduler當(dāng)前線程調(diào)度者

public class CurrentThreadScheduler : ImmediateSchedulerType {
    /// The singleton instance of the current thread scheduler.
    public static let instance = CurrentThreadScheduler()
}
  1. 創(chuàng)建Producer的子類ObservableSequence
  2. ObservableSequence兩個屬性:
    1. _elements: Sequence(N個元素)
    2. _scheduler: ImmediateSchedulerType(當(dāng)前線程調(diào)度者CurrentThreadScheduler

observeOn函數(shù)

public func observeOn(_ scheduler: ImmediateSchedulerType) -> Observable<Element> {
    if let scheduler = scheduler as? SerialDispatchQueueScheduler {
        return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
    } else {
        return ObserveOn(source: self.asObservable(), scheduler: scheduler)
    }
}

ObservableSequence在調(diào)用observeOn時,根據(jù) scheduler 是串行還是并行去創(chuàng)建不同的類型的中間序列。我們先關(guān)注串行隊列來學(xué)習(xí)。這里是創(chuàng)建的ObserveOnSerialDispatchQueue中間序列。

ObserveOnSerialDispatchQueue中間序列

final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
    let scheduler: SerialDispatchQueueScheduler
    let source: Observable<Element>

    init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
        self.scheduler = scheduler
        self.source = source
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, 
                                                    observer: observer, 
                                                    cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

中間序列初始化時保存了源序列和串行隊列。

SerialDispatchQueueScheduler串行調(diào)度者

public class SerialDispatchQueueScheduler : SchedulerType {
    let configuration: DispatchQueueConfiguration
    
    init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
    }

    public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
        serialQueueConfiguration?(queue)
        self.init(serialQueue: queue, leeway: leeway)
    }
}

這里貼出串行隊列調(diào)度者的代碼,可以和前面of函數(shù)部分相呼應(yīng)??梢郧宄目吹?,序列、調(diào)度者,還有訂閱后產(chǎn)生的觀察者、管道都是兩套的。

  • 中間序列ObserveOnSerialDispatchQueue兩個屬性:
    • scheduler: SerialDispatchQueueScheduler(串行調(diào)度者,SerialDispatchQueueScheduler
    • source: Observable<Element>(源序列,ObservableSequence

subscribe函數(shù)

好戲開始!隨著 subscribe 的調(diào)用,在創(chuàng)建匿名觀察者后會走父類 Producer 的 subscribe 中:

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    if !CurrentThreadScheduler.isScheduleRequired {
        let disposer = SinkDisposer()
        let sinkAndSubscription = self.run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

        return disposer
    } else {
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
    }
}

之前在學(xué)習(xí)RxSwift核心邏輯的時候,也經(jīng)常路過這部分流程,這次我們看看這個分支到底是什么?CurrentThreadScheduler.isScheduleRequired,當(dāng)前線程調(diào)度者是否必需調(diào)用 schedule ?

public class CurrentThreadScheduler : ImmediateSchedulerType {
    private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
        let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
        defer { key.deallocate() }
                                                               
        guard pthread_key_create(key, nil) == 0 else {
            rxFatalError("isScheduleRequired key creation failed")
        }

        return key.pointee
    }()

    static var queue : ScheduleQueue? {
        get {
            return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
        }
        set {
            Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
        }
    }

    /// Gets a value that indicates whether the caller must call a `schedule` method.
    public static fileprivate(set) var isScheduleRequired: Bool {
        get {
            return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
        }
        set(isScheduleRequired) {
            if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                rxFatalError("pthread_setspecific failed")
            }
        }
    }
    
    public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        if CurrentThreadScheduler.isScheduleRequired {
            CurrentThreadScheduler.isScheduleRequired = false

            let disposable = action(state)

            defer {
                CurrentThreadScheduler.isScheduleRequired = true
                CurrentThreadScheduler.queue = nil
            }

            guard let queue = CurrentThreadScheduler.queue else {
                return disposable
            }

            while let latest = queue.value.dequeue() {
                if latest.isDisposed {
                    continue
                }
                latest.invoke()
            }

            return disposable
        }

        let existingQueue = CurrentThreadScheduler.queue

        let queue: RxMutableBox<Queue<ScheduledItemType>>
        if let existingQueue = existingQueue {
            queue = existingQueue
        }
        else {
            queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
            CurrentThreadScheduler.queue = queue
        }

        let scheduledItem = ScheduledItem(action: action, state: state)
        queue.value.enqueue(scheduledItem)

        return scheduledItem
    }
}

CurrentThreadScheduler調(diào)用isScheduleRequiredKeygetter時,用線程中存儲的pthread_key_t變量來標(biāo)記是否需要調(diào)用 schedule 函數(shù)。pthread_key_t變量的設(shè)置也是在isScheduleRequiredKeysetter中,后面在調(diào)用 schedule 函數(shù)后,第一步就是把isScheduleRequiredKey設(shè)置為false,pthread_key_t也會被創(chuàng)建pthread_key_create。

現(xiàn)在subscribe中會走 else 部分,當(dāng)前線程調(diào)用schedule函數(shù),傳參一個元組和action閉包。

schedule函數(shù)中,第一個條件語句進(jìn)來后,馬上就置反了isScheduleRequiredKey。然后就開始回調(diào)action閉包。

中間序列ObserveOnSerialDispatchQueue開始run

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
    let subscription = self.source.subscribe(sink)
    return (sink: sink, subscription: subscription)
}

ObserveOnSerialDispatchQueueSink初始化時,除了保存串行隊列調(diào)度者、匿名觀察者、銷毀者之外,還設(shè)置了一個匿名閉包:cachedScheduleLambda。

init(scheduler: SerialDispatchQueueScheduler, observer: Observer, cancel: Cancelable) {
    self.scheduler = scheduler
    self.observer = observer
    self.cancel = cancel
    super.init()

    self.cachedScheduleLambda = { pair in
        guard !cancel.isDisposed else { return Disposables.create() }
        pair.sink.observer.on(pair.event)
        if pair.event.isStopEvent {
            pair.sink.dispose()
        }
        return Disposables.create()
    }
}

創(chuàng)建完中間序列管道ObserveOnSerialDispatchQueueSink后,就是源序列開始訂閱,把中間序列管道傳入,最終作為源序列管道的觀察者。此時,因為之前CurrentThreadScheduler.isScheduleRequired已經(jīng)是false,所以走的是 Producer 的 subscribe 函數(shù)中第一個分支里面。然后源序列管道run,開始響應(yīng)訂閱消息。

func run() -> Disposable {
    return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
        var mutableIterator = iterator
        if let next = mutableIterator.next() {
            self.forwardOn(.next(next))
            recurse(mutableIterator)
        } else {
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

這里面的代碼確是讓源序列的調(diào)度者(當(dāng)前線程)去遞歸回調(diào)的。把源序列的元素迭代器(state)和響應(yīng)信號的閉包傳入。當(dāng)前線程調(diào)度者是遵守ImmediateSchedulerType協(xié)議的,我們在協(xié)議拓展中可以看到scheduleRecursive的實現(xiàn):

public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
    let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
    recursiveScheduler.schedule(state)
    return Disposables.create(with: recursiveScheduler.dispose)
}

重點是RecursiveImmediateSchedulerschedule

func schedule(_ state: State) {
    var scheduleState: ScheduleState = .initial

    let d = self._scheduler.schedule(state) { state -> Disposable in
        // best effort
        if self._group.isDisposed {
            return Disposables.create()
        }
            
        let action = self._lock.calculateLocked { () -> Action? in
            switch scheduleState {
            case let .added(removeKey):
                self._group.remove(for: removeKey)
            case .initial:
                break
            case .done:
                break
            }

            scheduleState = .done

            return self._action
        }
            
        if let action = action {
            action(state, self.schedule)
        }
            
        return Disposables.create()
    }
        
    self._lock.performLocked {
        switch scheduleState {
            case .added:
                rxFatalError("Invalid state")
            case .initial:
                if let removeKey = self._group.insert(d) {
                    scheduleState = .added(removeKey)
                }
                else {
                    scheduleState = .done
                }
            case .done:
                break
        }
    }
}

看到這里真的懵了,之前不是就在CurrentThreadScheduler.instance.schedule里面么?怎么又回到原點了?鬼撞墻?兜還在伊邪那美里面砍鼬。。。

從中間序列 subscribe 開始,到第一次判斷是否必需執(zhí)行 schedule 函數(shù),條件語句中開始回調(diào)let disposable = action(state),這個action(state)執(zhí)行到源序列的調(diào)度者遞歸調(diào)用中又是self._scheduler.schedule。

靜心分析一下,只有第一次會走if語句里面的,這次也是直接走了if后面的。獲取到當(dāng)前線程的隊列,把stateaction包裝成scheduledItem,然后入隊:queue.value.enqueue(scheduledItem)??梢钥吹角懊娴?code>if里還有出隊的代碼:queue.value.dequeue(),還沒有執(zhí)行到。這次調(diào)用 schedule 應(yīng)該是為了管理隊列。

代碼走完,我們繼續(xù)回到RecursiveImmediateSchedulerschedule下半部分,看起來像是對訂閱生命周期的管理。

此時此刻才執(zhí)行完了我們第一次CurrentThreadScheduler.instance.scheduleaction(state)的回調(diào)。后面的主要就是scheduledItem的調(diào)用了:latest.invoke()

func invoke() {
    self._disposable.setDisposable(self._action(self._state))
}

然后才是之前那些閉包的回調(diào):

  • latest.invoke()
  • self._disposable.setDisposable(self._action(self._state))
  • RecursiveImmediateSchedulerschedule里第二次調(diào)用 schedule:self._scheduler.schedule的閉包
  • action(state, self.schedule)
  • 源序列管道run中的閉包
  • self.forwardOn(.next(next))
  • self._observer.on(event)

前面我們記得源序列管道中的觀察者是中間序列管道,那么就會到中間序列管道的onCore里面。

override func onCore(_ event: Event<Element>) {
    _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}

又是它,又是 schedule 函數(shù)。不過這次的調(diào)用者是中間序列的串行調(diào)度者SerialDispatchQueueScheduler

func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    let cancel = SingleAssignmentDisposable()
    self.queue.async {
        if cancel.isDisposed {
            return
        }
        cancel.setDisposable(action(state))
    }
    return cancel
}

很清楚了,self.queue.async,用pthread_getspecific找到這個線程的隊列,然后異步執(zhí)行。action 就是之前設(shè)置的匿名閉包:cachedScheduleLambda

self.cachedScheduleLambda = { pair in
    guard !cancel.isDisposed else { return Disposables.create() }
    pair.sink.observer.on(pair.event)
    if pair.event.isStopEvent {
        pair.sink.dispose()
    }
    return Disposables.create()
}

閉包里就是讓中間序列管道的觀察者去響應(yīng)外界的訂閱:pair.sink.observer.on(pair.event)。

這樣就完成了主線程中的序列發(fā)出的信號在子線程中響應(yīng)外界的訂閱。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容