
前言
協(xié)程是一個并發(fā)方案。也是一種思想。
傳統(tǒng)意義上的協(xié)程是單線程的,面對io密集型任務(wù)他的內(nèi)存消耗更少,進而效率高。但是面對計算密集型的任務(wù)不如多線程并行運算效率高。
不同的語言對于協(xié)程都有不同的實現(xiàn),甚至同一種語言對于不同平臺的操作系統(tǒng)都有對應(yīng)的實現(xiàn)。
我們kotlin語言的協(xié)程是 coroutines for jvm的實現(xiàn)方式。底層原理也是利用java 線程。
基礎(chǔ)知識
生態(tài)架構(gòu)

相關(guān)依賴庫
dependencies {
// Kotlin
implementation "org.jetbrains.kotlin:kotlin-stdlib:1.4.32"
// 協(xié)程核心庫
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3"
// 協(xié)程Android支持庫
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.3"
// 協(xié)程Java8支持庫
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.4.3"
// lifecycle對于協(xié)程的擴展封裝
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0"
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0"
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"
}
為什么一些人總覺得協(xié)程晦澀難懂?
1.網(wǎng)絡(luò)上沒有詳細(xì)的關(guān)于協(xié)程的概念定義,每種語言、每個系統(tǒng)對其實現(xiàn)都不一樣。可謂是眾說紛紜,什么內(nèi)核態(tài)用戶態(tài)巴拉巴拉,很容易給我們帶偏
2.kotlin的各種語法糖對我們造成的干擾。如:
- 高階函數(shù)
- 源碼實現(xiàn)類找不到
所以扎實的kotlin語法基本功是學(xué)習(xí)協(xié)程的前提。
實在看不懂得地方就反編譯為java,以java最終翻譯為準(zhǔn)。
協(xié)程是什么?有什么用?
kotlin中的協(xié)程干的事就是把異步回調(diào)代碼拍扁了,捋直了,讓異步回調(diào)代碼同步化。除此之外,沒有任何特別之處。
創(chuàng)建一個協(xié)程,就是編譯器背后偷偷生成一系列代碼,比如說狀態(tài)機。
通過掛起和恢復(fù)讓狀態(tài)機狀態(tài)流轉(zhuǎn)實現(xiàn)把層層嵌套的回調(diào)代碼變成像同步代碼那樣直觀、簡潔。
它不是什么線程框架,也不是什么高深的內(nèi)核態(tài),用戶態(tài)。它其實對于咱們安卓來說,就是一個關(guān)于回調(diào)函數(shù)的語法糖。。。
本文將會圍繞掛起與恢復(fù)徹底剖析協(xié)程的實現(xiàn)原理
Kotlin函數(shù)基礎(chǔ)知識復(fù)習(xí)
再Kotlin中函數(shù)是一等公民,有自己的類型
函數(shù)類型
fun foo(){}
//類型為 () -> Unit
fun foo(p: Int){}
//類型為 (Int) -> String
class Foo{
fun bar(p0: String,p1: Long):Any{}
}
//那么 bar 的類型為:Foo.(String,Long) -> Any
//Foo就是bar的 receiver。也可以寫成 (Foo,String,Long) ->Any
函數(shù)引用
fun foo(){}
//引用是 ::foo
fun foo(p0: Int): String
//引用也是 ::foo
咋都一樣?沒辦法,就這樣規(guī)定的。使用的時候 只能靠編譯器推斷
val f: () -> Unit = ::foo //編譯器會推斷出是fun foo(){}
val g: (Int) -> String = ::foo //推斷為fun foo(p0: Int): String
帶Receiver的寫法
class Foo{
fun bar(p0: String,p1: Long):Any{}
}
val h: (Foo,String,Long) -> Any = Foo:bar
綁定receiver的函數(shù)引用:
val foo: Foo = Foo()
val m: (String,Long) -> Any = foo:bar
額外知識點
val x: (Foo,String,Long) -> Any = Foo:bar
val y: Function3<Foo,String,Long,Any> = x
Foo.(String,Long) -> Any = (Foo,String,Long) ->Any = Function3<Foo,String,Long,Any>
函數(shù)作為參數(shù)傳遞
fun yy(p: (Foo,String,Long)->Any){
p(Foo(),"Hello",3L)//直接p()就能調(diào)用
//p.invoke(Foo(),"Hello",3L) 也可以用invoke形式
}
Lambda
就是匿名函數(shù),它跟普通函數(shù)比是沒有名字的,聽起來好像是廢話
//普通函數(shù)
fun func(){
println("hello");
}
//去掉函數(shù)名 func,就成了匿名函數(shù)
fun(){
println("hello");
}
//可以賦值給一個變量
val func = fun(){
println("hello");
}
//匿名函數(shù)的類型
val func :()->Unit = fun(){
println("hello");
}
//Lambda表達式
val func={
print("Hello");
}
//Lambda類型
val func :()->String = {
print("Hello");
"Hello" //如果是Lambda中,最后一行被當(dāng)作返回值,能省掉return。普通函數(shù)則不行
}
//帶參數(shù)Lambda
val f1: (Int)->Unit = {p:Int ->
print(p);
}
//可進一步簡化為
val f1 = {p:Int ->
print(p);
}
//當(dāng)只有一個參數(shù)的時候,還可以寫成
val f1: (Int)->Unit = {
print(it);
}
關(guān)于函數(shù)的個人經(jīng)驗總結(jié)
函數(shù)跟匿名函數(shù)看起來沒啥區(qū)別,但是反編譯為java后還是能看出點差異
如果只是用普通的函數(shù),那么他跟普通java 函數(shù)沒啥區(qū)別。
比如 fun a() 就是對應(yīng)java方法public void a(){}
但是如果通過函數(shù)引用(:: a)來用這個函數(shù),那么他并不是直接調(diào)用fun a()而是重新生成一個Function0
掛起函數(shù)
suspend 修飾。
掛起函數(shù)中能調(diào)用任何函數(shù)。
非掛起函數(shù)只能調(diào)用非掛起函數(shù)。
換句話說,suspend函數(shù)只能在suspend函數(shù)中調(diào)用。
簡單的掛起函數(shù)展示:
//com.example.studycoroutine.chapter.CoroutineRun.kt
suspend fun suspendFun(): Int {
return 1;
}
掛起函數(shù)特殊在哪?
public static final Object suspendFun(Continuation completion) {
return Boxing.boxInt(1);
}
這下理解suspend為啥只能在suspend里面調(diào)用了吧?
想要讓道貌岸然的suspend函數(shù)干活必須要先滿足它?。?!就是給它里面塞入一顆球。
然后他想調(diào)用其他的suspend函數(shù),只需將球繼續(xù)塞到其它的suspend方法里面。
普通函數(shù)里沒這玩意啊,所以壓根沒法調(diào)用suspend函數(shù)。。。
讀到這里,想必各位會有一些疑問:
question1.這不是雞生蛋生雞的問題么?第一顆球是哪來的?
question2.為啥編譯后返回值也變了?
question3.suspendFun 如果在協(xié)程體內(nèi)被調(diào)用,那么他的球(completion)是誰?
標(biāo)準(zhǔn)庫給我們提供的最原始工具
public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
public fun <T> (suspend () -> T).createCoroutine(completion: Continuation<T>): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
以一個最簡單的方式啟動一個協(xié)程。
Demo-K1
fun main() {
val b = suspend {
val a = hello2()
a
}
b.createCoroutine(MyCompletionContinuation()).resume(Unit)
}
suspend fun hello2() = suspendCoroutine<Int> {
thread{
Thread.sleep(1000)
it.resume(10086)
}
}
class MyContinuation() : Continuation<Int> {
override val context: CoroutineContext = CoroutineName("Co-01")
override fun resumeWith(result: Result<Int>) {
log("MyContinuation resumeWith 結(jié)果 = ${result.getOrNull()}")
}
}
兩個創(chuàng)建協(xié)程函數(shù)區(qū)別
startCoroutine 沒有返回值 ,而createCoroutine返回一個Continuation,不難看出是SafeContinuation
好像看起來主要的區(qū)別就是startCoroutine直接調(diào)用resume(Unit),所以不用包裝成SafeContinuation,而createCoroutine則返回一個SafeContinuation,因為不知道將會在何時何處調(diào)用resume,必須保證resume只調(diào)用一次,所以包裝為safeContinuation
SafeContinuationd的作用是為了確保只有發(fā)生異步調(diào)用時才掛起
分析createCoroutineUnintercepted
//kotlin.coroutines.intrinsics.CoroutinesIntrinsicsH.kt
@SinceKotlin("1.3")
public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>): Continuation<Unit>
先說結(jié)論
其實可以簡單的理解為kotlin層面的原語,就是返回一個協(xié)程體。
開始分析
引用代碼Demo-K1首先b 是一個匿名函數(shù),他肯定要被編譯為一個FunctionX,同時它還被suspend修飾 所以它肯定跟普通匿名函數(shù)編譯后不一樣。
編譯后的源碼為
public static final void main() {
Function1 var0 = (Function1)(new Function1((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1;
var10000 = TestSampleKt.hello2(this);
if (var10000 == var3) {
return var3;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int a = ((Number)var10000).intValue();
return Boxing.boxInt(a);
}
@NotNull
public final Continuation create(@NotNull Continuation completion) {
Intrinsics.checkParameterIsNotNull(completion, "completion");
Function1 var2 = new <anonymous constructor>(completion);
return var2;
}
public final Object invoke(Object var1) {
return((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
}
});
boolean var1 = false;
Continuation var7 = ContinuationKt.createCoroutine(var0, (Continuation)(newMyContinuation()));
Unit var8 = Unit.INSTANCE;
boolean var2 = false;
Companion var3 = Result.Companion;
boolean var5 = false;
Object var6 = Result.constructor-impl(var8);
var7.resumeWith(var6);
}
我們可以看到先是 Function1 var0 = new Function1創(chuàng)建了一個對象,此時跟協(xié)程沒關(guān)系,這步只是編譯器層面的匿名函數(shù)語法優(yōu)化
如果直接
fun main() {
suspend {
val a = hello2()
a
}.createCoroutine(MyContinuation()).resume(Unit)
}
也是一樣會創(chuàng)建Function1 var0 = new Function1
解答question1
繼續(xù)調(diào)用createCoroutine
再繼續(xù)createCoroutineUnintercepted ,找到在JVM平臺的實現(xiàn)
//kotlin.coroutines.intrinsics.IntrinsicsJVM.class
@SinceKotlin("1.3")
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
//probeCompletion還是我們傳入completion對象,在我們的Demo就是myCoroutine
val probeCompletion = probeCoroutineCreated(completion)//probeCoroutineCreated方法點進去看了,好像是debug用的.我的理解是這樣的
//This就是這個suspend lambda。在Demo中就是myCoroutineFun
return if (this is BaseContinuationImpl)
create(probeCompletion)
else
//else分支在我們demo中不會走到
//當(dāng) [createCoroutineUnintercepted] 遇到不繼承 BaseContinuationImpl 的掛起 lambda 時,將使用此函數(shù)。
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
@NotNull
public final Continuation create(@NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function1 var2 = new <anonymous constructor>(completion);
return var2;
}
把completion傳入,并創(chuàng)建一個新的Function1,作為Continuation返回,這就是創(chuàng)建出來的協(xié)程體對象,協(xié)程的工作核心就是它內(nèi)部的狀態(tài)機,invokeSuspend函數(shù)
調(diào)用 create
@NotNull
public final Continuation create(@NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function1 var2 = new <anonymous constructor>(completion);
return var2;
}
把completion傳入,并創(chuàng)建一個新的Function1,作為Continuation返回,這就是創(chuàng)建出來的協(xié)程體對象,協(xié)程的工作核心就是它內(nèi)部的狀態(tài)機,invokeSuspend函數(shù)
補充---相關(guān)類繼承關(guān)系

解答question2&3
已知協(xié)程啟動會調(diào)用協(xié)程體的resume,該調(diào)用最終會來到BaseContinuationImpl::resumeWith
internal abstract class BaseContinuationImpl{
fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)//調(diào)用狀態(tài)機
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
//最終走到這里,這個completion就是被塞的第一顆球。
completion.resumeWith(outcome)
return
}
}
}
}
}
狀態(tài)機代碼截取
public final Object invokeSuspend(@NotNull Object $result) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
switch(this.label) {
case 0://第一次進來 label = 0
ResultKt.throwOnFailure($result);
// label改成1了,意味著下一次被恢復(fù)的時候會走case 1,這就是所謂的【狀態(tài)流轉(zhuǎn)】
this.label = 1;
//全體目光向我看齊,我宣布個事:this is 協(xié)程體對象。
var10000 = TestSampleKt.hello2(this);
if (var10000 == var3) {
return var3;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int a = ((Number)var10000).intValue();
return Boxing.boxInt(a);
}
question3答案出來了,傳進去的是create創(chuàng)建的那個continuation
最后再來聊聊question2,從上面的代碼已經(jīng)很清楚的告訴我們?yōu)樯稈炱鸷瘮?shù)反編譯后的返回值變?yōu)閛bject了。
以hello2為例子,hello2能返回代表掛起的白板,也能返回result。如果返回白板,狀態(tài)機return,協(xié)程掛起。如果返回result,那么hello2執(zhí)行完畢,是一個沒有掛起的掛起函數(shù),通常編譯器也會提醒 suspend 修飾詞無意義。所以這就是設(shè)計需要,沒有啥因為所以。
最后,除了直接返回結(jié)果的情況,掛起函數(shù)一定會以resume結(jié)尾,要么返回result,要么返回異常。代表這個掛起函數(shù)返回了。
調(diào)用resume意義在于重新回調(diào)BaseContinuationImpl的resumeWith,進而喚醒狀態(tài)機,繼續(xù)執(zhí)行協(xié)程體的代碼。
換句話說,我們自定義的suspend函數(shù),一定要利用suspendCoroutine 獲得續(xù)體,即狀態(tài)機對象,否則無法實現(xiàn)真正的掛起與resume。
suspendCoroutine
我們可以不用suspendCoroutine,用更直接的suspendCoroutineUninterceptedOrReturn也能實現(xiàn),不過這種方式要手動返回白板。不過一定要小心,要在合理的情況下返回或者不返回,不然會產(chǎn)生很多意想不到的結(jié)果
suspend fun mySuspendOne() = suspendCoroutineUninterceptedOrReturn<String> { continuation ->
thread {
TimeUnit.SECONDS.sleep(1)
continuation.resume("hello world")
}
//因為我們這個函數(shù)沒有返回正確結(jié)果,所以必須返回一個掛起標(biāo)識,否則BaseContinuationImpl會認(rèn)為完成了任務(wù)。
// 并且我們的線程又在運行沒有取消,這將很多意想不到的結(jié)果
kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
}
而suspendCoroutine則沒有這個隱患
suspend fun mySafeSuspendOne() = suspendCoroutine<String> { continuation ->
thread {
TimeUnit.SECONDS.sleep(1)
continuation.resume("hello world")
}
//suspendCoroutine函數(shù)很聰明的幫我們判斷返回結(jié)果如果不是想要的對象,自動返
kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
}
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
//封裝一個代理Continuation對象
val safe = SafeContinuation(c.intercepted())
block(safe)
//根據(jù)block返回結(jié)果判斷要不要返回COROUTINE_SUSPENDED
safe.getOrThrow()
}
SafeContinuation的奧秘
//調(diào)用單參數(shù)的這個構(gòu)造方法
internal actual constructor(delegate: Continuation<T>) : this(delegate, UNDECIDED)
@Volatile
private var result: Any? = initialResult //UNDECIDED賦值給 result
//java原子屬性更新器那一套東西
private companion object {
@Suppress("UNCHECKED_CAST")
@JvmStatic
private val RESULT = AtomicReferenceFieldUpdater.newUpdater<SafeContinuation<*>, Any?>(
SafeContinuation::class.java, Any::class.java as Class<Any?>, "result"
)
}
internal actual fun getOrThrow(): Any? {
var result = this.result // atomic read
if (result === UNDECIDED) { //如果UNDECIDED,那么就把result設(shè)置為COROUTINE_SUSPENDED
if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) returnCOROUTINE_SUSPENDED
result = this.result // reread volatile var
}
return when {
result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
result is Result.Failure -> throw result.exception
else -> result // either COROUTINE_SUSPENDED or data <-這里返回白板
}
}
public actual override fun resumeWith(result: Result<T>) {
while (true) { // lock-free loop
val cur = this.result // atomic read。不理解這里的官方注釋為啥叫做原子讀。我覺得 Volatile只能保證可見性。
when {
//這里如果是UNDECIDED 就把 結(jié)果附上去。
cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
//如果是掛起狀態(tài),就通過resumeWith回調(diào)狀態(tài)機
cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)){
delegate.resumeWith(result)
return
}
else -> throw IllegalStateException("Already resumed")
}
}
}
val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()
先回顧一下什么叫真正的掛起,就是getOrThrow返回了“白板”,那么什么時候getOrThrow能返回白板?答案就是result被初始化后值沒被修改過。那么也就是說resumeWith沒有被執(zhí)行過,即:block(safe)這句代碼,block這個被傳進來的函數(shù),執(zhí)行過程中沒有調(diào)用safe的resumeWith。原理就是這么簡單,cas代碼保證關(guān)鍵邏輯的原子性與并發(fā)安全
繼續(xù)以Demo-K1為例子,這里假設(shè)hello2運行在一條新的子線程,否則仍然是沒有掛起。
{
thread{
Thread.sleep(1000)
it.resume(10086)
}
}
總結(jié)
最后,可以說開啟一個協(xié)程,就是利用編譯器生成一個狀態(tài)機對象,幫我們把回調(diào)代碼拍扁,成為同步代碼。