本文章已授權(quán)微信公眾號(hào)鴻洋(hongyangAndroid)轉(zhuǎn)載。
本文章講解的內(nèi)容是在在Android中使用協(xié)程以及協(xié)程源碼分析。
在說(shuō)協(xié)程之前,我先說(shuō)下線(xiàn)程和線(xiàn)程池:
線(xiàn)程是操作系統(tǒng)的內(nèi)核資源,是CPU調(diào)度的最小單位,所有的應(yīng)用程序都運(yùn)行在線(xiàn)程上,它是我們實(shí)現(xiàn)并發(fā)和異步的基礎(chǔ)。在Java的API中,Thread是實(shí)現(xiàn)線(xiàn)程的基礎(chǔ)類(lèi),每創(chuàng)建一個(gè)Thread對(duì)象,操作系統(tǒng)內(nèi)核就會(huì)啟動(dòng)一個(gè)線(xiàn)程,在Thread的源碼中,它的內(nèi)部實(shí)現(xiàn)是大量的JNI調(diào)用,因?yàn)?strong>線(xiàn)程的實(shí)現(xiàn)必須由操作系統(tǒng)提供直接支持,在Linux操作系統(tǒng)中,每一個(gè)Java thread對(duì)應(yīng)一個(gè)native thread,它們是一一對(duì)應(yīng)的,在Android中,創(chuàng)建Thread的過(guò)程中都會(huì)調(diào)用Linux API中的pthread_create函數(shù)。
線(xiàn)程的調(diào)用會(huì)有存在以下問(wèn)題:
- 線(xiàn)程不是輕量級(jí)資源,大量創(chuàng)建線(xiàn)程會(huì)消耗系統(tǒng)大量資源,傳統(tǒng)的阻塞調(diào)用會(huì)導(dǎo)致系統(tǒng)存在大量的因?yàn)?strong>阻塞而不能運(yùn)行的線(xiàn)程,這很浪費(fèi)系統(tǒng)資源。
- 線(xiàn)程阻塞狀態(tài)和運(yùn)行狀態(tài)的切換會(huì)存在相當(dāng)大的開(kāi)銷(xiāo),一直以來(lái)都是個(gè)優(yōu)化點(diǎn),例如:JVM在運(yùn)行時(shí)會(huì)對(duì)鎖進(jìn)行優(yōu)化,就像自旋鎖、鎖粗化和鎖消除等等。
線(xiàn)程池(Thread Pool)是一種基于池化思想管理線(xiàn)程的工具,使用線(xiàn)程池有如下好處:
- 降低資源消耗:通過(guò)池化技術(shù)重復(fù)利用已創(chuàng)建的線(xiàn)程,降低線(xiàn)程的創(chuàng)建和銷(xiāo)毀的損耗。
- 提高響應(yīng)速度:任務(wù)到達(dá)時(shí),無(wú)需等待線(xiàn)程創(chuàng)建即可立即執(zhí)行。
- 提高線(xiàn)程的可管理性:線(xiàn)程是稀缺資源,如果無(wú)限制創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)因?yàn)?strong>線(xiàn)程的不合理分布導(dǎo)致資源調(diào)度失衡,降低系統(tǒng)的穩(wěn)定性,使用線(xiàn)程池可以進(jìn)行統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控。
- 提供更多更強(qiáng)大的功能:線(xiàn)程池具備可拓展性,允許開(kāi)發(fā)人員向其中增加更多的功能。
那協(xié)程與線(xiàn)程有什么關(guān)系呢?在Java中,協(xié)程是基于線(xiàn)程池的API,它并沒(méi)有脫離Java或者Kotlin已經(jīng)有的東西。
協(xié)程的定義
協(xié)程源自Simula和Modula-2語(yǔ)言,它是一種編程思想,并不局限于特定的語(yǔ)言,在1958年的時(shí)候,Melvin Edward Conway提出這個(gè)術(shù)語(yǔ)并用于構(gòu)建匯編程序。在Android中使用它可以簡(jiǎn)化異步執(zhí)行的代碼,它是在版本1.3中添加到Kotlin。
協(xié)程的使用
下面來(lái)介紹如何使用協(xié)程:
依賴(lài)
要使用協(xié)程,需要在build.gradle文件中添加如下依賴(lài):
項(xiàng)目根目錄的build.gradle文件:
// build.gradle(AndroidGenericFramework)
ext {
// 省略部分代碼
kotlinxCoroutinesVersion = '1.3.1'
// 省略部分代碼
}
module的build.gradle文件:
// build.gradle(:app)
dependencies {
// 省略部分代碼
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlinxCoroutinesVersion"
// 省略部分代碼
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:$kotlinxCoroutinesVersion"
// 省略部分代碼
}
- org.jetbrains.kotlinx:kotlinx-coroutines-core:協(xié)程的核心庫(kù),它是協(xié)程的公共API,有了這一層公共代碼才能使協(xié)程在各個(gè)平臺(tái)的接口得到統(tǒng)一。
- org.jetbrains.kotlinx:kotlinx-coroutines-android:協(xié)程的當(dāng)前平臺(tái)對(duì)應(yīng)的平臺(tái)庫(kù),當(dāng)前平臺(tái)是Android,它是協(xié)程在具體平臺(tái)的具體實(shí)現(xiàn),因?yàn)轭?lèi)似多線(xiàn)程在各個(gè)平臺(tái)的實(shí)現(xiàn)方式是有差異的。
- org.jetbrains.kotlinx:kotlinx-coroutines-test:協(xié)程的測(cè)試庫(kù),它方便我們?cè)?strong>測(cè)試中使用協(xié)程。
這里要注意的是,這三個(gè)庫(kù)的版本要保持一致。
基礎(chǔ)
下面是協(xié)程的基礎(chǔ)部分:
啟動(dòng)協(xié)程
可以通過(guò)以下兩種方式來(lái)啟動(dòng)協(xié)程:
- launch:可以啟動(dòng)新協(xié)程,但是不將結(jié)果返回給調(diào)用方。
- async:可以啟動(dòng)新協(xié)程,并且允許使用await暫停函數(shù)返回結(jié)果。
通常我們使用launch函數(shù)從常規(guī)函數(shù)啟動(dòng)新協(xié)程,如果要執(zhí)行并行分解的話(huà)才使用async函數(shù)。
async函數(shù)可以返回結(jié)果,代碼如下所示:
// Builders.common.kt
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
async函數(shù)返回的是Deferred接口,繼承Job接口,它是一個(gè)非阻塞、可取消的future。
要注意的是launch函數(shù)和async函數(shù)以不同的方式處理異常,在使用async函數(shù)時(shí)候可以調(diào)用await函數(shù)得到結(jié)果,如果出現(xiàn)異常將會(huì)以靜默方式拋出,也就是說(shuō)不會(huì)出現(xiàn)在崩潰指標(biāo)中,也不會(huì)在logcat中注明。
await函數(shù)是針對(duì)單個(gè)協(xié)程的,awaitAll函數(shù)是針對(duì)多個(gè)協(xié)程的,它們都能保證這些協(xié)程在返回結(jié)果之前完成。
通常協(xié)程有三種方式創(chuàng)建,如下所示:
runBlocking
使用runBlocking頂層函數(shù)來(lái)創(chuàng)建協(xié)程,這種方式是線(xiàn)程阻塞的,適用于單元測(cè)試,一般業(yè)務(wù)開(kāi)發(fā)不會(huì)使用這種,示例代碼如下所示:
runBlocking {
login()
}
runBlocking函數(shù)源碼如下所示:
// Builders.kt
@Throws(InterruptedException::class)
// 第一個(gè)參數(shù)context是協(xié)程上下文,默認(rèn)值為EmptyCoroutineContext,第二個(gè)參數(shù)是帶有CoroutineScope接受者對(duì)象,不接受任何參數(shù)返回T的Lambda表達(dá)式
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// 如果沒(méi)有指定調(diào)度程序(dispatcher),就創(chuàng)建或者使用私有事件循環(huán)
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// 看看上下文(context)的攔截器(interceptor)是否是一個(gè)我們將要使用的事件循環(huán)(用來(lái)支持TestContext)或者如果存在thread-local事件循環(huán),就使用它來(lái)避免阻塞,不過(guò)它不會(huì)去新建一個(gè)
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
GlobalScope
使用GlobalScope單例對(duì)象,并且調(diào)用launch函數(shù)來(lái)創(chuàng)建協(xié)程,這種方式不會(huì)阻塞線(xiàn)程,但是不推薦在Android中使用這種方式,因?yàn)樗?strong>生命周期是整個(gè)應(yīng)用程序的生命周期,如果處理不好,容易導(dǎo)致內(nèi)存泄漏,而且不能取消,示例代碼如下所示:
GlobalScope.launch {
login()
}
GlobalScope源碼如下所示:
// CoroutineScope.kt
public object GlobalScope : CoroutineScope {
/**
* Returns [EmptyCoroutineContext].
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
EmptyCoroutineContext是一個(gè)空的協(xié)程上下文。
CoroutineScope
使用CoroutineScope對(duì)象,并且調(diào)用launch函數(shù)來(lái)創(chuàng)建協(xié)程,這種方式可以通過(guò)傳入的CoroutineContext來(lái)控制協(xié)程的生命周期,推薦使用這種方式,示例代碼如下所示:
CoroutineScope(Dispatchers.IO).launch {
login()
}
Dispatchers.IO是CoroutineContext其中一種類(lèi)型,下面會(huì)講到這個(gè)。
CoroutineScope可以管理一個(gè)或者多個(gè)相關(guān)的協(xié)程,可以使用它在指定范圍內(nèi)啟動(dòng)新協(xié)程。
與調(diào)度程序不同,CoroutineScope不運(yùn)行協(xié)程。
CoroutineScope的一項(xiàng)重要功能就是在用戶(hù)離開(kāi)你應(yīng)用中的內(nèi)容區(qū)域時(shí)停止執(zhí)行協(xié)程,它可以確保所有正在運(yùn)行的操作都能正確停止。
CoroutineScope源碼如下所示:
// CoroutineScope.kt
// 參數(shù)block是帶有CoroutineScope接受者對(duì)象,不接受任何參數(shù)返回R的Lambda表達(dá)式
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R =
suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = ScopeCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}
在Android中使用協(xié)程
在Android平臺(tái)上,協(xié)程有助于解決兩個(gè)主要問(wèn)題:
- 管理長(zhǎng)時(shí)間運(yùn)行的任務(wù),如果管理不當(dāng),這些任務(wù)可能會(huì)阻塞主線(xiàn)程并導(dǎo)致你的應(yīng)用界面凍結(jié)。
- 提供主線(xiàn)程安全性,或者從主線(xiàn)程安全地調(diào)用網(wǎng)絡(luò)或者磁盤(pán)操作。
管理長(zhǎng)時(shí)間運(yùn)行的任務(wù)
在Android平臺(tái)上,每個(gè)應(yīng)用都有一個(gè)用于處理界面并且管理用戶(hù)交互的主線(xiàn)程。如果你的應(yīng)用為主線(xiàn)程分配的工作太多,會(huì)導(dǎo)致界面呈現(xiàn)速度緩慢或者界面凍結(jié),對(duì)觸摸事件的響應(yīng)速度很慢,例如:網(wǎng)絡(luò)請(qǐng)求、JSON解析、寫(xiě)入或者讀取數(shù)據(jù)庫(kù)、遍歷大型列表,這些都應(yīng)該在工作線(xiàn)程完成。
協(xié)程在常規(guī)函數(shù)的基礎(chǔ)上添加了兩項(xiàng)操作,用于處理長(zhǎng)時(shí)間運(yùn)行的任務(wù)。在invoke或者call和return之外,協(xié)程添加了suspend和resume:
- suspend用于暫停執(zhí)行當(dāng)前協(xié)程,并保存所有的局部變量。
- resume用于讓已暫停的協(xié)程從其暫停處繼續(xù)執(zhí)行。
要調(diào)用suspend函數(shù),只能從其他suspend函數(shù)進(jìn)行調(diào)用,或者通過(guò)使用協(xié)程構(gòu)建器(例如:launch)來(lái)啟動(dòng)新的協(xié)程。
Kotlin使用堆棧幀來(lái)管理要運(yùn)行哪個(gè)函數(shù)以及所有的局部變量。暫停協(xié)程時(shí)會(huì)復(fù)制并保存當(dāng)前的堆棧幀以供稍后使用;恢復(fù)協(xié)程時(shí)會(huì)將堆棧幀從其保存位置復(fù)制回來(lái),然后函數(shù)再次開(kāi)始運(yùn)行。
編譯器會(huì)在編譯期間對(duì)被suspend修飾符修飾的函數(shù)進(jìn)行續(xù)體傳遞風(fēng)格(CPS)變換,它會(huì)改變suspend函數(shù)的函數(shù)簽名,我舉個(gè)例子:
await函數(shù)是個(gè)suspend函數(shù),函數(shù)簽名如下所示:
suspend fun <T> CompletableFuture<T>.await(): T
在編譯期間進(jìn)行續(xù)體傳遞風(fēng)格(CPS)變換后:
fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?
我們可以看到進(jìn)行續(xù)體傳遞風(fēng)格(CPS)變換后的函數(shù)多了一個(gè)類(lèi)型為Continuation<T>的參數(shù),Continuation代碼如下所示:
interface Continuation<in T> {
val context: CoroutineContext
fun resumeWith(result: Result<T>)
}
續(xù)體包裝了協(xié)程在掛起之后繼續(xù)執(zhí)行的代碼,在編譯過(guò)程中,一個(gè)完整的協(xié)程被分割成一個(gè)又一個(gè)續(xù)體,在await函數(shù)的掛起結(jié)束之后,它會(huì)調(diào)用參數(shù)continuation的resumeWith函數(shù)來(lái)恢復(fù)執(zhí)行await之后的代碼。
進(jìn)行續(xù)體傳遞風(fēng)格(CPS)變換后的函數(shù)返回值是Any?,這是因?yàn)檫@個(gè)函數(shù)發(fā)生變換后,它會(huì)返回一個(gè)類(lèi)型為T(mén)(返回它本身)和COROUTINE_SUSPENDED標(biāo)記的聯(lián)合類(lèi)型,因?yàn)?strong>Kotlin沒(méi)有聯(lián)合類(lèi)型語(yǔ)法,所以就使用最泛化的類(lèi)型Any?來(lái)表示,COROUTINE_SUSPENDED標(biāo)記表示的是這個(gè)suspend函數(shù)會(huì)發(fā)生事實(shí)上的掛起操作。
在下面介紹的三個(gè)調(diào)度程序,它們都會(huì)繼承CoroutineDispatcher類(lèi),源碼如下所示:
// CorountineDispatcher.kt
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
// 省略部分代碼
}
這個(gè)類(lèi)實(shí)現(xiàn)了ContinuationInterceptor接口,源碼如下所示:
// ContinuationInterceptor.kt
@SinceKotlin("1.3")
public interface ContinuationInterceptor : CoroutineContext.Element {
// 定義上下文攔截器的鍵
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
// 返回原始封裝的Continuation,從而攔截所有的恢復(fù)
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
// 初始化時(shí),為interceptContinuation返回的Continuation實(shí)例調(diào)用
public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
/* do nothing by default */
}
public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
// getPolymorphicKey專(zhuān)門(mén)用于ContinuationInterceptor的鍵
@OptIn(ExperimentalStdlibApi::class)
if (key is AbstractCoroutineContextKey<*, *>) {
@Suppress("UNCHECKED_CAST")
return if (key.isSubKey(this.key)) key.tryCast(this) as? E else null
}
@Suppress("UNCHECKED_CAST")
return if (ContinuationInterceptor === key) this as E else null
}
public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext {
// minusPolymorphicKey專(zhuān)門(mén)用于ContinuationInterceptor的鍵
@OptIn(ExperimentalStdlibApi::class)
if (key is AbstractCoroutineContextKey<*, *>) {
return if (key.isSubKey(this.key) && key.tryCast(this) != null) EmptyCoroutineContext else this
}
return if (ContinuationInterceptor === key) EmptyCoroutineContext else this
}
}
這個(gè)接口叫做續(xù)體攔截器,它負(fù)責(zé)攔截協(xié)程在恢復(fù)后執(zhí)行的代碼(也就是續(xù)體),并且將其在指定線(xiàn)程或者線(xiàn)程池恢復(fù)。
在編譯期間,每個(gè)suspend函數(shù)會(huì)被編譯成實(shí)現(xiàn)了Continuation接口的匿名類(lèi),其實(shí)調(diào)用suspend函數(shù)時(shí),并不一定會(huì)掛起協(xié)程,舉個(gè)例子:有個(gè)網(wǎng)絡(luò)請(qǐng)求的邏輯調(diào)用了await函數(shù),如果網(wǎng)絡(luò)請(qǐng)求還沒(méi)得到結(jié)果,那么協(xié)程就會(huì)被掛起,直到得到結(jié)果為止,續(xù)體攔截器只會(huì)攔截發(fā)生掛起后的掛起點(diǎn)的續(xù)體,而對(duì)于沒(méi)發(fā)生掛起的掛起點(diǎn),協(xié)程會(huì)調(diào)用resumeWith函數(shù),而不再需要續(xù)體攔截器處理。
續(xù)體攔截器會(huì)緩存攔截過(guò)的續(xù)體,并且在不需要它的時(shí)候調(diào)用releaseInterceptedContinuation函數(shù)釋放。
使用協(xié)程確保主線(xiàn)程安全
Kotlin協(xié)程使用調(diào)度程序來(lái)確定哪些線(xiàn)程用于執(zhí)行協(xié)程,所有協(xié)程都必須在調(diào)度程序中運(yùn)行,協(xié)程可以可以暫停,而調(diào)度程序負(fù)責(zé)將其恢復(fù)。
Kotlin提供了三個(gè)調(diào)度程序,可以使用它們來(lái)指定應(yīng)在何處運(yùn)行協(xié)程:
- Dispatchers.Main:使用此調(diào)度程序可在Android主線(xiàn)程上運(yùn)行協(xié)程,只能用于界面交互和執(zhí)行快速工作,例如:調(diào)用suspend函數(shù)、運(yùn)行Android界面框架操作和更新LiveData對(duì)象。
- Dispatchers.Default:此調(diào)度程序適合在主線(xiàn)程之外執(zhí)行占用大量CPU資源的工作,例如:對(duì)列表排序和解析JSON。
- Dispatchers.IO:此調(diào)度程序適合在主線(xiàn)程之外執(zhí)行磁盤(pán)或者網(wǎng)絡(luò)I/O,例如:操作數(shù)據(jù)庫(kù)(使用Room)、向文件中寫(xiě)入數(shù)據(jù)或者從文件中讀取數(shù)據(jù)和運(yùn)行任何網(wǎng)絡(luò)操作。
我們可以調(diào)用withContext函數(shù),并且傳入相應(yīng)的協(xié)程上下文(CoroutineContext)就可以調(diào)度線(xiàn)程。
withContext函數(shù)是個(gè)suspend函數(shù),它可以在不引用回調(diào)的情況下控制任何代碼行的線(xiàn)程池,因此可以將其應(yīng)用于非常小的函數(shù),示例代碼如下所示:
suspend fun getUserInfo() { // Dispatchers.Main
val data = fetchUserInfo() // Dispatchers.Main
show(data) // Dispatchers.Main
}
suspend fun fetchUserInfo() { // Dispatchers.Main
withContext(Dispatchers.IO) { // Dispatchers.IO
// 執(zhí)行網(wǎng)絡(luò)請(qǐng)求 // Dispatchers.IO
} // Dispatchers.Main
}
在示例代碼中,getUserInfo函數(shù)在主線(xiàn)程上執(zhí)行,它可以安全地調(diào)用fetchUserInfo函數(shù),在工作線(xiàn)程中執(zhí)行網(wǎng)絡(luò)請(qǐng)求,并且掛起,在withContext代碼塊執(zhí)行完成后,主線(xiàn)程上的協(xié)程就會(huì)根據(jù)fetchUserInfo函數(shù)的結(jié)果恢復(fù)后面的邏輯。
相比于回調(diào)實(shí)現(xiàn),使用withContext函數(shù)不會(huì)增加額外的開(kāi)銷(xiāo),在某些情況下,甚至優(yōu)于回調(diào)實(shí)現(xiàn),例如:某個(gè)函數(shù)執(zhí)行了很多次的網(wǎng)絡(luò)請(qǐng)求,使用外部withContext函數(shù)可以讓Kotlin停留在同一個(gè)調(diào)度程序,并且只切換一次線(xiàn)程,此外,Kotlin還優(yōu)化了Dispatchers.Default和Dispatchers.IO之間的切換,以盡可能避免線(xiàn)程切換。
要注意的是,Dispatchers.Default和Dispatchers.IO都是使用線(xiàn)程池的調(diào)度程序,它們不能保證代碼塊在同一線(xiàn)程從上到下執(zhí)行,因?yàn)樵谀承┣闆r下,Kotlin會(huì)在掛起和恢復(fù)后,將執(zhí)行工作移交給另外一個(gè)線(xiàn)程,這意味著,對(duì)于整個(gè)withContext代碼塊,線(xiàn)程局部變量并不指向同一個(gè)值。
Dispatchers.Main
源碼如下所示:
// Dispatchers.kt
public actual object Dispatchers {
// 省略部分代碼
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
// 省略部分代碼
}
然后看下MainDispatcherLoader.dispatcher,源碼如下所示:
// MainDispatchers.kt
internal object MainDispatcherLoader {
// 省略部分代碼
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
// 省略部分代碼
}
變量dispatcher為MainCoroutineDispatcher類(lèi)型,MainCoroutineDispatcher是個(gè)抽象類(lèi),然后它的其中一個(gè)實(shí)現(xiàn)類(lèi)是包裝類(lèi)(sealed class)HandlerDispatcher,也就是它的子類(lèi)肯定是在這個(gè)類(lèi)的所在的文件中,然后我找到HandlerContext這個(gè)類(lèi),這個(gè)類(lèi)是HandlerDispatcher的唯一子類(lèi),源碼如下所示:
// MainCoroutineDispatcher.kt
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
// HandlerContext的構(gòu)造函數(shù),參數(shù)handler為要傳進(jìn)來(lái)的Handler,參數(shù)name為用于調(diào)試的可選名稱(chēng)
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
// 判斷是否需要調(diào)度,參數(shù)context為CoroutineContext
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
// 判斷invokeImmediately的值或者是否是同一個(gè)線(xiàn)程
return !invokeImmediately || Looper.myLooper() != handler.looper
}
// 調(diào)度線(xiàn)程,參數(shù)context為CoroutineContext,參數(shù)block為Runnable
override fun dispatch(context: CoroutineContext, block: Runnable) {
// 調(diào)用Handler的post方法,將Runnable添加到消息隊(duì)列中,這個(gè)Runnable會(huì)在這個(gè)Handler附加在線(xiàn)程上的時(shí)候運(yùn)行
handler.post(block)
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
// 調(diào)用Handler的postDelayed方法,將Runnable添加到消息隊(duì)列中,并且在指定的時(shí)間結(jié)束后運(yùn)行
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
// 調(diào)用Handler的postDelayed方法,將Runnable添加到消息隊(duì)列中,并且在指定的時(shí)間結(jié)束后運(yùn)行
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
return object : DisposableHandle {
override fun dispose() {
// 調(diào)用Handler的removeCallbacks方法,刪除消息隊(duì)列中的Runnable
handler.removeCallbacks(block)
}
}
}
override fun toString(): String =
if (name != null) {
if (invokeImmediately) "$name [immediate]" else name
} else {
handler.toString()
}
override fun equals(other: Any?): Boolean = other is HandlerContext && other.handler === handler
override fun hashCode(): Int = System.identityHashCode(handler)
}
然后我們找下調(diào)用HandlerContext的構(gòu)造函數(shù)的地方,源碼如下所示:
// HandlerDispatcher.kt
@JvmField
@Deprecated("Use Dispatchers.Main instead", level = DeprecationLevel.HIDDEN)
internal val Main: HandlerDispatcher? = runCatching { HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main") }.getOrNull()
可以看到傳入了Looper.getMainLooper方法,也就是應(yīng)用程序的主循環(huán)程序(Main Looper),它位于應(yīng)用程序的主線(xiàn)程中。
可以看到使用了很多Handler相關(guān)的方法,也就是它還是依賴(lài)于Android的消息機(jī)制。
Dispatchers.Default
源碼如下所示:
// Dispatchers.kt
public actual object Dispatchers {
// 省略部分代碼
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
// 省略部分代碼
}
然后看下createDefaultDispatcher函數(shù),源碼如下所示:
// CoroutineContext.kt
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
這里會(huì)根據(jù)內(nèi)部變量(internal val)useCoroutinesScheduler判斷返回是DefaultScheduler還是CommonPool,useCoroutinesScheduler源碼如下所示:
// CoroutineContext.kt
internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"
internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
when (value) {
null, "", "on" -> true
"off" -> false
else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
}
}
這個(gè)內(nèi)部變量(internal val)useCoroutinesScheduler是根據(jù)JVM的System.getProperty方法獲取的,通過(guò)傳入"kotlinx.coroutines.scheduler"作為鍵(key),返回的值為on,useCoroutinesScheduler為true;返回的值是off,useCoroutinesScheduler為false。
先看下DefaultScheduler這種情況,源碼如下所示:
// Dispatcher.kt
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
override fun close() {
throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed")
}
override fun toString(): String = DEFAULT_SCHEDULER_NAME
@InternalCoroutinesApi
@Suppress("UNUSED")
public fun toDebugString(): String = super.toString()
}
它繼承ExperimentalCoroutineDispatcher類(lèi),它是個(gè)不穩(wěn)定的類(lèi),以后可能會(huì)改變,可以看下這個(gè)類(lèi)的dispatch函數(shù),這個(gè)函數(shù)負(fù)責(zé)調(diào)度線(xiàn)程,源碼如下所示:
// Dispatcher.kt
@InternalCoroutinesApi
open class ExperimentalCoroutineDispatcher(
// 核心線(xiàn)程數(shù)
private val corePoolSize: Int,
// 最大線(xiàn)程數(shù)
private val maxPoolSize: Int,
// 調(diào)度器保持存活的時(shí)間(單位:納秒)
private val idleWorkerKeepAliveNs: Long,
// 調(diào)度器名字
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
@Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)
// 省略部分代碼
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
// 調(diào)用了coroutineScheduler的dispatch函數(shù)
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
// 省略部分代碼
}
看下CoroutineScheduler這個(gè)類(lèi),然后再看下它的dispatch函數(shù),源碼如下所示:
// CoroutineScheduler.kt
@Suppress("NOTHING_TO_INLINE")
internal class CoroutineScheduler(
// 核心線(xiàn)程數(shù)
@JvmField val corePoolSize: Int,
// 最大線(xiàn)程數(shù)
@JvmField val maxPoolSize: Int,
// 調(diào)度器保持存活的時(shí)間(單位:納秒)
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
// 調(diào)度器名字
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
init {
require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
"Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
}
require(maxPoolSize >= corePoolSize) {
"Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize"
}
require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
"Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
}
require(idleWorkerKeepAliveNs > 0) {
"Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
}
}
// 省略部分代碼
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
// 用于支持虛擬時(shí)間
trackTask()
val task = createTask(block, taskContext)
// 嘗試將任務(wù)提交到本地隊(duì)列,并且根據(jù)結(jié)果采取執(zhí)行相關(guān)的邏輯
val currentWorker = currentWorker()
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// 全局隊(duì)列在最后一步關(guān)閉時(shí)不應(yīng)該接受更多的任務(wù)
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
// 執(zhí)行任務(wù)
signalCpuWork()
} else {
// 增加阻塞任務(wù)
signalBlockingWork(skipUnpark = skipUnpark)
}
}
// 省略部分代碼
}
可以看到CoroutineScheduler實(shí)現(xiàn)了Executor接口,在Java中線(xiàn)程池的核心實(shí)現(xiàn)類(lèi)是ThreadPoolExecutor類(lèi),它也是實(shí)現(xiàn)了Executor接口,所以這個(gè)CoroutineScheduler是協(xié)程中線(xiàn)程池的一種實(shí)現(xiàn)。
corePoolSize是核心線(xiàn)程數(shù)量,它是通過(guò)調(diào)用JVM的Runtime.getRuntime().availableProcessors()方法得到當(dāng)前處理器可運(yùn)行的線(xiàn)程數(shù),它的缺省值強(qiáng)制設(shè)置為至少兩個(gè)線(xiàn)程。
maxPoolSize是最大線(xiàn)程數(shù)量,最小值為corePoolSize,最大值為(1 shl BLOCKING_SHIFT) - 2,BLOCKING_SHIFT為21,也就是1向左位移21位再減去2,確保Runtime.getRuntime().availableProcessors()得到的值再乘以2在最小值和最大值之間。
這個(gè)函數(shù)做的事情就是將傳入的任務(wù)壓入任務(wù)棧,然后調(diào)用signalCpuWork執(zhí)行任務(wù)或者調(diào)用signalBlockingWork來(lái)增加阻塞任務(wù)。
然后再看下另外一種情況:CommonPool,源碼如下所示:
// CommonPool.kt
internal object CommonPool : ExecutorCoroutineDispatcher() {
// 省略部分代碼
private fun createPool(): ExecutorService {
if (System.getSecurityManager() != null) return createPlainPool()
// ForkJoinPool類(lèi)的反射,方便它在JDK6上可以運(yùn)行(這里沒(méi)有),如果沒(méi)有就使用普通線(xiàn)程池
val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
?: return createPlainPool()
// 嘗試使用commonPool,除非顯式指定了并行性或者在調(diào)試privatePool模式
if (!usePrivatePool && requestedParallelism < 0) {
Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
?.takeIf { isGoodCommonPool(fjpClass, it) }
?.let { return it }
}
// 嘗試創(chuàng)建私有ForkJoinPool實(shí)例
Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
?. let { return it }
// 使用普通線(xiàn)城市
return createPlainPool()
}
// 省略部分代碼
// 創(chuàng)建普通線(xiàn)程池
private fun createPlainPool(): ExecutorService {
val threadId = AtomicInteger()
// 使用Java的newFixedThreadPool線(xiàn)程池
return Executors.newFixedThreadPool(parallelism) {
Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
}
}
// 省略部分代碼
// 調(diào)度線(xiàn)程
override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
(pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
DefaultExecutor.enqueue(block)
}
}
// 省略部分代碼
}
可以看到使用CommonPool,其實(shí)就是使用Java的newFixedThreadPool線(xiàn)程池。
Dispatchers.Default調(diào)度器的核心線(xiàn)程池和處理器的線(xiàn)程數(shù)是相等的,因此它可以用于處理密集型計(jì)算,適合在主線(xiàn)程之外執(zhí)行占用大量CPU資源的工作,例如:對(duì)列表排序和解析JSON,和RxJava的計(jì)算線(xiàn)程池的思想有點(diǎn)類(lèi)似。
Dispatchers.IO
源碼如下所示:
// Dispatchers.kt
public actual object Dispatchers {
// 省略部分代碼
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
可以看到IO其實(shí)是DefaultScheduler的一個(gè)成員變量,源碼如下所示:
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
// 調(diào)用了父類(lèi)ExperimentalCoroutineDispatcher的blocking函數(shù)
val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
override fun close() {
throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed")
}
override fun toString(): String = DEFAULT_SCHEDULER_NAME
@InternalCoroutinesApi
@Suppress("UNUSED")
public fun toDebugString(): String = super.toString()
}
可以看下它的父類(lèi)ExperimentalCoroutineDispatcher的blocking函數(shù),源碼如下所示:
// Dispatcher.kt
@InternalCoroutinesApi
open class ExperimentalCoroutineDispatcher(
// 核心線(xiàn)程數(shù)
private val corePoolSize: Int,
// 最大線(xiàn)程數(shù)
private val maxPoolSize: Int,
// 調(diào)度器保持存活的時(shí)間(單位:納秒)
private val idleWorkerKeepAliveNs: Long,
// 調(diào)度器名字
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
@Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)
// 省略部分代碼
public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
// 創(chuàng)建LimitingDispatcher對(duì)象
return LimitingDispatcher(this, parallelism, TASK_PROBABLY_BLOCKING)
}
// 省略部分代碼
}
看下LimitingDispatcher類(lèi),源碼如下所示:
// Dispatcher.kt
private class LimitingDispatcher(
// final變量dispatcher為ExperimentalCoroutineDispatcher類(lèi)型
val dispatcher: ExperimentalCoroutineDispatcher,
val parallelism: Int,
override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
// 省略部分代碼
// 調(diào)度線(xiàn)程,調(diào)用dispatch(block: Runnable, tailDispatch: Boolean)函數(shù)
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
var taskToSchedule = block
while (true) {
// 提交正在執(zhí)行的任務(wù)槽
val inFlight = inFlightTasks.incrementAndGet()
// 快速路徑,如果沒(méi)有達(dá)到并行性限制,就會(huì)分派任務(wù)并且返回
if (inFlight <= parallelism) {
// 調(diào)用ExperimentalCoroutineDispatcher的dispatchWithContext函數(shù)
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
return
}
// 達(dá)到并行性限制后就將任務(wù)添加到隊(duì)列中
queue.add(taskToSchedule)
if (inFlightTasks.decrementAndGet() >= parallelism) {
return
}
taskToSchedule = queue.poll() ?: return
}
}
// 省略部分代碼
}
可以看到其實(shí)Dispatchers.Default調(diào)度器和Dispatchers.IO調(diào)度器是共用同一個(gè)線(xiàn)程池的。
指定CoroutineScope
在定義協(xié)程時(shí),必須指定其CoroutineScope,CoroutineScope可以管理一個(gè)或者多個(gè)相關(guān)的協(xié)程,可以使用它在指定范圍內(nèi)啟動(dòng)新協(xié)程。
與調(diào)度程序不同,CoroutineScope不運(yùn)行協(xié)程。
CoroutineScope的一項(xiàng)重要功能就是在用戶(hù)離開(kāi)應(yīng)用中內(nèi)容區(qū)域時(shí)停止執(zhí)行協(xié)程,可以確保所有正在運(yùn)行的操作都能正確停止。
在Android平臺(tái)上,可以將CoroutineScope實(shí)現(xiàn)與組件中生命周期相關(guān)聯(lián),例如:Lifecycle和ViewModel,這樣可以避免內(nèi)存泄漏和不再對(duì)與用戶(hù)相關(guān)的Activity或者Fragment執(zhí)行額外的工作,例如:ViewModelScope、LifecycleScope和liveData。
添加KTX依賴(lài)項(xiàng)
- 對(duì)于ViewModelScope,請(qǐng)使用androidx.lifecycle:lifecycle-viewmodel-ktx:2.1.0-beta01或更高版本。
- 對(duì)于LifecycleScope,請(qǐng)使用androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-alpha01或更高版本。
- 對(duì)于liveData,請(qǐng)使用androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha01或更高版本。
生命周期感知型CoroutineScope
ViewModelScope
為ViewModel定義ViewModelScope,如果ViewModel已清除,那么在這個(gè)范圍內(nèi)的啟動(dòng)的協(xié)程就會(huì)自動(dòng)取消,如果你的工作需要在ViewModel處于活動(dòng)狀態(tài)下才能完成的話(huà),可以使用它,示例代碼如下所示:
class MyViewModel : ViewModel() {
init {
viewModelScope.launch {
// 當(dāng)ViewModel被清除,這個(gè)范圍內(nèi)啟動(dòng)的協(xié)程就會(huì)自動(dòng)取消
}
}
}
LifecycleScope
為每個(gè)Lifecycle對(duì)象定義LifecycleScope,在這個(gè)范圍內(nèi)啟動(dòng)的協(xié)程會(huì)在Lifecycle被銷(xiāo)毀的時(shí)候自動(dòng)取消,可以通過(guò)lifecycle.coroutineScope或者lifecycleOwner.lifecycleScope屬性訪(fǎng)問(wèn)Lifecycle的CoroutineScope,示例代碼如下所示:
class MyFragment : Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
viewLifecycleOwner.lifecycleScope.launch {
// 在這個(gè)范圍內(nèi)啟動(dòng)的協(xié)程會(huì)在Lifecycle被銷(xiāo)毀的時(shí)候自動(dòng)取消
}
}
}
即使CoroutineScope提供了適當(dāng)?shù)姆椒▉?lái)自動(dòng)取消長(zhǎng)時(shí)間運(yùn)行的操作,在某些情況下,可能需要暫停執(zhí)行代碼塊,例如:要使用FragmentTransaction,Fragment的生命周期至少處于STARTED狀態(tài),對(duì)于這種情況,Lifecycle提供了這些方法:lifecycle.whenCreated、lifecycle.whenStarted和lifecycle.whenResumed,如果Lifecycle未至少處于所需的最低狀態(tài),那么就會(huì)暫停在這些代碼塊內(nèi)運(yùn)行的任何協(xié)程,示例代碼如下所示:
class MyFragment : Fragment() {
init {
// 在Fragment的構(gòu)造函數(shù)可以安全地啟動(dòng)
lifecycleScope.launch {
whenCreateed {
// 只有當(dāng)Fragment的生命周期至少處于CREATED狀態(tài)下,這個(gè)代碼塊才會(huì)執(zhí)行,并且可以調(diào)用其他suspend函數(shù)
}
whenStarted {
// 只有當(dāng)Fragment的生命周期至少處于STARTED狀態(tài)下,這個(gè)代碼塊才會(huì)執(zhí)行,并且可以調(diào)用其他suspend函數(shù)
}
whenResumed {
// 只有當(dāng)Fragment的生命周期至少處于RESUMED狀態(tài)下,這個(gè)代碼塊才會(huì)執(zhí)行,并且可以調(diào)用其他suspend函數(shù)
}
}
}
}
liveData
使用LiveData時(shí),我們可能需要異步計(jì)算值,例如:獲取了用戶(hù)信息后顯示到界面,在這種情況下,我們可以使用liveData構(gòu)建器函數(shù)調(diào)用suspend函數(shù),并且將結(jié)果作為LiveData對(duì)象返回,示例代碼如下所示:
val userInfoData: LiveData<UserInfoData> = liveData {
// getUserInfo函數(shù)是一個(gè)suspend函數(shù)
val data = remoteSource.getUserInfo()
emit(data)
}
liveData構(gòu)建塊用作協(xié)程和LiveData之間的結(jié)構(gòu)化并發(fā)基元。
當(dāng)LiveData變?yōu)?strong>活動(dòng)狀態(tài)的時(shí)候,代碼塊開(kāi)始執(zhí)行;當(dāng)LiveData變?yōu)?strong>非活動(dòng)狀態(tài)的時(shí)候,代碼塊會(huì)在可配置的超時(shí)過(guò)后自動(dòng)取消。如果代碼塊在完成前取消,則會(huì)在LiveData再次變成活動(dòng)狀態(tài)后重啟;如果在上次運(yùn)行中成功完成,則不會(huì)重啟。要注意的是,代碼塊只有在自動(dòng)取消的情況下才會(huì)重啟,如果代碼塊由于任何其他原因(例如:拋出CancelationException)而取消,則不會(huì)重啟。
我們可以從代碼塊中發(fā)出多個(gè)值,每次調(diào)用emit函數(shù)都會(huì)暫停執(zhí)行代碼塊,直到在主線(xiàn)程上設(shè)置LiveData值。
我的GitHub:TanJiaJunBeyond
Android通用框架:Android通用框架
我的掘金:譚嘉俊
我的簡(jiǎn)書(shū):譚嘉俊
我的CSDN:譚嘉俊