Swift Combine 之 Publisher數(shù)據(jù)流

Publisher

  • 發(fā)布源協(xié)議,可以實(shí)現(xiàn)該協(xié)議來實(shí)現(xiàn)自己的數(shù)據(jù),
    • Subject繼承自Publisher,提供了三套默認(rèn)的內(nèi)置實(shí)現(xiàn)類
  • 容器包裝類,具體實(shí)現(xiàn)交由Subscriber實(shí)現(xiàn)類來轉(zhuǎn)發(fā)數(shù)據(jù)流
  • 內(nèi)置提供各種各樣的操作符(函數(shù)式編程的仿函數(shù),Swift語言的Operator)
    • allSatisfy
    • tryAllSatisfy
    • compactMap
    • contains
    • filter
    • tryFilter
    • throttle
    • ....等等
  • Publisher接口
    public protocol Publisher {
    /// 數(shù)據(jù)輸出流,相當(dāng)于訂閱者的數(shù)據(jù)輸入流        
    associatedtype Output
    /// 數(shù)據(jù)發(fā)布,要么發(fā)布一個(gè)真實(shí)數(shù)據(jù)流,要么發(fā)布一個(gè)錯(cuò)誤(或者可以選擇丟棄錯(cuò)誤,      Never忽略錯(cuò)誤) 
    associatedtype Failure: Error
 
     /// 接收數(shù)據(jù)源輸入流,并轉(zhuǎn)發(fā)給訂閱者
    func receive<Subscriber: OpenCombine.Subscriber>(subscriber: Subscriber) where Failure == Subscriber.Failure, Output ==       Subscriber.Input
  }
  • 所有的操作符的流程都類似,追蹤一個(gè)操作符的調(diào)用順序和觸發(fā)流程
  • 定義一個(gè)數(shù)組的數(shù)據(jù)流 [1, 2, 10000].publisher, publisherSequence的一個(gè)擴(kuò)展,內(nèi)部使用Publishers.Sequence進(jìn)行了包裝成了一個(gè)可以被觀察的數(shù)據(jù)流
 extension Sequence {
     public var publisher: Publishers.Sequence<Self, Never> {
         return .init(sequence: self)
     }
- `Publishers.Sequence` 內(nèi)部實(shí)現(xiàn)
 /// 序列流繼承于`Publisher` 
 public struct Sequence<Elements: Swift.Sequence, Failure: Error>: Publisher {
         /// 輸出源
         public typealias Output = Elements.Element
 
         public let sequence: Elements
 
         public init(sequence: Elements) {
             self.sequence = sequence
         }
         /// 實(shí)現(xiàn) `Publisher`協(xié)議方法`receive`
         public func receive<Downstream: Subscriber>(subscriber: Downstream)
             where Failure == Downstream.Failure,
                   Elements.Element == Downstream.Input
         {
             /// 包裝類`Inner`實(shí)現(xiàn)了`Subscription`協(xié)議,內(nèi)部持有當(dāng)前收到的`subscriber`, 進(jìn)行轉(zhuǎn)發(fā)
             let inner = Inner(downstream: subscriber, sequence: sequence)
 
             /// 判斷是否序列是否到末尾,如果序列結(jié)束發(fā)送完成事件并取消序列,數(shù)據(jù)流完成,反之,持續(xù)接收數(shù)據(jù)流
             if inner.isExhausted {
                 subscriber.receive(subscription: Subscriptions.empty)
                 subscriber.receive(completion: .finished)
                 inner.cancel()
             } else {
                 /// 內(nèi)部會(huì)調(diào)用`Subscription`協(xié)議的 `request`方法 
                 subscriber.receive(subscription: inner)
             }
         }
     }
  • Inner作為一個(gè)私有類,單獨(dú)實(shí)現(xiàn)了序列數(shù)據(jù)流的內(nèi)部數(shù)據(jù)源的流轉(zhuǎn)(每一個(gè)操作符都有一套內(nèi)部特有的Inner實(shí)現(xiàn)類),源碼我進(jìn)行了部分簡(jiǎn)化
  private final class Inner<Downstream: Subscriber, Elements: Sequence, Failure>
         : Subscription
         where Downstream.Input == Elements.Element,
               Downstream.Failure == Failure
     {
         
         typealias Iterator = Elements.Iterator
         typealias Element = Elements.Element
 
         private var sequence: Elements?
         private var downstream: Downstream?
         private var iterator: Iterator
         private var next: Element?
         private var pendingDemand = Subscribers.Demand.none
 
         /// 初始化持有的`downstream`數(shù)據(jù)流,方便后續(xù)數(shù)據(jù)流轉(zhuǎn)
         fileprivate init(downstream: Downstream, sequence: Elements) {
             self.sequence = sequence
             self.downstream = downstream
             self.iterator = sequence.makeIterator()
             next = iterator.next()
         }
         
         func request(_ demand: Subscribers.Demand) {
             guard downstream != nil else {
                 return
             }
 
             while let downstream = self.downstream, pendingDemand > 0 {
                 if let current = self.next {
                     /// 迭代數(shù)據(jù)流,依次進(jìn)行數(shù)據(jù)的轉(zhuǎn)發(fā),交給訂閱者接收
                     let additionalDemand = downstream.receive(current)
                 }
 
                 if next == nil {
                     self.downstream = nil
                     self.sequence = nil
                     /// 序列結(jié)束,發(fā)送完成事件
                     downstream.receive(completion: .finished)
                     return
                 }
             }
         }
     }
  • 上面定義了數(shù)據(jù)流源,并發(fā)出了數(shù)據(jù),等待訂閱者監(jiān)聽數(shù)據(jù)流,sinkassign操作符可以進(jìn)行訂閱,后續(xù)會(huì)列出sink和assign的源碼

代碼演示片段

         class Root: NSObject {
             var name: String = ""
         }
         
         let root = Root()
         
         let arr: [Int] = [1, 2, 100]
         
         /// 將數(shù)組轉(zhuǎn)換成一個(gè)數(shù)據(jù)流
         arr.publisher
             /// 過濾數(shù)據(jù)流中大于2的元素
             .filter{$0 > 2}
             /// 進(jìn)行一次轉(zhuǎn)換,轉(zhuǎn)成String類型
             .compactMap{"\($0)"}
             /// `Sink`訂閱數(shù)據(jù)源
             .sink { value in
                 debugPrint("數(shù)據(jù)流: \(value)")
             }.store(in: &cancel)
         
          /// 使用keypath進(jìn)行賦值
         arr.publisher
             .filter{$0 > 2}
             .compactMap{"\($0)"}
             /// `Assign` keypath 賦值
             .assign(to: \.name, on: root).store(in: &cancel)
         
         debugPrint("root name: \(root.name)")
 
        /// 控制臺(tái)輸出
        "數(shù)據(jù)流: 100"
        "root name: 100"
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容