引子
Android應(yīng)用程序壽命與其擴(kuò)展的靈活性有關(guān),因?yàn)樗枰粋€(gè)堅(jiān)實(shí)的基礎(chǔ),這就是為什么對(duì)于每個(gè)項(xiàng)目來(lái)說(shuō),最重要的步驟是創(chuàng)建應(yīng)用程序架構(gòu),在與技術(shù)團(tuán)隊(duì)就定義系統(tǒng)中包含的元素、每個(gè)元素的功能以及它們將如何相互通信進(jìn)行長(zhǎng)時(shí)間的討論后,我們必須對(duì)整體架構(gòu)進(jìn)行清晰的設(shè)計(jì)。
Android應(yīng)用程序有不同的架構(gòu),與我去年的經(jīng)驗(yàn)不同,MVVM和MVI架構(gòu)是用于大型應(yīng)用程序的最常見(jiàn)架構(gòu),即使每個(gè)應(yīng)用程序都沒(méi)有一種實(shí)現(xiàn)方式,這取決于其應(yīng)用程序需求,以及處理它的開(kāi)發(fā)人員風(fēng)格,因?yàn)槲蚁嘈?,?dú)立于Android框架,每個(gè)開(kāi)發(fā)人員都有他們獨(dú)特的軟件開(kāi)發(fā)經(jīng)驗(yàn),他們帶來(lái)的不僅僅是他們的知識(shí),還有他們獨(dú)特的思維方式、解決問(wèn)題和設(shè)計(jì)代碼的方式。
業(yè)務(wù)場(chǎng)景是這樣的:從網(wǎng)絡(luò)拉取 Feeds 流并持久化在數(shù)據(jù)庫(kù)中,以便下次啟動(dòng)時(shí)可先展示本地?cái)?shù)據(jù),待請(qǐng)求返回后再刷新 Feeds。
現(xiàn)援引上一篇的解決方案:
// 實(shí)現(xiàn)訪問(wèn)網(wǎng)絡(luò)和數(shù)據(jù)庫(kù)的細(xì)節(jié)
class NewsRepository(context: Context) {
// 使用 Retrofit 構(gòu)建請(qǐng)求訪問(wèn)網(wǎng)絡(luò)
private val retrofit = Retrofit.Builder()
.baseUrl("https://api.apiopen.top")
.addConverterFactory(MoshiConverterFactory.create())
// 將返回?cái)?shù)據(jù)組織成 LiveData
.addCallAdapterFactory(LiveDataCallAdapterFactory())
.client(OkHttpClient.Builder().build())
.build()
private val newsApi = retrofit.create(NewsApi::class.java)
private var executor = Executors.newSingleThreadExecutor()
// 使用 room 訪問(wèn)數(shù)據(jù)庫(kù)
private var newsDatabase = NewsDatabase.getInstance(context)
private var newsDao = newsDatabase.newsDao()
// 用于將新聞流傳遞給上層的 LiveData
private var newsLiveData = MediatorLiveData<List<News>>()
fun fetchNewsLiveData(): LiveData<List<News>?> {
// 從數(shù)據(jù)庫(kù)獲取新聞
val localNews = newsDao.queryNews()
// 從網(wǎng)絡(luò)獲取新聞
val remoteNews = newsApi.fetchNewsLiveData(
mapOf("page" to "1", "count" to "4")
)
.let {
Transformations.map(it) { response: ApiResponse<NewsBean>? ->
when (response) {
is ApiSuccessResponse -> {
val news = response.body.result
// 將網(wǎng)絡(luò)新聞入庫(kù)
news?.let {executor.submit { newsDao.insertAll(it) }}
news
}
else -> null
}
}
}
// 將數(shù)據(jù)庫(kù)和網(wǎng)絡(luò)響應(yīng)的 LiveData 合并
newsLiveData.addSource(localNews) {newsLiveData.value = it}
newsLiveData.addSource(remoteNews) {newsLiveData.value = it}
return newsLiveData
}
}
這是 Clean Architecture 中的 Repository,它提供數(shù)據(jù)訪問(wèn)能力,隱藏了訪問(wèn)網(wǎng)絡(luò)和數(shù)據(jù)庫(kù)的細(xì)節(jié)。
關(guān)于 Clean Architecture 的詳細(xì)解釋可以點(diǎn)擊[我是怎么把業(yè)務(wù)代碼越寫(xiě)越復(fù)雜的 | MVP - MVVM - Clean Architecture
為了使用 LiveData 承載整個(gè)數(shù)據(jù)鏈路,Retrofit 增加了 LiveDataCallAdapterFactory,它使得接口能直接返回 LiveData:
interface NewsApi {
@POST("/getWangYiNews")
fun fetchNewsLiveData(
@FieldMap map:Map<String,String>
):LiveData<ApiResponse<NewsBean>>
}
Room 也支持將數(shù)據(jù)庫(kù)查詢內(nèi)容 LiveData 化:
@Dao
interface NewsDao {
@Query("select * from news")
fun queryNews(): LiveData<List<News>?>
}
網(wǎng)絡(luò) & 數(shù)據(jù)庫(kù) Flow 化
數(shù)據(jù)鏈路 Flow 化從鏈路源頭開(kāi)始。
Room 支持以 Flow 形式返回查詢結(jié)果:
@Dao
interface NewsDao {
@Query("select * from news")
fun queryNewsFlow(): Flow<List<News>?>
}
Retrofit 并未支持 Flow 形式的接口返回值,于是在 GitHub 上找了一遍,有是有,但 star 數(shù)都很少,不太敢用。正在猶豫之際,看到了下面 retrofit 官方的回復(fù):[[Feature Request] Support adapter for Kotlin Coroutine Flow · Issue #3497 · square/retrofit (github.com)
有人提 issue 希望 retrofit 官方支持接口 Flow 化,但作者回復(fù)說(shuō)網(wǎng)絡(luò)請(qǐng)求返回的是“一個(gè)異步結(jié)果”而不是“一串異步結(jié)果”,所以suspend就夠用了。如果想要將接口 Flow 化,可以這樣做:
flow {
emit(getPosts())
}
作者接著說(shuō):“如果有機(jī)會(huì)重寫(xiě) RxJava 的 call adapter,可能也不會(huì)支持接口 Observable 化?!?/p>
醍醐灌頂,立馬照做:
interface NewsApi {
@POST("/getWangYiNews")
suspend fun fetchNews(@FieldMap map:Map<String,String>): NewsBean
}
將接口定義為suspend方法。查詢數(shù)據(jù)庫(kù)內(nèi)容也應(yīng)該這么改:
@Dao
interface NewsDao {
@Query("select * from news")
suspend fun queryNewsSuspend(): List<News>
}
其實(shí)若將查詢數(shù)據(jù)庫(kù)的結(jié)果定義為 Flow 的話,每當(dāng)數(shù)據(jù)庫(kù)內(nèi)容發(fā)生增刪,F(xiàn)low 的訂閱者都會(huì)收到通知。相較于“多個(gè)異步結(jié)果”,當(dāng)前場(chǎng)景使用“單個(gè)異步結(jié)果”更合適。
將訪問(wèn)數(shù)據(jù)庫(kù)及請(qǐng)求網(wǎng)絡(luò)在 Repository 中轉(zhuǎn)化成流:
class NewsRepo() {
// 訪問(wèn)網(wǎng)絡(luò)的 Flow(冷流:此時(shí)并未發(fā)生網(wǎng)絡(luò)請(qǐng)求)
fun remoteNewsFlow(page: Int, count: Int) =
suspend { newApi.fetchNews(mapOf("page" to page, "count" to count)) }
.asFlow() // 將 suspend 代碼塊轉(zhuǎn)換成流
.map { newsBean ->
if (newsBean.code == 200) {
// 請(qǐng)求成功,更新緩存
if (!newsBean.result.isNullOrEmpty()) {
newsDao.deleteAllNews()
newsDao.insertAll(newsBean.result.map { it.toNews() })
newsBean.result
} else {
emptyList()
}
} else {
throw Exception(newsBean.message)
}
}
// 訪問(wèn)數(shù)據(jù)庫(kù)的 Flow(冷流:此時(shí)并未發(fā)生數(shù)據(jù)庫(kù)查詢)
val localNewsOneShotFlow = flow {
val news = newsDao.queryNewsSuspend()
val newsList = news.map { it.convert() }// 將數(shù)據(jù)庫(kù)數(shù)據(jù)統(tǒng)一為網(wǎng)絡(luò)數(shù)據(jù)
emit(newsList)
}
}
在 Flow 數(shù)據(jù)鏈路的場(chǎng)景下,Repository 作為數(shù)據(jù)鏈路的起點(diǎn),提供給上層的是“原始的冷流”。
代碼中雖然調(diào)用了訪問(wèn)網(wǎng)絡(luò)和查詢數(shù)據(jù)庫(kù)的方法,但是它們是被定義在“冷流”中的,若未發(fā)生訂閱行為,就不會(huì)執(zhí)行。訂閱行為通常是在界面中進(jìn)行。
變換 & 合流
當(dāng)鏈路用 LiveData 表達(dá)時(shí),訪問(wèn)數(shù)據(jù)庫(kù)和網(wǎng)絡(luò)的操作被定義在一個(gè) Repository 的方法中:
class NewsRepository(context: Context) {
fun fetchNewsLiveData(): LiveData<List<News>?> {
// 1.從數(shù)據(jù)庫(kù)獲取新聞
val localNews = newsDao.queryNews()
// 2.從網(wǎng)絡(luò)獲取新聞
val remoteNews = newsApi.fetchNewsLiveData(mapOf("page" to "1", "count" to "4"))
// 3.將數(shù)據(jù)庫(kù)和網(wǎng)絡(luò)響應(yīng)的 LiveData 合并
newsLiveData.addSource(localNews) {newsLiveData.value = it}
newsLiveData.addSource(remoteNews) {newsLiveData.value = it}
return newsLiveData
}
}
并且它們是串行的,即只有當(dāng)數(shù)據(jù)庫(kù)訪問(wèn)結(jié)束后才開(kāi)始網(wǎng)絡(luò)請(qǐng)求,最后再將它們通過(guò) MediatorLiveData 合流。
而使用流時(shí),數(shù)據(jù)庫(kù)和網(wǎng)絡(luò)操作被定義在不同的流中,這為它們提供了更靈活的合流方式。
串行合流
串行合流的思路是將多個(gè)流組織成“嵌套流”,然后將它們“展平”。
拿 List 舉例,List.flat()提供了在列表上的展平操作,flat 即展平,為啥要展平?因?yàn)橛星短?,比?code>List<List<Int>>,即 List 中每個(gè)元素還是 List:
val lists = listOf(
listOf(1,2,3),
listOf(4,5,6)
)
Log.v("ttaylor","${lists.flatten()}") //[1, 2, 3, 4, 5, 6]
Log.v("ttaylor","${lists.flatMap { it.map { it+1 } }}") //[2, 3, 4, 5, 6, 7]
List.flat() 將兩層嵌套結(jié)構(gòu)變成單層結(jié)構(gòu),而List.flatMap()在展平的同時(shí)提供了變換內(nèi)部 List 的機(jī)會(huì)。
流也提供了類(lèi)似的展平方法flattenConcat():
flowOf(
flow {
emit(1)
emit(2)
},
flow { emit(3) },
flow { emit(4) },
).flattenConcat().collect {
Log.v("ttaylor", "${it}") // 1,2,3,4
}
flattenConcat() 的合流是串行的,即只有消費(fèi)了前一個(gè)流中所有的數(shù)據(jù)后才會(huì)消費(fèi)后一個(gè)流。
在 ViewModel 層對(duì)原始數(shù)據(jù)流進(jìn)行合流:
// 新聞 ViewModel 持有 repo
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
fun newsFlow(type: Int, count: Int) =
flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
.flattenConcat() // 串行合流
.map { NewsModel(it, false) }
}
// 通過(guò) ViewModelProvider.Factory 定義構(gòu)建 ViewModel 的細(xì)節(jié)(注入Repository)
class NewsViewModelFactory(private val newsRepo: NewsRepo) : ViewModelProvider.Factory {
override fun <T : ViewModel> create(modelClass: Class<T>): T {
return NewsViewModel(newsRepo) as T
}
}
在 Repository + Flow 的加持下,ViewModel 變得異常簡(jiǎn)單,它持有原始數(shù)據(jù)流并對(duì)其進(jìn)行合流以及變換。
兩個(gè)原始數(shù)據(jù)流分別是數(shù)據(jù)庫(kù)流和網(wǎng)絡(luò)流,使用flowOf()將它們組織成Flow<Flow<News>>嵌套結(jié)構(gòu),然后調(diào)用 flattenConcat() 將它們串行合流并展平變成一個(gè)流,即先查詢數(shù)據(jù)庫(kù),待查詢完畢后才請(qǐng)求網(wǎng)絡(luò)。合流之后還進(jìn)行了數(shù)據(jù)變換,以將網(wǎng)絡(luò)數(shù)據(jù)轉(zhuǎn)換為界面數(shù)據(jù) NewsModel:
data class NewsModel(
val news: List<News>, // 新聞列表
val loading: Boolean, // 是否正在加載
val errorMessage: String = "" // 錯(cuò)誤信息
)
將新聞列表進(jìn)行這樣包裝的目的是實(shí)現(xiàn)“唯一可信數(shù)據(jù)源”,這是 MVI 的關(guān)鍵詞之一。關(guān)于它的詳細(xì)介紹可以點(diǎn)擊Android 架構(gòu)最新進(jìn)展 | MVI = 響應(yīng)式編程 + 單向數(shù)據(jù)流 + 唯一可信數(shù)據(jù)源(該篇和本文同時(shí)發(fā)布,若鏈接無(wú)法跳轉(zhuǎn),可能是還未過(guò)審,請(qǐng)稍等~)
并行合流
串行合流中網(wǎng)絡(luò)請(qǐng)求必須等待數(shù)據(jù)庫(kù)查詢,若兩者能并行,則性能就會(huì)更好一點(diǎn)。
flattenMerge()方法就用于多流并發(fā)的場(chǎng)景:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
fun newsFlow(type: Int, count: Int) =
flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
.flattenMerge() // 并行合流
.map { NewsModel(it, false) }
}
此時(shí)數(shù)據(jù)庫(kù)和網(wǎng)絡(luò)流會(huì)并發(fā)啟動(dòng),性能是好了,但也產(chǎn)生了新問(wèn)題。
每個(gè)流生成的數(shù)據(jù)會(huì)合成到一個(gè)流中并通知界面刷新。若數(shù)據(jù)庫(kù)流先生成數(shù)據(jù),讓用戶先看到緩存新聞,然后網(wǎng)絡(luò)流再生成數(shù)據(jù),用新數(shù)據(jù)把老數(shù)據(jù)刷掉。這個(gè)流程是符合預(yù)期的。但萬(wàn)一數(shù)據(jù)庫(kù)抽風(fēng)了,比網(wǎng)絡(luò)還慢咋辦?這就會(huì)發(fā)生老數(shù)據(jù)刷掉新數(shù)據(jù)的 bug。
解決方案是:當(dāng)接收到網(wǎng)絡(luò)流的數(shù)據(jù)時(shí),就丟棄流上后面的數(shù)據(jù)。
在 RxJava 中有一個(gè)操作符叫takeUntil()就是用來(lái)描述這個(gè)場(chǎng)景的。
但 Kotlin Flow 并未提供這個(gè)方法。。。于是我開(kāi)始在網(wǎng)上找。。。直到我發(fā)現(xiàn)了這個(gè)官方回復(fù):Flow.transformWhile operator · Issue #2065 · Kotlin/kotlinx.cor…
官方說(shuō)不會(huì)提供 takeUntil() 方法。因?yàn)?Kotlin Flow 的設(shè)計(jì)原則是“簡(jiǎn)單”,只提供必要的和高度靈活性的方法,以便自定義。Kotlin Flow 中以transform開(kāi)頭的方法都是高度靈活的,它們通常用來(lái)定義其他操作符。在Kotlin 異步 | Flow 應(yīng)用場(chǎng)景及原理中分析過(guò)Flow.transform()方法的靈活性?,F(xiàn)在來(lái)看下transformWhile():
public fun <T, R> Flow<T>.transformWhile(
transform: suspend FlowCollector<R>.(value: T) -> Boolean // 這 lambda 帶有數(shù)據(jù)發(fā)射能力
): Flow<R> =
safeFlow {
// 進(jìn)行有條件的轉(zhuǎn)發(fā)流數(shù)據(jù),條件即是 transform
return@safeFlow collectWhile { value ->
transform(value)
}
}
// 有條件的收集流數(shù)據(jù)
internal suspend inline fun <T> Flow<T>.collectWhile(
crossinline predicate: suspend (value: T) -> Boolean
) {
// 自定義流收集器,描述如何發(fā)射數(shù)據(jù)
val collector = object : FlowCollector<T> {
override suspend fun emit(value: T) {
// 當(dāng)滿足條件時(shí)才發(fā)射數(shù)據(jù),否則丟棄流往后的數(shù)據(jù)
if (!predicate(value)) {
throw AbortFlowException(this)
}
}
}
try {
collect(collector)// 收集上游流并通過(guò)自定義的方式轉(zhuǎn)發(fā)給下游
} catch (e: AbortFlowException) {
e.checkOwnership(collector)
}
}
transformWhile() 的套路依然是攔截轉(zhuǎn)發(fā)機(jī)制,即新建下游流,它生產(chǎn)數(shù)據(jù)的方式是通過(guò)收集上游數(shù)據(jù),并將數(shù)據(jù)轉(zhuǎn)發(fā)到一個(gè)帶有發(fā)射數(shù)據(jù)能力的 lambda 中,當(dāng)前這個(gè) lambda 需要有一個(gè)返回值,該值決定了是否要終止上游流數(shù)據(jù)的生產(chǎn)。
現(xiàn)在的問(wèn)題轉(zhuǎn)化為,如何讓網(wǎng)絡(luò)流告訴數(shù)據(jù)庫(kù)流“我已經(jīng)生成數(shù)據(jù)了你歇菜吧~”
“流的通信”,聽(tīng)上去有點(diǎn)高大上,但轉(zhuǎn)念一想,是我把問(wèn)題想復(fù)雜了。因?yàn)榫W(wǎng)絡(luò)和數(shù)據(jù)庫(kù)流已經(jīng)在 ViewModel 層合流了,它們并成一個(gè)流了,流動(dòng)的是List<News>,在這個(gè)數(shù)據(jù)結(jié)構(gòu)上套一層就能實(shí)現(xiàn)所謂的“流通信”:
// 新聞流包裝類(lèi)
data class NewsFlowWrapper(
val news: List<News>,// 新聞列表
val abort: Boolean // 是否中斷流
)
用 NewsFlowWrapper 改造下 NewsRepo:
class NewsRepo(context: Context) {
val localNewsFlow = flow {
val news = newsDao.queryNewsSuspend()
val newsList = news.map { it.convert() }
// 使用 NewsFlowWrapper 包裝數(shù)據(jù)庫(kù)流
emit(NewsFlowWrapper(newsList, false))
}
fun remoteNewsFlow(page: Int, count: Int) =
suspend { newApi.fetchNews(mapOf("page" to page, "count" to count)) }
.asFlow()
.map { newsBean ->
if (newsBean.code == 200) {
if (!newsBean.result.isNullOrEmpty()) {
newsDao.deleteAllNews()
newsDao.insertAll(newsBean.result.map { it.toNews() })
// 網(wǎng)絡(luò)請(qǐng)求成功時(shí),中斷流
NewsFlowWrapper(newsBean.result, true)
} else {
NewsFlowWrapper(emptyList(), false)
}
} else {
throw Exception(newsBean.message)
}
}
}
接著用 transformWhile() 改造一下 ViewModel 層的合流:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
fun newsFlow(type: Int, count: Int) =
flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
.flattenMerge()
.transformWhile {
emit(it.news)// 總是直接轉(zhuǎn)發(fā)上游數(shù)據(jù) ,直到 abort 為 true
!it.abort
}
.map { NewsModel(it, false) }
}
就這樣自定義了一個(gè)新操作符用于流通信。
在討論到用 Kotlin Flow 取代 RxJava 的時(shí)候,有一種聲音說(shuō)“相比 RxJava,Kotlin Flow 的操作符還很匱乏,有待豐富~”。我倒是覺(jué)得這是 RxJava 的劣勢(shì),Kotlin Flow 的優(yōu)勢(shì)。RxJava 讓人最望而卻步的正是因?yàn)閺?fù)雜性,品種繁多的“流”、琳瑯滿目的操作符、以及 Rx 版的回調(diào)地獄。Kotlin Flow 的策略是簡(jiǎn)單 + 高靈活性。
這樣一來(lái),用 Flow 重構(gòu)的數(shù)據(jù)鏈路上,Repository 和 ViewModel 的界限就很清晰了:Repository 提供原始的數(shù)據(jù)流,以供 ViewModel 用各種自己喜歡的方式進(jìn)行合流及變換。
異步化
若直接在界面中收集上述新聞流的話,程序會(huì) crash,提示不能在主線程操作數(shù)據(jù)庫(kù)。
所有在流中的操作,默認(rèn)情況下都是執(zhí)行在主線程的。
將流中的操作異步化也很簡(jiǎn)單:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
fun newsFlow(type: Int, count: Int) =
flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
.flattenMerge()
.transformWhile {
emit(it.news)
!it.abort
}
.map { NewsModel(it, false) }
.flowOn(Dispatchers.IO) // 將所有上游操作都分發(fā)到 IO 線程執(zhí)行
}
在 LiveData 承載數(shù)據(jù)鏈路的版本中,需自行啟動(dòng)線程池執(zhí)行數(shù)據(jù)庫(kù)操作(網(wǎng)絡(luò)操作的異步化由OkHttp實(shí)現(xiàn))。
當(dāng)用 Flow 組織數(shù)據(jù)庫(kù)流和網(wǎng)絡(luò)流時(shí),只需一個(gè)方法就能實(shí)現(xiàn)異步化,無(wú)疑大大地降低了復(fù)雜度。
捕獲異常
使用catch()可以捕獲所有上游拋出的異常:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
fun newsFlow(type: Int, count: Int) =
flowOf(newsRepo.localNewsOneShotFlow, newsRepo.remoteNewsFlow(type, count))
.flattenMerge()
.transformWhile {
emit(it.news)
!it.abort
}
.map { NewsModel(it, false) }
.flowOn(Dispatchers.IO)
.catch {
// 捕獲自定義異常并向流發(fā)送消息
if (it is YourException)
emit(NewsModel(emptyList(),false,"network error,show old news"))
}
靈活的是,在捕獲異常之后還可以繼續(xù)向流發(fā)送數(shù)據(jù)。比如當(dāng)網(wǎng)絡(luò)異常時(shí),向界面發(fā)送一個(gè)帶有 errorMessage 的 Model,界面根據(jù)此字段決定是否展示錯(cuò)誤 toast。也可以在這里處理和服務(wù)端約定的特殊錯(cuò)誤碼。
感知生命周期
流準(zhǔn)備地差不多了,下一步就是讓界面收集流并刷新:
class NewsActivity : AppCompatActivity() {
private val newsViewModel by lazy {
ViewModelProvider(
this,
NewsViewModelFactory(NewsRepo(this))
)[NewsViewModel::class.java]
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// 收集新聞流并展示
lifecycleScope.launch {
newsViewModel.newsFlow(1, 8).collect { showNews(it) }
}
}
// 這樣刷新界面是 MVI 提倡的
private fun showNews(newsModel: NewsModel) {
when {
// 展示 loading
newsModel.loading -> {
showLoading()
}
newsModel.errorMessage.isEmpty() -> {
dismissLoading()
// 將新聞?wù)故驹?RecyclerView 上
}
// 展示錯(cuò)誤提示
else -> {
dismissLoading()
showErrorMessage(newsModel.errorMessage)
}
}
}
}
其中展示/解散 loading 的方法定義如下:
// 展示 loading
fun Activity.showLoading() {
contentView()?.apply {
ProgressBar {
layout_id = "pb"
layout_width = 50
layout_height = 50
layout_gravity = gravity_center
}
}
}
// 解散 loading
fun Activity.dismissLoading() {
val pb = contentView()?.find<ProgressBar>("pb")
pb?.let { contentView()?.removeView(it) }
}
// 獲取 Activity 的 content view
fun Activity.contentView(): FrameLayout? =
takeIf { !isFinishing && !isDestroyed }?.window?.decorView?.findViewById(android.R.id.content)
展示 loading 即向當(dāng)前 Activity 的 contentView 添加一個(gè)子 View,解散 loading 即是移除該子 View。其中使用了 DSL 聲明式地構(gòu)建了界面,詳細(xì)介紹可以點(diǎn)擊Android性能優(yōu)化 | 把構(gòu)建布局用時(shí)縮短 20 倍(下)。
這樣寫(xiě)會(huì)有一個(gè)坑,若新聞流因?yàn)楦鞣N原因遲遲未生成新聞列表,此時(shí)用戶切換到另一個(gè)頁(yè)面,不久后新聞流有數(shù)據(jù)了,數(shù)據(jù)被推到界面,就發(fā)生了 crash,因?yàn)橐⑿碌慕缑嬉巡辉偾芭_(tái)。
lifecycleScope
剛才是在lifecycleScope收集新聞流的,它是一個(gè)和生命周期對(duì)象綁定的協(xié)程域:
public val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
get() = lifecycle.coroutineScope
public val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
// 獲取現(xiàn)有 lifecycleScope
val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
if (existing != null) {
return existing
}
// 若沒(méi)有現(xiàn)成的,則構(gòu)建
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
// 并通過(guò) cas + 自旋的方式保證存入 mInternalScopeRef
if (mInternalScopeRef.compareAndSet(null, newScope)) {
// 開(kāi)始觀察生命周期變化
newScope.register()
return newScope
}
}
}
lifecycleScope 是一個(gè)LifecycleCoroutineScope實(shí)例,并以 Lifecycle 對(duì)象的擴(kuò)展屬性存在。之所以能這樣做是因?yàn)?Lifecycle 開(kāi)了后門(mén):
public abstract class Lifecycle {
// 后門(mén),方便在類(lèi)的外存取“附加值”
AtomicReference<Object> mInternalScopeRef = new AtomicReference<>();
}
這種動(dòng)態(tài)為類(lèi)新增屬性的方法,在 Kotlin 源碼中很常見(jiàn)。
新建 LifecycleCoroutineScope 實(shí)例后,會(huì)當(dāng)場(chǎng)調(diào)用 register() 方法觀察生命周期變化:
internal class LifecycleCoroutineScopeImpl(
override val lifecycle: Lifecycle,
override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
fun register() {
launch(Dispatchers.Main.immediate) {
// 開(kāi)始觀察生命周期
if (lifecycle.currentState >= Lifecycle.State.INITIALIZED) {
lifecycle.addObserver(this@LifecycleCoroutineScopeImpl)
} else {
coroutineContext.cancel()
}
}
}
override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
// 當(dāng)生命周期為 DESTROYED 時(shí),取消觀察并取消協(xié)程中 job 的執(zhí)行
if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
lifecycle.removeObserver(this)
coroutineContext.cancel()
}
}
}
lifecycleScope.launch() 會(huì)立刻啟動(dòng)協(xié)程,并在生命周期 DESTROYED 時(shí)取消協(xié)程。
當(dāng) Activity 被另一個(gè) Activity 遮擋時(shí)并不會(huì) DESTROYED,所以此時(shí)若有流數(shù)據(jù)推過(guò)來(lái)還是可以更新到界面,并導(dǎo)致 crash。
flowWithLifecycle()
為此官方提供了flowWithLifecycle():
public fun <T> Flow<T>.flowWithLifecycle(
lifecycle: Lifecycle,
minActiveState: Lifecycle.State = Lifecycle.State.STARTED
): Flow<T> = callbackFlow {
lifecycle.repeatOnLifecycle(minActiveState) {
this@flowWithLifecycle.collect {
send(it)
}
}
close()
}
flowWithLifecycle() 內(nèi)部生成了一個(gè)中間消費(fèi)者callbackFlow,中間消費(fèi)者會(huì)將上游數(shù)據(jù)轉(zhuǎn)發(fā)給下游,不過(guò)是有條件的,只有當(dāng)生命周期滿足要求時(shí)才會(huì)轉(zhuǎn)發(fā)。
其中的 repeatOnLifecycle() 是 Lifecycle 的擴(kuò)展方法:
public suspend fun Lifecycle.repeatOnLifecycle(
state: Lifecycle.State,
block: suspend CoroutineScope.() -> Unit
) { ... }
repeatOnLifecycle() 會(huì)在新的協(xié)程執(zhí)行 block,當(dāng)且僅當(dāng)生命周期至少達(dá)到 state 狀態(tài),若生命周期未達(dá)標(biāo),則會(huì)取消 block 執(zhí)行,若再次達(dá)標(biāo),則再次執(zhí)行。
讓 Flow 感知生命周期的寫(xiě)法如下:只有當(dāng)生命周期滿足要求時(shí),才收集上游并轉(zhuǎn)發(fā)給下游,否則取消收集:
class NewsActivity : AppCompatActivity() {
private val newsViewModel by lazy {
ViewModelProvider(
this,
NewsViewModelFactory(NewsRepo(this))
)[NewsViewModel::class.java]
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// 以感知生命周期的方式收集新聞流
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
newsViewModel.newsFlow(1, 8).collect { showNews(it) }
}
}
}
}
嵌套回調(diào)出現(xiàn)了,看上去有點(diǎn)復(fù)雜。 還好有擴(kuò)展方法,可以把這些細(xì)節(jié)隱藏起來(lái):
// 用感知生命周期的方式收集流
fun <T> Flow<T>.collectIn(
lifecycleOwner: LifecycleOwner,
minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
action: (T) -> Unit
): Job = lifecycleOwner.lifecycleScope.launch {
flowWithLifecycle(lifecycleOwner.lifecycle, minActiveState).collect(action)
}
然后就可以像這樣在界面中收集新聞流:
class NewsActivity : AppCompatActivity() {
private val newsViewModel by lazy {
ViewModelProvider(
this,
NewsViewModelFactory(NewsRepo(this))
)[NewsViewModel::class.java]
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
newsViewModel.newsFlow(1, 8).collectIn(this) { showNews(it) }
}
}
超簡(jiǎn)潔,把 LiveData 又比下去了~
這個(gè)方法需注意調(diào)用順序,當(dāng)不滿足生命周期時(shí),它只會(huì)取消訂閱上游的數(shù)據(jù),若下游還有另一流在生成數(shù)據(jù),則無(wú)法感知生命周期。(封裝的collectIn()保證了它是收集數(shù)據(jù)前的最后一個(gè)操作符)
避免重復(fù)觸發(fā)冷流
按照上面的寫(xiě)法,還是有問(wèn)題。當(dāng)從新聞界面跳轉(zhuǎn)到另一個(gè)界面再返回時(shí),會(huì)重新查數(shù)據(jù)庫(kù),重新請(qǐng)求網(wǎng)絡(luò)。。。
因?yàn)?Repository 提供的數(shù)據(jù)庫(kù)和網(wǎng)絡(luò)流都是“冷流”。冷流只有被收集之后才會(huì)生產(chǎn)數(shù)據(jù),且冷流是沒(méi)有地方存數(shù)據(jù)的,當(dāng)數(shù)據(jù)從上游經(jīng)過(guò)若干個(gè)中間消費(fèi)者最后傳遞給訂閱者,數(shù)據(jù)被展示在界面上,但整個(gè)數(shù)據(jù)鏈路上沒(méi)有一個(gè)地方把數(shù)據(jù)存了下了。
又因?yàn)槭褂昧?code>repeatOnLifecycle(Lifecycle.State.STARTED),所以從另一個(gè)界面返回時(shí),重新訂閱了冷流,那它就毫不留情地開(kāi)始重新生產(chǎn)數(shù)據(jù)。
SharedFlow
對(duì)于這種場(chǎng)景,解決方案是:讓冷流共享,即多個(gè)訂閱也不會(huì)觸發(fā)冷流重新生產(chǎn)數(shù)據(jù),最好能讓冷流的數(shù)據(jù)被緩存,這樣就能將最新的數(shù)據(jù)粘性地傳遞給新訂閱者。
SharedFlow由此而生:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
val newsFlow(type: Int, count: Int) =
flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
.flattenMerge()
.transformWhile {
emit(it.news)
!it.abort
}
.map { NewsModel(it, false) }
.flowOn(Dispatchers.IO)
.catch {
if (it is YourException)
emit(NewsModel(emptyList(),false,"network error,show old news"))
}
// 將流轉(zhuǎn)換為 SharedFlow
.shareIn(viewModelScope, SharingStarted.Lazily)
}
使用shareIn()將冷流轉(zhuǎn)換成共享熱流:
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,// 啟動(dòng)策略
replay: Int = 0 // 緩存大小,默認(rèn)不緩存(非粘性)
): SharedFlow<T> {...}
shareIn 是 Flow 的擴(kuò)展方法:
-
started參數(shù)是啟動(dòng)策略,它決定了上游流的生命周期,SharingStarted.Lazily適用于當(dāng)前的場(chǎng)景,即當(dāng)共享熱流有訂閱者時(shí)才啟動(dòng)上游流,上游流將一直存活著。 -
replay參數(shù)決定了緩存的大小,若為1,表示會(huì)緩存最新的1個(gè)值,當(dāng)有新訂閱者,會(huì)將緩存值分發(fā)給它,實(shí)現(xiàn)粘性效果(同 LiveData)。默認(rèn)為0不緩存。
可以把 SharedFlow 想象成一個(gè)中間消費(fèi)者,它收集上游流的數(shù)據(jù)并將其推送到熱流中,然后將這些數(shù)據(jù)緩存并分享給所有的下游訂閱者。
StateFlow
StateFlow 是一個(gè)特別的 SharedFlow,它是 Kotlin Flow 中更像 LiveData 的存在。因?yàn)椋?/p>
- StateFlow 總是會(huì)緩存1個(gè)最新的數(shù)據(jù),上游流產(chǎn)生新數(shù)據(jù)后就會(huì)覆蓋舊值(LiveData 也是)。
- StateFlow 持有一個(gè) value 字段,可通過(guò)
stateFlow.value讀取最新值(LiveData 也是)。 - StateFlow 是粘性的,會(huì)將緩存的最新值分發(fā)給新訂閱者(LiveData 也是)。
- StateFlow 必須有一個(gè)初始值(LiveData 不是)。
- StateFlow 會(huì)過(guò)濾重復(fù)值,即新值和舊值相同時(shí)不更新。(LiveData 不是)。
可以使用stateIn()重寫(xiě)新聞流:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
val newsFlow(type: Int, count: Int) =
flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
.flattenMerge()
.transformWhile {
emit(it.news)
!it.abort
}
.map { NewsModel(it, false) }
.flowOn(Dispatchers.IO)
.catch {
if (it is YourException)
emit(NewsModel(emptyList(),false,"network error,show old news"))
}
// 將流轉(zhuǎn)換為 StateFlow
.stateIn(viewModelScope, SharingStarted.Lazily, NewsModel(emptyList(), true))
}
stateIn() 中的第三個(gè)參數(shù)就是必須有的初始值,當(dāng) Repository 的原始數(shù)據(jù)流未生成數(shù)據(jù)時(shí),初始值就已經(jīng)推送給了訂閱者,界面可以借此展示 loading。
若使用 shareIn(),則可以這樣展示 loading:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
val newsFlow(type: Int, count: Int) =
flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count))
.flattenMerge()
.transformWhile {
emit(it.news)
!it.abort
}
.map { NewsModel(it, false) }
.flowOn(Dispatchers.IO)
.onStart { emit(NewsModel(emptyList(), true)) }// 展示loading
.catch {
if (it is YourException)
emit(NewsModel(emptyList(),false,"network error,show old news"))
}
// 將流轉(zhuǎn)換為 SharedFlow
.shareIn(viewModelScope, SharingStarted.Lazily)
}
使用onStart(),它會(huì)在流被收集時(shí)立刻發(fā)生一個(gè)數(shù)據(jù)。
到底使用 StateFlow 還是 SharedFlow?得看場(chǎng)景:
- 當(dāng)需在流以外的地方訪問(wèn)流的最新值,則用 StateFlow。
- 當(dāng)需過(guò)濾重復(fù)值,則用 StateFlow(在 SharedFlow 上用 distinctUntilChanged() 效果相同)。
- 在需粘性的場(chǎng)景下,則用 StateFlow(將 SharedFlow 的 replay 置為1效果相同)。
我試圖找到更多使用 StateFlow 的理由,但就像你看到的那樣,大部分理由都不充分。只有第一個(gè)場(chǎng)景下,必用 StateFlow 不可。其他都可用 SharedFlow 代替,而且后者提供了更大的靈活性。
MVI 化
上面的代碼已經(jīng)比較接近 MVI 的模樣了。
MVI 有三個(gè)關(guān)鍵詞:響應(yīng)式編程 + 單向數(shù)據(jù)流 + 唯一可信數(shù)據(jù)源。
現(xiàn)援引“單向數(shù)據(jù)流”圖片如下:

