RxSwift 核心邏輯初探

RxSwift

RxSwift: Reactive Extensions Swift,函數(shù)響應式編程的基礎框架。簡化代碼,易讀性高。

函數(shù)式,數(shù)學概念y = f(x),x也可以是個函數(shù),那么y = f(f(x))。利用面向函數(shù),可讀性更高,分段式更簡潔。
響應式,面向數(shù)據(jù)流和變化傳播的編程范式。數(shù)據(jù)流:只能以事先規(guī)定好的順序被讀取一次的數(shù)據(jù)的一個序列。變化傳播:類似觀察者模式,變化了要通知別人。

RxSwift初體驗

KVO

不需要再去寫被觀察對象的addObserver:forKeyPath:options:context: 方法注冊觀察者
觀察者也不需要實現(xiàn)observeValueForKeyPath:ofObject:change:context: 回調(diào)方法

 func setupKVO() {
        /// self.person.addObserver(self, forKeyPath: "name", options: .new, context: nil)
        self.person.rx.observeWeakly(String.self, "name")
            .subscribe(onNext: { (value) in
                print(value as Any)
            })
            .disposed(by: disposeBag)
    }
UI

button為例,業(yè)務邏輯和功能邏輯放在一起,更清晰,方便查找

 func setupButton() {
        ///直接.tap就可以實現(xiàn)touchUpInside事件
        self.button.rx.tap
            .subscribe(onNext: { () in
                print("點擊來了")
            })
            .disposed(by: disposeBag)
        ///如果不想用touchUpInside事件,可以這樣
        self.button.rx.controlEvent(.touchUpOutside)
    }
代理

可以省略代理方法實現(xiàn)

func setupTextFiled() {
        self.textFiled.rx.text.orEmpty
            .subscribe(onNext: { (text) in
               print(text)
            })
            .disposed(by: disposeBag)
        self.textFiled.rx.text
            .bind(to: self.button.rx.title())
            .disposed(by: disposeBag)

    }
通知
func setupNotification(){
        NotificationCenter.default.rx.notification(UIResponder.keyboardWillShowNotification)
            .subscribe(onNext: { (noti) in
                print(noti)
            })
        .disposed(by: disposeBag)
        
    }
手勢
func setupGestureRecognizer(){
        let tap = UITapGestureRecognizer()
        self.label.addGestureRecognizer(tap)
        self.label.isUserInteractionEnabled = true
        tap.rx.event.subscribe(onNext: { (tap) in
            print(tap.view)
        })
        .disposed(by: disposeBag)
    }
timer

核心邏輯,隔一秒鐘發(fā)送一個響應,并不是NSTimer

func setupTimer() {
        var timer: Observable<Int>!
        timer = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
        timer.subscribe(onNext: { (num) in
            print(num)
        })
        .disposed(by: disposeBag)
    }
網(wǎng)絡
  func setupNextwork() {
        let url = URL(string: "https://www.baidu.com")
      /// 原實現(xiàn)代碼
      URLSession.shared.dataTask(with: url!) { (data, response, error) in
         print(String.init(data:data!,encoding: .utf8))
      }.resume()
      /// RX實現(xiàn)
      URLSession.shared.rx.response(request: URLRequest(url: url!))
        .subscribe(onNext: { (response,data) in
            print(response)
        }).disposed(by: disposeBag)   
    }

萬物皆rx?點進去看看,會發(fā)現(xiàn)rx在ReactiveCompatible這個協(xié)議的擴展里面,

extension ReactiveCompatible {

    /// Reactive extensions.
    public static var rx: RxSwift.Reactive<Self>.Type

    /// Reactive extensions.
    public var rx: RxSwift.Reactive<Self>
}
/// A type that has reactive extensions.
public protocol ReactiveCompatible {
    /// Extended type
    associatedtype CompatibleType

    /// Reactive extensions.
    static var rx: Reactive<CompatibleType>.Type { get set }

    /// Reactive extensions.
    var rx: Reactive<CompatibleType> { get set }
}

劃重點了,所有的NSObject都遵循這個協(xié)議。

/// Extend NSObject with `rx` proxy.
extension NSObject: ReactiveCompatible { }

核心Observable可觀察序列

序列Sequence

是一系列相同類型的值的集合,并且提供對這些值的迭代能力,序列思維能讓未來的事情隨時響應,是響應式最核心的應用。包括有窮序列和無窮序列。

Observable生命周期

image.png
核心邏輯
        let ob = Observable<Any>.create { (obserber) -> Disposable in
            obserber.onNext("發(fā)送信號")
            obserber.onCompleted()
            return Disposables.create()
        }

        let _ = ob.subscribe(onNext: { (text) in
            print("訂閱到:\(text)")
        }, onError: { (error) in
            print("error: \(error)")
        }, onCompleted: {
            print("完成")
        }) {
            print("銷毀")
        }

1.創(chuàng)建序列:create
查看RxSwift的源碼,Create.swift這個文件

public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }

AnonymousObservable繼承自Producer,保存閉包subscribeHandler。Producer繼承自 Observable實現(xiàn)了subscribe方法。

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

Producer

class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }
}

Observable

public class Observable<Element> : ObservableType {
    /// Type of elements in sequence.
    public typealias E = Element
    
    init() {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
    }
    
    public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<E> {
        return self
    }
    
    deinit {
#if TRACE_RESOURCES
        _ = Resources.decrementTotal()
#endif
    }

    // this is kind of ugly I know :(
    // Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ˉ\_(ツ)_/ˉ

    /// Optimizations for map operator
    internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
        return _map(source: self, transform: transform)
    }
}

2.訂閱序列:subscribe
點進去源碼看subscribe,會發(fā)現(xiàn)里面創(chuàng)建了一個觀察者AnonymousObserver對象,初始化里面?zhèn)鲄?,參?shù)是一個尾隨閉包,實現(xiàn)event枚舉就會調(diào)用onNext事件,在最后的return里面有個self.asObservable().subscribe(observer),asObservable()看上面的Observable ,subscribe看上面的Producer。

    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            let observer = AnonymousObserver<E> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
}

3.發(fā)送信號:onNext

extension ObserverType {
    
    /// Convenience method equivalent to `on(.next(element: E))`
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: E) {
        self.on(.next(element))
    }
    
    /// Convenience method equivalent to `on(.completed)`
    public func onCompleted() {
        self.on(.completed)
    }
    
    /// Convenience method equivalent to `on(.error(Swift.Error))`
    /// - parameter error: Swift.Error to send to observer(s)
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

public enum Event<Element> {
    /// Next element is produced.
    case next(Element)

    /// Sequence terminated with an error.
    case error(Swift.Error)

    /// Sequence completed successfully.
    case completed
}

當序列發(fā)送onNext,就會發(fā)送Event的.next,subscribe里面的event就實現(xiàn)onNext事件。整個事件就串聯(lián)上了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容