RxSwift把我們程序中每一個(gè)操作都看成一個(gè)事件,比如一個(gè)TextField中的文本改變,一個(gè)按鈕被點(diǎn)擊,或者一個(gè)網(wǎng)絡(luò)請(qǐng)求結(jié)束等,每一個(gè)事件源就可以看成一個(gè)管道,也就是sequence,比如TextField,當(dāng)我們改變里面的文本的時(shí)候,這個(gè)TextField就會(huì)不斷的發(fā)出事件,從他的這個(gè)sequence中不斷的流出,我們只需要監(jiān)聽(tīng)這個(gè)sequence,每流出一個(gè)事件就做相應(yīng)的處理。同理,Button也是一個(gè)sequence,每點(diǎn)擊一次就流出一個(gè)事件。也就是我們把每一步都想成是一個(gè)事件就好去理解RxSwift了??聪聢D是不是很好理解了?
Observable和Observer
理解了觀察者模式這兩個(gè)概念就很好理解了,Observable就是可被觀察的,也就是我們說(shuō)的寶寶,他也是事件源。而Observer就是我們的觀察者,也就是當(dāng)收到事件的時(shí)候去做某些處理的爸爸媽媽。觀察者需要去訂閱(subscribe)被觀察者,才能收到Observable的事件通知消息。
創(chuàng)建和訂閱被觀察者
下面創(chuàng)建被觀察者其實(shí)就是創(chuàng)建一個(gè)Obserable的sequence,就是創(chuàng)建一個(gè)流,然后就可以被訂閱subscribe,這樣被觀察者發(fā)出時(shí)間消失,我們就能做相應(yīng)的處理
DisposeBag
DisposeBag其實(shí)就相當(dāng)于iOS中的ARC似得,會(huì)在適當(dāng)?shù)臅r(shí)候銷(xiāo)毀觀察者,相當(dāng)于內(nèi)存管理者吧。
subscribe
subscribe是訂閱sequence發(fā)出的事件,比如next事件,error事件等。而subscribe(onNext:)是監(jiān)聽(tīng)sequence發(fā)出的next事件中的element進(jìn)行處理,他會(huì)忽略error和completed事件。相對(duì)應(yīng)的還有subscribe(onError:) 和 subscribe(onCompleted:)
never
never就是創(chuàng)建一個(gè)sequence,但是不發(fā)出任何事件信號(hào)。
let disposeBag = DisposeBag()
let neverSequence = Observable<String>.never()
let neverSequenceSubscription = neverSequence
.subscribe { _ in
print("This will never be printed")
}.addDisposableTo(disposeBag)`
結(jié)果是什么都不打印
empty
empty就是創(chuàng)建一個(gè)空的sequence,只能發(fā)出一個(gè)completed事件
let disposeBag = DisposeBag()
Observable<Int>.empty()
.subscribe { event in
print(event)
}
.addDisposableTo(disposeBag)
completed
just
just是創(chuàng)建一個(gè)sequence只能發(fā)出一種特定的事件,能正常結(jié)束
let disposeBag = DisposeBag()
Observable.just("??")
.subscribe { event in
print(event)
}
.addDisposableTo(disposeBag)
next(??)
completed
of
of是創(chuàng)建一個(gè)sequence能發(fā)出很多種事件信號(hào)
let disposeBag = DisposeBag()
Observable.of("??", "??", "??", "??")
.subscribe(onNext: { element in
print(element)
})
.addDisposableTo(disposeBag)
??
??
??
如果把上面的onNext:去掉的話,結(jié)果會(huì)是這樣子,也正好對(duì)應(yīng)了我們subscribe中,subscribe只監(jiān)聽(tīng)事件。
next(??)
next(??)
next(??)
next(??)
completed
from
from就是從集合中創(chuàng)建sequence,例如數(shù)組,字典或者Set
let disposeBag = DisposeBag()
Observable.from(["??", "??", "??", "??"])
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
create
我們也可以自定義可觀察的sequence,那就是使用create
create操作符傳入一個(gè)觀察者observer,然后調(diào)用observer的onNext,onCompleted和onError方法。返回一個(gè)可觀察的obserable序列
let disposeBag = DisposeBag()
let myJust = { (element: String) -> Observable<String> in
return Observable.create { observer in
observer.on(.next(element))
observer.on(.completed)
return Disposables.create()
}
}
myJust("??")
.subscribe { print($0) }
.addDisposableTo(disposeBag)
next(??)
completed
range
range就是創(chuàng)建一個(gè)sequence,他會(huì)發(fā)出這個(gè)范圍中的從開(kāi)始到結(jié)束的所有事件
let disposeBag = DisposeBag()
Observable.range(start: 1, count: 10)
.subscribe { print($0) }
.addDisposableTo(disposeBag)
next(1)
next(2)
next(3)
next(4)
next(5)
next(6)
next(7)
next(8)
next(9)
next(10)
completed
repeatElement
let disposeBag = DisposeBag()
Observable.repeatElement("??")
.take(3)
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
??
??
??
generate
let disposeBag = DisposeBag()
Observable.generate(
initialState: 0,
condition: { $0 < 3 },
iterate: { $0 + 1 }
)
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
0
1
2
deferred
deferred會(huì)為每一為訂閱者observer創(chuàng)建一個(gè)新的可觀察序列
let disposeBag = DisposeBag()
var count = 1
let deferredSequence = Observable<String>.deferred {
print("Creating \(count)")
count += 1
return Observable.create { observer in
print("Emitting...")
observer.onNext("??")
observer.onNext("??")
observer.onNext("??")
return Disposables.create()
}
}
deferredSequence
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
deferredSequence
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
Creating 1
Emitting...
??
??
??
Creating 2
Emitting...
??
??
??
error
創(chuàng)建一個(gè)可觀察序列,但不發(fā)出任何正常的事件,只發(fā)出error事件并結(jié)束
let disposeBag = DisposeBag()
Observable<Int>.error(TestError.test)
.subscribe { print($0) }
.addDisposableTo(disposeBag)
error(test)
doOn
doOn我感覺(jué)就是在直接onNext處理時(shí)候,先執(zhí)行某個(gè)方法,doOnNext( :)方法就是在subscribe(onNext:)前調(diào)用,doOnCompleted(:)就是在subscribe(onCompleted:)前面調(diào)用的。
let disposeBag = DisposeBag()
Observable.of("??", "??", "??", "??")
.do(onNext: { print("Intercepted:", $0) }, onError: { print("Intercepted error:", $0) }, onCompleted: { print("Completed") })
.subscribe(onNext: { print($0) },onCompleted: { print("結(jié)束") })
.addDisposableTo(disposeBag)
Intercepted: ??
??
Intercepted: ??
??
Intercepted: ??
??
Intercepted: ??
??
Completed
結(jié)束
學(xué)會(huì)使用Subjects
Subjet是observable和Observer之間的橋梁,一個(gè)Subject既是一個(gè)Obserable也是一個(gè)Observer,他既可以發(fā)出事件,也可以監(jiān)聽(tīng)事件。
PublishSubject
當(dāng)你訂閱PublishSubject的時(shí)候,你只能接收到訂閱他之后發(fā)生的事件。subject.onNext()發(fā)出onNext事件,對(duì)應(yīng)的還有onError()和onCompleted()事件
let disposeBag = DisposeBag()
let subject = PublishSubject<String>()
subject.addObserver("1").addDisposableTo(disposeBag)
subject.onNext("??")
subject.onNext("??")
subject.addObserver("2").addDisposableTo(disposeBag)
subject.onNext("???")
subject.onNext("???")`
Subscription: 1 Event: next(??)
Subscription: 1 Event: next(??)
Subscription: 1 Event: next(???)
Subscription: 2 Event: next(???)
Subscription: 1 Event: next(???)
Subscription: 2 Event: next(???)`
ReplaySubject
當(dāng)你訂閱ReplaySubject的時(shí)候,你可以接收到訂閱他之后的事件,但也可以接受訂閱他之前發(fā)出的事件,接受幾個(gè)事件取決與bufferSize的大小
let disposeBag = DisposeBag()
let subject = ReplaySubject<String>.create(bufferSize: 1)
subject.addObserver("1").addDisposableTo(disposeBag)
subject.onNext("??")
subject.onNext("??")
subject.addObserver("2").addDisposableTo(disposeBag)
subject.onNext("???")
subject.onNext("???")
Subscription: 1 Event: next(??)
Subscription: 1 Event: next(??)
Subscription: 2 Event: next(??) //訂閱之后還可以接受一次前面發(fā)出的事件
Subscription: 1 Event: next(???)
Subscription: 2 Event: next(???)
Subscription: 1 Event: next(???)
Subscription: 2 Event: next(???)
BehaviorSubject
當(dāng)你訂閱了BehaviorSubject,你會(huì)接受到訂閱之前的最后一個(gè)事件。
let disposeBag = DisposeBag()
let subject = BehaviorSubject(value: "??")
subject.addObserver("1").addDisposableTo(disposeBag)
subject.onNext("??")
subject.onNext("??")
subject.addObserver("2").addDisposableTo(disposeBag)
subject.onNext("???")
subject.onNext("???")
subject.addObserver("3").addDisposableTo(disposeBag)
subject.onNext("??")
subject.onNext("??")
Subscription: 1 Event: next(??)
Subscription: 1 Event: next(??)
Subscription: 1 Event: next(??)
Subscription: 2 Event: next(??) //訂閱之前的最后一個(gè)事件
Subscription: 1 Event: next(???)
Subscription: 2 Event: next(???)
Subscription: 1 Event: next(???)
Subscription: 2 Event: next(???)
Subscription: 3 Event: next(???) //訂閱之前的最后一個(gè)事件
Subscription: 1 Event: next(??)
Subscription: 3 Event: next(??)
Subscription: 2 Event: next(??)
Subscription: 1 Event: next(??)
Subscription: 3 Event: next(??)
Subscription: 2 Event: next(??)
PublishSubject, ReplaySubject和BehaviorSubject是不會(huì)自動(dòng)發(fā)出completed事件的。
Variable
Variable是BehaviorSubject一個(gè)包裝箱,就像是一個(gè)箱子一樣,使用的時(shí)候需要調(diào)用asObservable()拆箱,里面的value是一個(gè)BehaviorSubject,他不會(huì)發(fā)出error事件,但是會(huì)自動(dòng)發(fā)出completed事件
let disposeBag = DisposeBag()
let variable = Variable("??")
variable.asObservable().addObserver("1").addDisposableTo(disposeBag)
variable.value = "??"
variable.value = "??"
variable.asObservable().addObserver("2").addDisposableTo(disposeBag)
variable.value = "???"
variable.value = "???"
Subscription: 1 Event: next(??)
Subscription: 1 Event: next(??)
Subscription: 1 Event: next(??)
Subscription: 2 Event: next(??)
Subscription: 1 Event: next(???)
Subscription: 2 Event: next(???)
Subscription: 1 Event: next(???)
Subscription: 2 Event: next(???)
Subscription: 1 Event: completed
Subscription: 2 Event: completed
聯(lián)合操作
聯(lián)合操作就是把多個(gè)Observable流合成單個(gè)Observable流
startWith
在發(fā)出事件消息之前,先發(fā)出某個(gè)特定的事件消息。比如發(fā)出事件2 ,3然后我startWith(1),那么就會(huì)先發(fā)出1,然后2 ,3.

let disposeBag = DisposeBag()
Observable.of("2", "3")
.startWith("1")
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
1
2
3
merge
合并兩個(gè)Observable流合成單個(gè)Observable流,根據(jù)時(shí)間軸發(fā)出對(duì)應(yīng)的事件

let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable.of(subject1, subject2)
.merge()
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
subject1.onNext("???")
subject1.onNext("???")
subject2.onNext("①")
subject2.onNext("②")
subject1.onNext("??")
subject2.onNext("③")
???
???
①
②
??
③
zip
let disposeBag = DisposeBag()
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
Observable.zip(stringSubject, intSubject) { stringElement, intElement in
"\(stringElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
stringSubject.onNext("???")
stringSubject.onNext("???")
intSubject.onNext(1)
intSubject.onNext(2)
stringSubject.onNext("??")
intSubject.onNext(3)
??? 1 將stringSubject和intSubject壓縮到一起共同處理
??? 2
?? 3
combineLatest
綁定超過(guò)最多不超過(guò)8個(gè)的Observable流,結(jié)合在一起處理。和Zip不同的是combineLatest是一個(gè)流的事件對(duì)應(yīng)另一個(gè)流的最新的事件,兩個(gè)事件都會(huì)是最新的事件,可將下圖與Zip的圖進(jìn)行對(duì)比。
let disposeBag = DisposeBag()
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
Observable.combineLatest(stringSubject, intSubject) { stringElement, intElement in
"\(stringElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
stringSubject.onNext("???")
stringSubject.onNext("???")
intSubject.onNext(1)
intSubject.onNext(2)
stringSubject.onNext("??")
??? 1
??? 2
?? 2
switchLatest
switchLatest可以對(duì)事件流進(jìn)行轉(zhuǎn)換,本來(lái)監(jiān)聽(tīng)的subject1,我可以通過(guò)更改variable里面的value更換事件源。變成監(jiān)聽(tīng)subject2了
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "??")
let subject2 = BehaviorSubject(value: "??")
let variable = Variable(subject1)
variable.asObservable()
.switchLatest()
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
subject1.onNext("??")
subject1.onNext("??")
variable.value = subject2
subject1.onNext("??")
subject2.onNext("??")
variable.value = subject1
subject2.onNext("田騰飛")
subject1.onNext("沸騰天"
??
??
??
??
??
??
沸騰天
變換操作
map
通過(guò)傳入一個(gè)函數(shù)閉包把原來(lái)的sequence轉(zhuǎn)變?yōu)橐粋€(gè)新的sequence的操作
et disposeBag = DisposeBag()
Observable.of(1, 2, 3)
.map { $0 * $0 }
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
1 每一個(gè)元素自己相乘
4
9
flatMap
將一個(gè)sequence轉(zhuǎn)換為一個(gè)sequences,當(dāng)你接收一個(gè)sequence的事件,你還想接收其他sequence發(fā)出的事件的話可以使用flatMap,她會(huì)將每一個(gè)sequence事件進(jìn)行處理以后,然后再以一個(gè)sequence形式發(fā)出事件。而且flatMap有一次拆包動(dòng)作,請(qǐng)看代碼解析。
let disposeBag = DisposeBag()
struct Player {
var score: Variable<Int> //里面是一個(gè)Variable
}
let ???? = Player(score: Variable(80))
let ???? = Player(score: Variable(90))
let ?? = Player(score: Variable(550))
let player = Variable(????) //將player轉(zhuǎn)為Variable
player.asObservable() //拆箱轉(zhuǎn)成可被監(jiān)聽(tīng)的sequence
.flatMap { $0.score.asObservable() } // flatMap有一次拆包動(dòng)作,$0本來(lái)應(yīng)該是一個(gè)BehaviorSubject類(lèi)型,但是直接訪問(wèn)了score。所以猜想flatMap對(duì)behaviorSubject進(jìn)行了onNext拆包取數(shù)據(jù)
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
????.score.value = 85
player.value = ???? //更換了value,相當(dāng)于又添加了一個(gè)sequence,兩個(gè)sequence都可以接收
????.score.value = 95
????.score.value = 222
player.value = ??
????.score.value = 100`
80
85
90
95
222
550
100
flatMapLatest
flatMapLatest只會(huì)接收最新的value事件,將上例改為flatMapLatest。結(jié)果為
80
85
90
550
scan
scan就是給一個(gè)初始化的數(shù),然后不斷的拿前一個(gè)結(jié)果和最新的值進(jìn)行處理操作。
let disposeBag = DisposeBag()
Observable.of(10, 100, 1000)
.scan(1) { aggregateValue, newValue in
aggregateValue + newValue
}
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
11
111
1111
過(guò)濾和約束
filter
filter很好理解,就是過(guò)濾掉某些不符合要求的事件
Observable.of(
"??", "??", "??",
"??", "??", "??",
"??", "??", "??")
.filter {
$0 == "??"
}
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
??
??
??
distinctUntilChanged
distinctUntilChanged就是當(dāng)下一個(gè)事件與前一個(gè)事件是不同事件的事件才進(jìn)行處理操作
let disposeBag = DisposeBag()
Observable.of("??", "??", "??", "??", "??", "??", "??")
.distinctUntilChanged()
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
??
??
??
??
??
elementAt
只處理在指定位置的事件
let disposeBag = DisposeBag()
Observable.of("??", "??", "??", "??", "??", "??")
.elementAt(3)
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
??
single
找出在sequence只發(fā)出一次的事件,如果超過(guò)一個(gè)就會(huì)發(fā)出error錯(cuò)誤
Observable.of("??", "??", "??", "??", "??", "??")
.single()
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
?? //單一信號(hào)超過(guò)了一個(gè)
Received unhandled error: /var/folders/hz/v15ld5mj0nqf83d21j13y0tw0000gn/T/./lldb/7229/playground107.swift:69:__lldb_expr_107 -> Sequence contains more than one element.
Observable.of("??", "??", "??", "??", "??", "??")
.single { $0 == "??" } //青蛙只有一個(gè),completed
.subscribe { print($0) }
.addDisposableTo(disposeBag)
Observable.of("??", "??", "??", "??", "??", "??")
.single { $0 == "??" } //兔子有兩個(gè),會(huì)發(fā)出error
.subscribe { print($0) }
.addDisposableTo(disposeBag)
Observable.of("??", "??", "??", "??", "??", "??")
.single { $0 == "??" } //沒(méi)有藍(lán)色球,會(huì)發(fā)出error
.subscribe { print($0) }
.addDisposableTo(disposeBag)
take
只處理前幾個(gè)事件信號(hào),
let disposeBag = DisposeBag()
Observable.of("??", "??", "??", "??", "??", "??")
.take(3)
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
??
??
??
takeLast
只處理后幾個(gè)事件信號(hào)
let disposeBag = DisposeBag()
Observable.of("??", "??", "??", "??", "??", "??")
.takeLast(3)
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
??
??
??
takeWhile
當(dāng)條件滿足的時(shí)候進(jìn)行處理
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4, 5, 6)
.takeWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
1
2
3
takeUntil
接收事件消息,直到另一個(gè)sequence發(fā)出事件消息的時(shí)候。
let disposeBag = DisposeBag()
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.takeUntil(referenceSequence)
.subscribe { print($0) }
.addDisposableTo(disposeBag)
sourceSequence.onNext("??")
sourceSequence.onNext("??")
sourceSequence.onNext("??")
referenceSequence.onNext("??") //停止接收消息
sourceSequence.onNext("??")
sourceSequence.onNext("??")
sourceSequence.onNext("??")
next(??)
next(??)
next(??)
completed
skip
取消前幾個(gè)事件
let disposeBag = DisposeBag()
Observable.of("??", "??", "??", "??", "??", "??")
.skip(2)
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
??
??
??
??
skipWhile
滿足條件的事件消息都取消
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4, 5, 6)
.skipWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
4
5
6
skipWhileWithIndex
滿足條件的都被取消,傳入的閉包同skipWhile有點(diǎn)區(qū)別而已
Observable.of("??", "??", "??", "??", "??", "??")
.skipWhileWithIndex { element, index in
index < 3
}
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
skipUntil
直到某個(gè)sequence發(fā)出了事件消息,才開(kāi)始接收當(dāng)前sequence發(fā)出的事件消息
let disposeBag = DisposeBag()
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.skipUntil(referenceSequence)
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
sourceSequence.onNext("??")
sourceSequence.onNext("??")
sourceSequence.onNext("??")
referenceSequence.onNext("??")
sourceSequence.onNext("??")
sourceSequence.onNext("??")
sourceSequence.onNext("??")
}
數(shù)學(xué)操作
toArray
將sequence轉(zhuǎn)換成一個(gè)array,并轉(zhuǎn)換成單一事件信號(hào),然后結(jié)束
let disposeBag = DisposeBag()
Observable.range(start: 1, count: 10)
.toArray()
.subscribe { print($0) }
.addDisposableTo(disposeBag)
next([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
completed
reduce
用一個(gè)初始值,對(duì)事件數(shù)據(jù)進(jìn)行累計(jì)操作。reduce接受一個(gè)初始值,和一個(gè)操作符號(hào)
let disposeBag = DisposeBag()
Observable.of(10, 100, 1000)
.reduce(1, accumulator: +)
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
1111
concat
concat會(huì)把多個(gè)sequence和并為一個(gè)sequence,并且當(dāng)前面一個(gè)sequence發(fā)出了completed事件,才會(huì)開(kāi)始下一個(gè)sequence的事件。
在第一sequence完成之前,第二個(gè)sequence發(fā)出的事件都會(huì)被忽略,但會(huì)接收一完成之前的二發(fā)出的最后一個(gè)事件。不好解釋?zhuān)蠢诱f(shuō)明
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "??")
let subject2 = BehaviorSubject(value: "??")
let variable = Variable(subject1)
variable.asObservable()
.concat()
.subscribe { print($0) }
.addDisposableTo(disposeBag)
subject1.onNext("??")
subject1.onNext("??")
variable.value = subject2
subject2.onNext("??") //1完成前,會(huì)被忽略
subject2.onNext("teng") //1完成前,會(huì)被忽略
subject2.onNext("fei") //1完成前的最后一個(gè),會(huì)被接收
subject1.onCompleted()
subject2.onNext("??")
next(??)
next(??)
next(??)
next(fei)
next(??)
連接性操作
Connectable Observable有訂閱時(shí)不開(kāi)始發(fā)射事件消息,而是僅當(dāng)調(diào)用它們的connect()方法時(shí)。這樣就可以等待所有我們想要的訂閱者都已經(jīng)訂閱了以后,再開(kāi)始發(fā)出事件消息,這樣能保證我們想要的所有訂閱者都能接收到事件消息。其實(shí)也就是等大家都就位以后,開(kāi)始發(fā)出消息
publish
將一個(gè)正常的sequence轉(zhuǎn)換成一個(gè)connectable sequence
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.publish()
_ = intSequence
.subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
delay(2) { _ = intSequence.connect() } //相當(dāng)于把事件消息推遲了兩秒
delay(4) {
_ = intSequence
.subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}
delay(6) {
_ = intSequence
.subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}
Subscription 1:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 3:, Event: 3
Subscription 2:, Event: 3
Subscription 1:, Event: 4
Subscription 3:, Event: 4
replay
將一個(gè)正常的sequence轉(zhuǎn)換成一個(gè)connectable sequence,然后和replaySubject相似,能接收到訂閱之前的事件消息。
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.replay(5) //接收到訂閱之前的5條事件消息
_ = intSequence
.subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
delay(2) { _ = intSequence.connect() }
delay(4) {
_ = intSequence
.subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}
delay(8) {
_ = intSequence
.subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}
multicast
將一個(gè)正常的sequence轉(zhuǎn)換成一個(gè)connectable sequence,并且通過(guò)特性的subject發(fā)送出去,比如PublishSubject,或者replaySubject,behaviorSubject等。不同的Subject會(huì)有不同的結(jié)果。
let subject = PublishSubject<Int>()
_ = subject
.subscribe(onNext: { print("Subject: \($0)") })
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.multicast(subject)
_ = intSequence
.subscribe(onNext: { print("\tSubscription 1:, Event: \($0)") })
delay(2) { _ = intSequence.connect() }
delay(4) {
_ = intSequence
.subscribe(onNext: { print("\tSubscription 2:, Event: \($0)") })
}
錯(cuò)誤處理
catchErrorJustReturn
遇到error事件的時(shí)候,就return一個(gè)值,然后結(jié)束
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchErrorJustReturn("??")
.subscribe { print($0) }
.addDisposableTo(disposeBag)
sequenceThatFails.onNext("??")
sequenceThatFails.onNext("??")
sequenceThatFails.onNext("??")
sequenceThatFails.onNext("??")
sequenceThatFails.onError(TestError.test)
next(??)
next(??)
next(??)
next(??)
next(??)
completed
catchError
捕獲error進(jìn)行處理,可以返回另一個(gè)sequence進(jìn)行訂閱
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()
sequenceThatFails
.catchError {
print("Error:", $0)
return recoverySequence
}
.subscribe { print($0) }
.addDisposableTo(disposeBag)
sequenceThatFails.onNext("??")
sequenceThatFails.onNext("??")
sequenceThatFails.onNext("??")
sequenceThatFails.onNext("??")
sequenceThatFails.onError(TestError.test)
recoverySequence.onNext("??")
next(??)
next(??)
next(??)
next(??)
Error: test
next(??)
retry
遇見(jiàn)error事件可以進(jìn)行重試,比如網(wǎng)絡(luò)請(qǐng)求失敗,可以進(jìn)行重新連接
let disposeBag = DisposeBag()
var count = 1
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("??")
observer.onNext("??")
observer.onNext("??")
if count == 1 {
observer.onError(TestError.test)
print("Error encountered")
count += 1
}
observer.onNext("??")
observer.onNext("??")
observer.onNext("??")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3) //不傳入數(shù)字的話,只會(huì)重試一次
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
debug
debug
打印所有的訂閱, 事件和disposals
sequenceThatErrors
.retry(3)
.debug()
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)
RxSwift.Resources.total
查看RxSwift所有資源的占用
print(RxSwift.Resources.total)