界面產(chǎn)生的數(shù)據(jù)叫事件(意圖)Intent,它流向 ViewModel,經(jīng)加工后轉(zhuǎn)換成 狀態(tài)State供界面刷新。
sealed class FeedsIntent {
// Feeds 初始化
data class InitIntent(val type: Int, val count: Int) : FeedsIntent()
// Feeds 加載更多
data class MorePageIntent(val timestamp: Long, val count: Int) : FeedsIntent()
// 刪除某個(gè)帖子
data class RemoveIntent(val id: Long) : FeedsIntent()
}
原本界面發(fā)起的事件是通過(guò) ViewModel 的一個(gè)方法調(diào)用傳遞的。為了使用響應(yīng)式編程形成數(shù)據(jù)流,得把函數(shù)調(diào)用用“數(shù)據(jù)”的形式包裝起來(lái)。
事件產(chǎn)生自界面,所以事件流理所當(dāng)然在界面組織:
class StateFlowActivity : AppCompatActivity() {
private val refreshLayout: RefreshLayout
// 在界面層組織事件流
private val intents by lazy {
merge(
// 加載 Feeds 首頁(yè)事件
flowOf(FeedsIntent.InitIntent(1, 5))
// 加載更多 Feeds 事件
loadMoreFeedsFlow()
)
}
private fun loadMoreFeedsFlow(): Flow<FeedsIntent> = callbackFlow {
refreshLayout.setOnRefreshListener {
trySend(FeedsIntent.MorePageIntent)
}
awaitClose()
}
}
上述代碼包含了兩個(gè)事件,分別是加載首頁(yè)和加載更多,它倆都被組織成流,并使用 merge 進(jìn)行合流,merge 會(huì)將每個(gè) Flow 中的數(shù)據(jù)合起來(lái)并發(fā)地轉(zhuǎn)發(fā)到一個(gè)新的流上。
當(dāng)流被訂閱后,加載首頁(yè)的事件會(huì)立刻產(chǎn)生并無(wú)條件的分發(fā)給下游,而加載更多事件需等待上拉動(dòng)作發(fā)生時(shí)才會(huì)生成。
class StateFlowActivity : AppCompatActivity() {
private val newsViewModel by lazy {
ViewModelProvider(
this,
NewsViewModelFactory(NewsRepo(this))
)[NewsViewModel::class.java]
}
private val intents by lazy {
merge(
flowOf(FeedsIntent.InitIntent(1, 5))
loadMoreFeedsFlow()
)
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// 訂閱事件流,將事件傳遞給 ViewModel
intents
.onEach(newsViewModel::send) // .onEach { newsViewModel.send(it) } 效果一樣
.launchIn(lifecycleScope)
}
}
在 onCreate() 訂閱事件流,每產(chǎn)生一個(gè)事件都會(huì)調(diào)用 NewsViewModel.send() 方法將事件傳遞給 ViewModel。其中::用于將一個(gè)方法變?yōu)?lambda,方法就可以作為參數(shù)傳給另一個(gè)方法,以簡(jiǎn)化代碼。
NewsViewModel.send() 方法定義如下:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
// 用于接收界面事件的共享流
private val _feedsIntent = MutableSharedFlow<FeedsIntent>()
// 界面事件唯一入口,向流中發(fā)送事件
fun send(intent: FeedsIntent) {
viewModelScope.launch { _feedsIntent.emit(intent) }
}
}
現(xiàn)在界面事件已經(jīng)以數(shù)據(jù)流Flow<FeedsIntent>的方式流入了 ViewModel,下一步就是在流上進(jìn)行數(shù)據(jù)變換,即流入的是 Intent,流出的是 State。遂定義一個(gè)將Flow<FeedsIntent>轉(zhuǎn)化成Flow<NewsState>的擴(kuò)展方法:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
// 將事件轉(zhuǎn)換成狀態(tài)(NewsState即是上面的NewsModel,換了個(gè)名字而已)
private fun Flow<FeedsIntent>.toNewsStateFlow(): Flow<NewsState> = merge(
// 加載首頁(yè)事件處理
filterIsInstance<FeedsIntent.InitIntent>()
.flatMapConcat { it.toFetchInitPageFlow() },
// 刪除帖子事件處理
filterIsInstance<FeedsIntent.RemoveIntent>()
.flatMapConcat { ... },
// 加載更多事件處理
filterIsInstance<FeedsIntent.MorePageIntent>()
.flatMapConcat { ... }
)
}
每一個(gè)上游的FeedsIntent都會(huì)在這里被轉(zhuǎn)換成一個(gè)Flow<NewsState>,就形成了Flow<Flow<NewsState>>這樣的結(jié)構(gòu),然后用 flatMapConcat() 將其展平變成Flow<NewsState>。
由于有多種事件,遂使用 filterIsInstance() 按事件類(lèi)型過(guò)濾,實(shí)現(xiàn)了事件分流,即是用流的方式寫(xiě) if-else。
其中toFetchInitPageFlow()描述了如何將加載首頁(yè)事件轉(zhuǎn)換成Flow<NewsState>:
// NewsViewModel.kt
private fun FeedsIntent.InitIntent.toFetchInitPageFlow() =
flowOf(
newsRepo.localNewsOneShotFlow,
newsRepo.remoteNewsFlow(this.type, this.count)
)
.flattenMerge()
.transformWhile {
emit(it.news)
!it.abort
}
.map { NewsState(it, false) }
.onStart { emit(NewsState(emptyList(), true)) }
.catch {
if (it is SSLHandshakeException)
emit(
NewsState(
emptyList(),
false,
"network error,show old news"
)
)
}
轉(zhuǎn)化的方法即是拉取數(shù)據(jù)庫(kù)以及網(wǎng)絡(luò)(就是把之前定義好的數(shù)據(jù)庫(kù)網(wǎng)絡(luò)合流拿過(guò)來(lái))。
是時(shí)候把事件流以及它的變換操作合起來(lái)了:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() {
// 事件流
private val _feedsIntent = MutableSharedFlow<FeedsIntent>()
// 狀態(tài)流
val newsState =
_feedsIntent
.toNewsStateFlow() // 將事件流轉(zhuǎn)換成狀態(tài)流
.flowOn(Dispatchers.IO) // 異步地進(jìn)行變換操作
.shareIn(viewModelScope, SharingStarted.Eagerly) // 將流轉(zhuǎn)換成共享流以供界面訂閱
}
最后界面觀察狀態(tài)流:
class StateFlowActivity : AppCompatActivity() {
private val newsViewModel by lazy {
ViewModelProvider(
this,
NewsViewModelFactory(NewsRepo(this))
)[NewsViewModel::class.java]
}
// 組織界面事件
private val intents by lazy {
merge(
flowOf(FeedsIntent.InitIntent(1, 5))
loadMoreFeedsFlow()
)
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// 數(shù)據(jù)流起點(diǎn):發(fā)送事件
intents
.onEach(newsViewModel::send)
.launchIn(lifecycleScope)
// 數(shù)據(jù)流終點(diǎn):消費(fèi)狀態(tài)
newsViewModel.newsState
.collectIn(this) { showNews(it) }
}
// 渲染界面
private fun showNews(newsModel: NewsState) {
when {
newsModel.loading -> {
showLoading()
}
newsModel.errorMessage.isEmpty() -> {
dismissLoading()
newsAdapter.news = newsModel.news
rvNews.adapter = newsAdapter
}
else -> {
dismissLoading()
tv.text = newsModel.errorMessage
}
}
}
}
(這里的 MVI 是一個(gè)半成品,比如該代碼結(jié)構(gòu)就無(wú)法實(shí)現(xiàn)“上拉加載更多”這個(gè)需求,后續(xù)文章會(huì)在此基礎(chǔ)上做重構(gòu),歡迎持續(xù)關(guān)注~)
LiveData vs Flow
對(duì)于承載數(shù)據(jù)來(lái)說(shuō),Kotlin Flow 相較于 LiveData 只能說(shuō)有過(guò)之而無(wú)不及:
- LiveData 不能方便地支持異步化。
- LiveData 粘性問(wèn)題的解決方案雖然很多,但用起來(lái)都很變扭。
- LiveData 可能發(fā)生數(shù)據(jù)丟失的情況。
- LiveData 的數(shù)據(jù)變換能力遠(yuǎn)遠(yuǎn)不如 Flow。
- LiveData 多數(shù)據(jù)源的合流能力遠(yuǎn)遠(yuǎn)不如 Flow。
除此之外,F(xiàn)low 還有一點(diǎn)非常吸引人,那就是 簡(jiǎn)潔,F(xiàn)low 可以用及其輕松簡(jiǎn)單的方式實(shí)現(xiàn)復(fù)雜的效果,代碼的復(fù)雜度斗降,可讀性斗升。更重要的是,這是大勢(shì)所趨,還在猶豫什么~