注:本文中使用 runBlocking 是為了方便測(cè)試,業(yè)務(wù)開(kāi)發(fā)中禁止使用
一、Flow 的基本使用
1、Sequence 序列生成器
(1)取出序列生成器中的值,需要迭代序列生成器;
(2)是同步調(diào)用,是阻塞的,無(wú)法調(diào)用其它的掛起函數(shù)。
fun sequenceFun() {
val sequence = sequence<Int> {
Thread.sleep(1000)
yield(1)
Thread.sleep(1000)
yield(2)
Thread.sleep(1000)
yield(3)
}
sequence.forEach {
println(it)
}
println("Done!")
// 1
// 2
// 3
// Done!
}
2、Flow 的簡(jiǎn)單使用
(1)flow{ ... } 內(nèi)部可以調(diào)用 suspend 函數(shù);
(2)使用 emit() 方法來(lái)發(fā)射數(shù)據(jù);
(3)使用 collect() 方法來(lái)收集結(jié)果。
fun flowFun() = runBlocking {
val flow = flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
flow.collect {
println(it)
}
println("Done!")
// 1
// 2
// 3
// Done!
}
3、創(chuàng)建 Flow 的常用方式
(1)flow{...} 需要顯示調(diào)用 emit() 發(fā)射數(shù)據(jù);
(2)flowOf() 一個(gè)發(fā)射固定值集的流, 不需要顯示調(diào)用 emit() 發(fā)射數(shù)據(jù);
(3)asFlow() 擴(kuò)展函數(shù),可以將各種集合與序列轉(zhuǎn)換為流,也不需要顯示調(diào)用 emit() 發(fā)射數(shù)據(jù)。
fun createFlowFun() = runBlocking {
val flow1 = flow {
delay(1000)
emit(1)
}
val flow2 = flowOf(2, 3).onEach {
delay(1000)
}
val flow3 = listOf(4, 5).asFlow().onEach {
delay(1000)
}
flow1.collect {
println(it)
}
flow2.collect {
println(it)
}
flow3.collect {
println(it)
}
println("Done!")
// 1
// 2
// 3
// 4
// 5
// Done!
}
4、Flow 是冷流(惰性的)
如同 Sequences 一樣, Flow 也是惰性的,即在調(diào)用末端流操作符( collect 是其中之一)之前,flow{ ... } 中的代碼不會(huì)執(zhí)行。我們稱(chēng)之為 -- 冷流。
fun coldFlowFun() = runBlocking {
val flow = flowOf(1, 2, 3)
.onEach {
delay(1000)
}
println("calling collect...")
flow.collect {
println(it)
}
println("calling collect again...")
flow.collect {
println(it)
}
// calling collect...
// 1
// 2
// 3
// calling collect again...
// 1
// 2
// 3
}
5、Flow 的取消
流采用了與協(xié)程同樣的協(xié)助取消,取消 Flow 只需要取消它所在的 協(xié)程 即可。
fun cancelFlowFun() = runBlocking {
val flow = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
withTimeoutOrNull(250) {
flow.collect {
println(it)
}
}
println("Done!")
// Emitting 1
// 1
// Emitting 2
// 2
// Done!
}
二、Flow 的常用操作符
1、末端流操作符 collect 、reduce 、fold、toxxx 等
fun terminalFlowOptFun() = runBlocking {
val flow = (1..3).asFlow().onEach { delay(200) }
flow.collect { println(it) }
// 1
// 2
// 3
val reduceSum = flow.reduce { a, b -> a + b }
println("reduce: sum = $reduceSum")
// reduce: sum = 6
val foldSum = flow.fold(100) { a, b -> a + b }
println("fold: sum = $foldSum")
// fold: sum = 106
val list = flow.toList()
val set = flow.toSet()
println("list: $list")
println("set: $set")
// list: [1, 2, 3]
// set: [1, 2, 3]
val flow2 = flowOf("one", "two").onEach { delay(200) }
flow.onEach { println(it) }.launchIn(this)
flow2.onEach { println(it) }.launchIn(this)
// 1
// one
// 2
// two
// 3
}
2、流啟動(dòng)時(shí) onStart
fun startFlowFun() = runBlocking {
(1..3).asFlow()
.onEach { delay(1000) }
.onStart { println("onStart") }
.collect { println(it) }
// onStart
// 1
// 2
// 3
}
3、流完成時(shí) onCompletion
(1)使用 try ... finally 實(shí)現(xiàn);
(2)通過(guò) onCompletion 函數(shù)實(shí)現(xiàn)。
fun completionFlowFun() = runBlocking {
try {
flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}.collect {
println(it)
}
} finally {
println("Done!")
}
// 1
// 2
// 3
// Done!
flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}.onCompletion { println("Done!") }
.collect { println(it) }
// 1
// 2
// 3
// Done!
}
4、背壓 Backpressure
Backpressure 是響應(yīng)式編程的功能之一,F(xiàn)low 的 Backpressure 是通過(guò) suspend 函數(shù)實(shí)現(xiàn)的。
(1)buffer 緩沖(這里要注意的是,buffer 的容量是從 0 開(kāi)始計(jì)算的)
?? - SUSPEND 設(shè)置緩沖區(qū),如果溢出了,則將當(dāng)前協(xié)程掛起,直到有消費(fèi)了緩沖區(qū)中的數(shù)據(jù);
?? - DROP_LATEST 設(shè)置緩沖區(qū),如果溢出了,丟棄最新的數(shù)據(jù);
?? - DROP_OLDEST 設(shè)置緩沖區(qū),如果溢出了,丟棄最老的數(shù)據(jù)。
(2)conflate 合并
?? - 不設(shè)緩沖區(qū),也就是緩沖區(qū)大小為 0,采取 DROP_OLDEST 策略,等價(jià)于 buffer(0, BufferOverflow.DROP_OLDEST) 。
fun bufferFlowFun() = runBlocking {
val cosTime = measureTimeMillis {
(1..5).asFlow()
.onEach {
delay(100)
println("produce data: $it")
}
.buffer(1, BufferOverflow.SUSPEND)
.collect {
delay(500)
println("collect: $it")
}
}
println("cosTime: $cosTime")
// produce data: 1
// produce data: 2
// produce data: 3
// collect: 1
// produce data: 4
// collect: 2
// produce data: 5
// collect: 3
// collect: 4
// collect: 5
// cosTime: 2742
}
fun conflateFlowFun() = runBlocking {
val cosTime = measureTimeMillis {
(1..5).asFlow()
.onEach {
delay(100)
println("produce data: $it")
}
.conflate()
.collect {
delay(500)
println("collect: $it")
}
}
println("cosTime: $cosTime")
// produce data: 1
// produce data: 2
// produce data: 3
// produce data: 4
// produce data: 5
// collect: 1
// collect: 5
// cosTime: 1223
}
5、Flow 異常處理 catch、retry、retryWhen
(1)catch 操作符捕獲上游異常
?? - onCompletion 用來(lái)處理 Flow 是否收集完成,即使是遇到異常也會(huì)執(zhí)行;
?? - onCompletion 有一個(gè)參數(shù)可以用來(lái)判斷上游是否出現(xiàn)異常;上游出現(xiàn)異常,不為 null,未出現(xiàn)異常,則為 null;
?? - onCompletion 只能判斷是否出現(xiàn)了異常,并不能捕獲異常;
?? - 捕獲異常使用 catch 操作符;
?? - 如果把 onCompletion 和 catch 交換一下位置,則 catch 操作捕獲到異常之后,不會(huì)再影響下游;
?? - catch 操作符用于實(shí)現(xiàn)異常透明化處理, catch 只是中間操作符不能捕獲下游的異常;
?? - catch 操作符內(nèi),可以使用 throw 再次拋出異常、可以使用 emit() 轉(zhuǎn)換為發(fā)射值、可以用于打印或者其他業(yè)務(wù)邏輯的處理等等。
(2)retry、retryWhen 操作符重試
?? - 如果上游遇到了異常,并且 retry 方法返回 true 則會(huì)進(jìn)行重試,最多重試 retries 指定的次數(shù);
?? - retry 最終調(diào)用的是 retryWhen 操作符。
fun catchFlowFun() = runBlocking {
(1..5).asFlow()
.onEach {
if (it == 4) {
throw Exception("test exception")
}
delay(100)
println("produce data: $it")
}
/*.catch { e ->
println("catch exception: $e")
}*/
.onCompletion { e ->
if (null == e) {
println("onCompletion")
} else {
println("onCompletion: $e")
}
}
.catch { e ->
println("catch exception: $e")
}
.collect {
println("collect: $it")
}
// produce data: 1
// collect: 1
// produce data: 2
// collect: 2
// produce data: 3
// collect: 3
// onCompletion: java.lang.Exception: test exception
// catch exception: java.lang.Exception: test exception
}
fun retryFlowFun() = runBlocking {
(1..5).asFlow()
.onEach {
if (it == 2) {
throw Exception("test exception")
}
delay(100)
println("produce data: $it")
}
.retry(1) {
it.message == "test exception"
}
/*.retryWhen { cause, attempt ->
cause.message == "test exception" && attempt < 1
}*/
.catch { ex ->
println("catch exception: ${ex.message}")
}
.collect {
println("collect: $it")
}
// produce data: 1
// collect: 1
// produce data: 1
// collect: 1
// catch exception: test exception
}
6、Flow 線(xiàn)程切換 flowOn
(1)響應(yīng)線(xiàn)程是由 CoroutineContext 決定的,比如,在 Main 線(xiàn)程中執(zhí)行 collect, 那么響應(yīng)線(xiàn)程就是 Dispatchers.Main;
(2)Flow 通過(guò) flowOn 方法來(lái)切換線(xiàn)程,多次調(diào)用,都會(huì)影響到它上游的代碼。
fun switchThreadFlowFun() = runBlocking {
val myDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
(1..2).asFlow()
.onEach {
printlnWithThread("produce data: $it")
}
.flowOn(Dispatchers.IO)
.onEach {
printlnWithThread("IO data: $it")
}
.flowOn(myDispatcher)
.onEach {
printlnWithThread("my data: $it")
}
.flowOn(Dispatchers.Default)
.onCompletion {
myDispatcher.close()
}
.collect {
printlnWithThread("collect: $it")
}
// Thread -> id: 12, name: DefaultDispatcher-worker-2, produce data: 1
// Thread -> id: 12, name: DefaultDispatcher-worker-2, produce data: 2
// Thread -> id: 13, name: pool-1-thread-1, IO data: 1
// Thread -> id: 13, name: pool-1-thread-1, IO data: 2
// Thread -> id: 11, name: DefaultDispatcher-worker-1, my data: 1
// Thread -> id: 11, name: DefaultDispatcher-worker-1, my data: 2
// Thread -> id: 1, name: main, collect: 1
// Thread -> id: 1, name: main, collect: 2
}
7、Flow 的中間轉(zhuǎn)換操作符
(1)map 操作符用于將流中的每個(gè)元素進(jìn)行轉(zhuǎn)換后再發(fā)射出來(lái)
fun mapFlowFun() = runBlocking {
(1..2).asFlow()
.map {
"map -> $it"
}
.collect {
println(it)
}
// map -> 1
// map -> 2
}
(2)transform 操作符,可以任意多次調(diào)用 emit ,這是 transform 跟 map 最大的區(qū)別
fun transformFlowFun() = runBlocking {
(1..2).asFlow()
.transform {
emit("transform1 -> $it")
delay(100)
emit("transform2 -> $it")
}
.collect {
println(it)
}
// transform1 -> 1
// transform2 -> 1
// transform1 -> 2
// transform2 -> 2
}
(3)onEach 遍歷
fun onEachFlowFun() = runBlocking {
(1..3).asFlow()
.onEach { println("onEach: $it") }
.collect { println(it) }
// onEach: 1
// 1
// onEach: 2
// 2
// onEach: 3
// 3
}
(4)filter 條件過(guò)濾
fun filterFlowFun() = runBlocking {
(1..5).asFlow()
.filter { it % 2 == 0 }
.collect { println(it) }
// 2
// 4
}
(5)drop 過(guò)濾掉 前 N 個(gè) 元素
fun dropFlowFun() = runBlocking {
(1..5).asFlow()
.drop(3)
.collect { println(it) }
// 4
// 5
}
(6)dropWhile 過(guò)濾 滿(mǎn)足條件 的 前 N 個(gè) 元素,一旦條件不滿(mǎn)足則不再過(guò)濾后續(xù)元素
fun dropWhileFlowFun() = runBlocking {
listOf(1, 3, 4, 2, 5).asFlow()
.dropWhile { it < 4 }
.collect { println(it) }
// 4
// 2
// 5
listOf(1, 3, 4, 2, 5).asFlow()
.dropWhile { it % 2 == 1 }
.collect { println(it) }
// 4
// 2
// 5
}
(7)take 只取 前 N 個(gè) emit 發(fā)射的值
fun takeFlowFun() = runBlocking {
(1..5).asFlow()
.take(2)
.collect { println(it) }
// 1
// 2
}
(8)takeWhile 只取 滿(mǎn)足條件 的 前 N 個(gè) 元素,一旦條件不滿(mǎn)足則不再獲取后續(xù)元素
fun takeWhileFlowFun() = runBlocking {
(5 downTo 1).asFlow()
.takeWhile { it > 3 }
.collect { println(it) }
// 5
// 4
listOf(5, 2, 4, 1).asFlow()
.takeWhile { it > 3 }
.collect { println(it) }
// 5
}
(9)zip 是可以將2個(gè) flow 進(jìn)行合并的操作符
?? - 即使 flowB 中的每一個(gè) item 都使用了 delay() 函數(shù),在合并過(guò)程中也會(huì)等待 delay() 執(zhí)行完后再進(jìn)行合并;
?? - 如果 flowA 和 flowB 中 item 個(gè)數(shù)不一致,則合并后新的 flow item 個(gè)數(shù),等于較小的 item 個(gè)數(shù)。
fun zipFlowFun() = runBlocking {
val flowA = (1..6).asFlow()
val flowB = flowOf("one", "two", "three").onEach { delay(200) }
flowA.zip(flowB) { a, b -> "$a and $b" }
.collect { println(it) }
// 1 and one
// 2 and two
// 3 and three
}
(10)combine 合并時(shí),組合每個(gè)流最新發(fā)出的元素
fun combineFlowFun() = runBlocking {
val flowA = (1..5).asFlow().onEach { delay(100) }
val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(200) }
flowA.combine(flowB) { a, b -> "$a and $b" }.collect { println(it) }
// 1 and one
// 2 and one
// 3 and one
// 3 and two
// 4 and two
// 5 and two
// 5 and three
// 5 and four
// 5 and five
}
(11)flattenConcat 將給定流按順序展平為單個(gè)流,而不交錯(cuò)嵌套流
fun flattenConcatFlowFun() = runBlocking {
val flowA = (1..3).asFlow()
val flowB = flowOf("a", "b", "c").onEach { delay(1000) }
flowOf(flowA, flowB).flattenConcat().collect { println(it) }
// 1
// 2
// 3
// a
// b
// c
}
(12)fattenMerge 有一個(gè)參數(shù),并發(fā)限制,默認(rèn) 16;參數(shù)必須大于0,為 1 時(shí),等價(jià)于 flattenConcat
fun flattenMergeFlowFun() = runBlocking {
val flowA = (1..3).asFlow().onEach { delay(1000) }
val flowB = flowOf("a", "b", "c").onEach { delay(2000) }
flowOf(flowA, flowB).flattenMerge(8).collect { println(it) }
// 1
// a
// 2
// 3
// b
// c
}
(13)flatMapContact 由 map、flattenConcat 操作符實(shí)現(xiàn),收集新值之前會(huì)等待 flatMapConcat 內(nèi)部的 flow 完成
fun flatMapContactFlowFun() = runBlocking {
(1..2).asFlow()
.flatMapConcat {
flow {
emit(it)
delay(1000)
emit("string: $it")
}
}
.collect {
println(it)
}
// 1
// string: 1
// 2
// string: 2
}
(14)flatMapMerge 由 map、flattenMerge 操作符實(shí)現(xiàn),不會(huì)等待內(nèi)部的 flow 完成
fun flatMapMergeFlowFun() = runBlocking {
(1..2).asFlow()
.flatMapMerge {
flow {
emit(it)
delay(1000)
emit("string: $it")
}
}
.collect {
println(it)
}
// 1
// 2
// string: 1
// string: 2
}
(15)flatMapLatest 當(dāng)發(fā)射了新值之后,上個(gè) flow 就會(huì)被取消
fun flatMapLatestFlowFun() = runBlocking {
(1..3).asFlow()
.onEach { delay(100) }
.flatMapLatest {
flow {
println("begin flatMapLatest: $it")
delay(200)
emit("string: $it")
println("end flatMapLatest: $it")
}
}
.collect { println(it) }
}
三、StateFlow 和 SharedFlow
StateFlow 和 SharedFlow 是用來(lái)替代 BroadcastChannel 的新的 API。用于上游發(fā)射數(shù)據(jù),能同時(shí)被 多個(gè)訂閱者 收集數(shù)據(jù)。
1、StateFlow
(1)StateFlow 是一個(gè)狀態(tài)容器式可觀(guān)察數(shù)據(jù)流,可以向其收集器發(fā)出當(dāng)前狀態(tài)更新和新?tīng)顟B(tài)更新;還可通過(guò)其 value 屬性讀取當(dāng)前狀態(tài)值;
(2)StateFlow 有兩種類(lèi)型: StateFlow(只讀) 和 MutableStateFlow(可以改變 value 的值);
(3)StateFlow 的狀態(tài)由其值表示,任何對(duì)值的更新都會(huì)反饋新值到所有流的接收器中;
(4)StateFlow 發(fā)射的數(shù)據(jù)可以被在不同的協(xié)程中的多個(gè)接受者同時(shí)收集;
(5)StateFlow 是熱流,只要數(shù)據(jù)發(fā)生變化,就會(huì)發(fā)射數(shù)據(jù);
(6)StateFlow 調(diào)用 collect 收集數(shù)據(jù)后不會(huì)停止,需要手動(dòng)取消訂閱者的協(xié)程;
(7)StateFlow 只會(huì)發(fā)射最新的數(shù)據(jù)給訂閱者。
class StateFlowTest {
private val _state = MutableStateFlow("unKnow")
val state: StateFlow<String> get() = _state
fun getApi1(scope: CoroutineScope) {
scope.launch {
delay(1000)
_state.value = "hello StateFlow"
}
}
fun getApi2(scope: CoroutineScope) {
scope.launch {
delay(1000)
_state.value = "hello Kotlin"
}
}
}
fun stateFlowFun() = runBlocking {
val test = StateFlowTest()
test.getApi1(this)
delay(1000)
test.getApi2(this)
val job1 = launch(Dispatchers.IO) {
delay(5000)
test.state.collect {
printlnWithThread(it)
}
}
val job2 = launch(Dispatchers.IO) {
delay(5000)
test.state.collect {
printlnWithThread(it)
}
}
delay(7000)
job1.cancel()
job2.cancel()
// Thread -> id: 11, name: DefaultDispatcher-worker-1, hello Kotlin
// Thread -> id: 13, name: DefaultDispatcher-worker-3, hello Kotlin
}
2、SharedFlow
(1)SharedFlow 管理一系列狀態(tài)更新(即事件流),而非管理當(dāng)前狀態(tài);
(2)SharedFlow 也有兩種類(lèi)型:SharedFlow 和 MutableSharedFlow;
?? - SharedFlow 包含可用作原子快照的 replayCache,每個(gè)新的訂閱者會(huì)先從 replay cache 中獲取值,然后才收到新發(fā)出的值;
?? - MutableSharedFlow 可用于從掛起或非掛起的上下文中發(fā)射值,顧名思義,可以重置 replayCache,而且還將訂閱者的數(shù)量作為 Flow 暴露出來(lái)。
(3)MutableSharedFlow 具有 subscriptionCount 屬性,其中包含處于活躍狀態(tài)的收集器的數(shù)量;
(4)MutableSharedFlow 包含一個(gè) resetReplayCache 函數(shù),在不想重放已向數(shù)據(jù)流發(fā)送的最新信息的情況下使用;
(5)使用 sharedIn 方法可以將 Flow 轉(zhuǎn)換為 SharedFlow
class SharedFlowTest {
private val _state = MutableSharedFlow<Int>(
replay = 2, // 當(dāng)新的訂閱者 Collect 時(shí),發(fā)送幾個(gè)已經(jīng)發(fā)送過(guò)的數(shù)據(jù)給它
extraBufferCapacity = 3, // 減去 replay 還緩存多少數(shù)據(jù)(即此處總緩存為5)
onBufferOverflow = BufferOverflow.SUSPEND // 緩存溢出時(shí)的處理策略,三種 丟掉最新值、丟掉最舊值和掛起
)
val state: SharedFlow<Int> get() = _state
fun getApi(scope: CoroutineScope) {
scope.launch {
for (i in 0..5) {
delay(200)
_state.emit(i)
printlnWithThread("send data: $i")
}
}
}
}
fun sharedFlowFun() = runBlocking {
val test = SharedFlowTest()
test.getApi(this)
val job = launch(Dispatchers.IO) {
delay(1000)
test.state.collect {
printlnWithThread("collect data: $it")
}
}
delay(5000)
job.cancel()
// 總緩存為5,訂閱時(shí)先發(fā)送2個(gè)舊數(shù)據(jù),然后再收集新數(shù)據(jù)
// Thread -> id: 1, name: main, send data: 0
// Thread -> id: 1, name: main, send data: 1
// Thread -> id: 1, name: main, send data: 2
// Thread -> id: 1, name: main, send data: 3
// Thread -> id: 11, name: DefaultDispatcher-worker-1, collect data: 2
// Thread -> id: 11, name: DefaultDispatcher-worker-1, collect data: 3
// Thread -> id: 1, name: main, send data: 4
// Thread -> id: 11, name: DefaultDispatcher-worker-1, collect data: 4
// Thread -> id: 1, name: main, send data: 5
// Thread -> id: 11, name: DefaultDispatcher-worker-1, collect data: 5
}