Kotlin/Native 異步并發(fā)模型(1)—— Worker 與對(duì)象子圖

Kotlin/Native 現(xiàn)狀的一些討論

Kotlin/Native 編寫的程序作為一種原生二進(jìn)制程序,沒(méi)有強(qiáng)大的運(yùn)行時(shí)虛擬機(jī)來(lái)提供各種運(yùn)行時(shí)的保障,
因此它需要重新思考一套自己的異步并發(fā)模型。實(shí)際上 JVM 這一套機(jī)制是 C/C++
這種傳統(tǒng)命令式編程語(yǔ)言的線程同步機(jī)制的延續(xù),但 Kotlin 在編程范式上吸收了部分函數(shù)式編程的特性,因此 Kotlin/Native
的同步方案從設(shè)計(jì)思想上向函數(shù)式編程靠攏,即對(duì)象不可變,其宗旨就是如果對(duì)象本身不可變,那就不存在線程安全的問(wèn)題。

Kotlin/Native 中,我們能實(shí)現(xiàn)的異步和并發(fā)方案有好幾種,甚是混亂。第一種方式是,我們可以直接使用相關(guān)操作系統(tǒng)平臺(tái)提供的 API
來(lái)自己開(kāi)啟線程,例如在 Linux 上,我們就可以像寫 C 語(yǔ)言程序一樣,自己手動(dòng)調(diào)用 pthread_create
來(lái)創(chuàng)建線程,但是這樣寫出來(lái)的代碼就違反了平臺(tái)通用性的原則,例如如果你要將你的程序移植到 Windows 上,那異步并發(fā)方式就得全部改用
Windows 平臺(tái)的機(jī)制,可移植性太差,在編寫多平臺(tái)程序的時(shí)候這種方式就很丑陋。

Kotlin/Native 自身提供給了我們兩套異步并發(fā)的 API,首先是協(xié)程,但 Kotlin/Native 的協(xié)程與 Kotlin/JVM
的協(xié)程區(qū)別很大,Kotlin/Native 的協(xié)程是單線程的,也就是說(shuō)它只能用來(lái)執(zhí)行一些不占用 CPU 資源的并發(fā)任務(wù),例如網(wǎng)絡(luò)請(qǐng)求,如果要利用 CPU
多核的能力來(lái)進(jìn)行并行計(jì)算,Native 版的協(xié)程就失去了作用,當(dāng)然,官方說(shuō)了要盡快解決這個(gè)問(wèn)題,并且前幾天(2019 年 12
月底)我發(fā)現(xiàn)官方已經(jīng)發(fā)布了 Native 多線程版協(xié)程的預(yù)覽版本,這個(gè)會(huì)在后文詳細(xì)討論。因?yàn)楫?dāng)前主分支版本的協(xié)程不能并行計(jì)算,因此官方在 Kotlin/Native
誕生之初就已經(jīng)提供了另一套專門做并行任務(wù)的工具,即 Worker,Worker 與 Kotlin/Native 的異步并發(fā)模型緊密相連,做到了既能利用 CPU
多核能力,又能保障線程安全(雖然做法很粗暴)。這篇文章我們會(huì)討論 Worker 與 Kotlin/Native 異步并發(fā)機(jī)制,而協(xié)程將在下一篇討論。

Worker 初步使用

首先用 Intellij IDEA 創(chuàng)建一個(gè)基本的 Kotlin/Native 工程。我當(dāng)前電腦的操作系統(tǒng)版本是
macOS 10.15.1,因此后面的一些示例和測(cè)試方案都基于該系統(tǒng),作為類 Unix 系統(tǒng),Linux 上的對(duì)應(yīng)行為可能也相差無(wú)幾,
但是這些示例不保證在 Windows 等系統(tǒng)上也全部可用,或行為完全一致。

先來(lái)看看 Worker 怎么用。然后我們?cè)?main 函數(shù)中編寫以下代碼:

fun main() {
    val worker = Worker.start(true, "worker1")  // 1
    worker.execute(TransferMode.SAFE, { 2 + 1 }) {
        (it + 100).toString()
    }.consume {
        println(it)
    }
}

使用 Worker.start 函數(shù)我們就可以創(chuàng)建一個(gè)新的 Worker,然后調(diào)用 Workerexecute
函數(shù)就可以在別的線程執(zhí)行任務(wù)了。這個(gè)函數(shù)接收三個(gè)參數(shù),第一個(gè)我們先不看,第二個(gè)參數(shù),即示例中的 { 2 + 1 }
將扮演一個(gè)生產(chǎn)者的角色(為了簡(jiǎn)便,后文我們使用源碼中的命名 producer 來(lái)稱呼它),它會(huì)在外面的線程執(zhí)行,producer
的返回值將在第三個(gè)參數(shù)(也是個(gè) lambda 表達(dá)式,同樣,后文我們用源碼中的命名 job 來(lái)稱呼它)中作為參數(shù)來(lái)提供。
而 job 中的代碼會(huì)在別的線程中執(zhí)行。
最后 execute 函數(shù)的返回結(jié)果是一個(gè) Future<T> 類型的對(duì)象,調(diào)用它的成員函數(shù) consume
即可獲得在 job 執(zhí)行的結(jié)果。運(yùn)行代碼驗(yàn)證一下,結(jié)果如下:

103

現(xiàn)在還要驗(yàn)證一個(gè)問(wèn)題,producer 與 job 還有 consume 到底在哪個(gè)線程執(zhí)行,雖然官方文檔肯定不會(huì)騙我們,但是我們自己要掌握驗(yàn)證的方法:

fun main() {
    val worker = Worker.start(true, "worker1")
    println("位置 1 的線程 id:${pthread_self()!!.rawValue.toLong()}")
    worker.execute(TransferMode.SAFE, {
        println("位置 2 的線程 id:${pthread_self()!!.rawValue.toLong()}")
        2 + 1
    }) {
        println("位置 3 的線程 id:${pthread_self()!!.rawValue.toLong()}")
        (it + 100).toString()
    }.consume {
        println("位置 4 的線程 id:${pthread_self()!!.rawValue.toLong()}")
        // println(it)
    }
}

我們?cè)?3 個(gè)位置上都使用 pthread_self() 函數(shù)來(lái)打印當(dāng)前線程 id,輸出如下:

位置 1 的線程 id:4484095424
位置 2 的線程 id:4484095424
位置 3 的線程 id:123145437896704
位置 4 的線程 id:4484095424

果然,官方文檔誠(chéng)不欺我(手動(dòng)狗頭)。

有了直觀的認(rèn)識(shí)之后,我們會(huì)發(fā)現(xiàn) Worker 用起來(lái)和協(xié)程中的 async/await 有點(diǎn)像。但是我們發(fā)現(xiàn)它比 async/await
要麻煩,同樣,我們先不看 execute 函數(shù)的第一個(gè)參數(shù),我們可能會(huì)覺(jué)得 producer 有點(diǎn)多此一舉,為什么在其他線程執(zhí)行的 job
必須使用 producer 傳遞過(guò)來(lái)的參數(shù),它直接捕捉上下文的變量不行嗎?為了驗(yàn)證這一點(diǎn),于是就有了如下代碼:

fun main() {
    val worker = Worker.start(true, "worker1")
    val a = "第二個(gè)參數(shù)是干啥用的?"
    worker.execute(TransferMode.SAFE, { 2 + 1 }) {
        println(a)
        (it + 100).toString()
    }.consume {
        println(it)
    }
}

重新運(yùn)行程序,直接編譯報(bào)錯(cuò):

e: kotlin.native.concurrent.Worker.execute must take an unbound, non-capturing function or lambda

為了讓信息簡(jiǎn)潔一點(diǎn),上面復(fù)制過(guò)來(lái)的報(bào)錯(cuò)信息省略了報(bào)錯(cuò)的文件以及行數(shù)。我們可以看到報(bào)錯(cuò)信息中說(shuō),在 Worker
中執(zhí)行的函數(shù)或 lambda 表達(dá)式不能有變量捕捉。于是,這就代表著,producer 是 job 與外界線程進(jìn)行數(shù)據(jù)傳遞的唯一入口,job
無(wú)法通過(guò)變量捕捉自由訪問(wèn)外界線程的對(duì)象。這么看起來(lái) Worker 的實(shí)際太粗暴了,如果我要一次傳遞兩個(gè)對(duì)象怎么辦?用
Pair 包裝一下,那一次要傳遞三個(gè)對(duì)象呢?用 Triple!四個(gè)呢?呃……F**k。

對(duì)象的傳遞

現(xiàn)在,我們?cè)谥骶€程創(chuàng)建了一個(gè)對(duì)象,我們想把它傳遞到 Worker 中,由于 producer 是在外部線程中運(yùn)行的,
且對(duì)外部的對(duì)象進(jìn)行變量捕捉不會(huì)失敗,因此我們自然而然可能會(huì)寫出如下代碼。

fun main() {
    val worker = Worker.start(true, "worker1")
    val testData = TestData()
    val future = worker.execute(TransferMode.SAFE, { testData }) {
        it
    }
    future.consume { println(it.index) }
}

data class TestData(var index: Int = 0)

然后理所當(dāng)然的運(yùn)行報(bào)錯(cuò):

Uncaught Kotlin exception: kotlin.IllegalStateException: Illegal transfer state

然后我們?nèi)タ纯?execute 的第一個(gè)參數(shù) TransferMode,這是一個(gè)枚舉類型,共有兩個(gè)枚舉值,
我們?nèi)ピ创a注釋中看看這兩個(gè)值的區(qū)別:

……

不復(fù)制粘貼了,有點(diǎn)長(zhǎng),大意就是:在 SAFE 模式下,如果傳遞到 Worker 的對(duì)象可被別的線程或 Worker 引用到,則直接拋出異常,而在
UNSAFE 模式下,不做檢查,而是直接把對(duì)象傳遞過(guò)去,但是有可能會(huì)造成程序崩潰。接下來(lái)我們要驗(yàn)證兩個(gè)事情:

第一,當(dāng)主線程把對(duì)象傳遞給 Worker 后就不再持有對(duì)該對(duì)象的引用,SAFE 模式是否可以正常工作:

fun main() {
    val worker = Worker.start(true, "worker1")
    var testData: TestData? = TestData()
    val future = worker.execute(TransferMode.SAFE, {
        val data = testData!!
        testData = null
        data
    }) { data ->
        repeat(20000) { data.index++ }
        data
    }
    future.consume { println(it.index) }
}

data class TestData(var index: Int = 0)

程序正常打印輸出 20000。這樣來(lái)看 SAFE 模式這樣設(shè)計(jì)的確是合理的,如果主線程將對(duì)象傳遞給 Worker
之后仍然可以繼續(xù)訪問(wèn)對(duì)象,那就可能發(fā)生線程安全問(wèn)題,因此 SAFE 模式直接拒絕了這種事情的發(fā)生而拋出異常,但是這樣的寫法太丑陋了,
如果要實(shí)現(xiàn)更優(yōu)雅的寫法,唯一的辦法就是讓 testData 的引用范圍不超出 produce,也就是說(shuō)把數(shù)據(jù)產(chǎn)生的過(guò)程都寫到 produce
里面,雖然這樣也沒(méi)有那么優(yōu)雅,但是還能接受。

官方提供了一套理論來(lái)解釋上面示例程序所表現(xiàn)出來(lái)的行為:被 producer 傳遞的對(duì)象會(huì)被包裝一個(gè)叫做對(duì)象子圖(object
subgraph)的東西,對(duì)象子圖生成之后,原線程就不能再訪問(wèn)對(duì)象子圖,如果是在 SAFE
模式,就會(huì)使用圖遍歷算法檢查對(duì)象子圖的訪問(wèn)。以上都是目前官方文檔的闡述,
關(guān)于 Worker 的更多資料我覺(jué)得官方在日后還會(huì)有更多補(bǔ)充,等到那時(shí)再詳細(xì)分析。

再來(lái)看看 UNSAFE 模式:

fun main() {
    val worker = Worker.start(true, "worker1")
    val testData = TestData()
    val future = worker.execute(TransferMode.UNSAFE, { testData }) { data ->
        repeat(20000) { data.index++ }
        data
    }
    repeat(20000) { testData.index++ }
    future.consume { println(it.index) }
}

data class TestData(var index: Int = 0)

如果線程訪問(wèn)是安全的,應(yīng)該輸出 40000,但是你每次運(yùn)行這段代碼得到的結(jié)果都會(huì)不同,反正都小于 40000。所以,果然 UNSAFE
模式簡(jiǎn)單粗暴,直接撒手不管了,我最初的預(yù)測(cè)是,當(dāng)兩個(gè)線程真正發(fā)生同一時(shí)刻訪問(wèn)同一個(gè)變量的時(shí)候會(huì)發(fā)生崩潰,
而在其他情況下,程序照常運(yùn)行,就像源碼注釋里說(shuō)的那樣。但事實(shí)并非如此,所以我建議,千萬(wàn)不要靠"人"來(lái)保障線程安全,在
99.99% 的情況下都應(yīng)該使用 SAFE 模式,如果使用 UNSAFE 模式,風(fēng)險(xiǎn)將直接暴露出來(lái),且 Kotlin/Native
沒(méi)有線程鎖來(lái)幫你兜底。

對(duì)象子圖凍結(jié)、全局變量以及單例

上面已經(jīng)討論了很多情況,但是跨線程訪問(wèn)都是在函數(shù)內(nèi)部,也就是局部變量的跨線程訪問(wèn)。但如果是全局變量、
單例這種在多個(gè)函數(shù)內(nèi)都可以訪問(wèn)的變量,情況則會(huì)有所不同。

先闡述一個(gè)對(duì)象子圖凍結(jié)的概念,對(duì)于某些變量,我們確切知道其一定不可變,那對(duì)于這種變量,無(wú)論在多少個(gè)線程中同時(shí)訪問(wèn)它都是安全的,
既然如此,那 Kotlin/Native 也沒(méi)必要對(duì)這種變量在訪問(wèn)的時(shí)候做子圖校驗(yàn),對(duì)于這樣的變量,我們就可以稱其為被凍結(jié)的變量,
官方文檔關(guān)于這個(gè)地方有些前后矛盾,
先說(shuō)凍結(jié)的變量只有枚舉一種,但后面又闡述了兩種變量?jī)鼋Y(jié)的情況(后文會(huì)介紹)。還有一種情況,也有可能一個(gè)變量一開(kāi)始是非凍結(jié)的,
后面又被凍結(jié)了,但是有一點(diǎn)是不變的,那就是已凍結(jié)的對(duì)象不可解凍。關(guān)于在多個(gè) Worker
中訪問(wèn)枚舉變量的情況這里也就不舉例了,很簡(jiǎn)單。

下面講講幾個(gè)重要的注解和幾種重要的情況

訪問(wèn)全局變量

val abc = "abc"

fun main() {
    val worker = Worker.start(true, "worker1")
    val future = worker.execute(TransferMode.UNSAFE, {}) {
        println(abc)
    }
    future.consume { println(abc) }
}

程序正常運(yùn)行,打印輸出:

abc
abc

這很奇怪,官方文檔說(shuō)全局變量(沒(méi)有特殊標(biāo)記)
只能在主線程訪問(wèn),但是我們明明在子線程訪問(wèn)了它,程序卻正常運(yùn)行。我們把修飾變量 abcval 改成
var 再試一試,程序果然拋出異常:IncorrectDereferenceException。

那如果是非 String 的引用類型呢?

val testData = TestData()

fun main() {
    val worker = Worker.start(true, "worker1")
    val future = worker.execute(TransferMode.UNSAFE, {}) {
        println(testData)
    }
    future.consume { println(testData) }
}

class TestData

程序拋出異常:IncorrectDereferenceException,多測(cè)試幾次后,基本可以得出一個(gè)結(jié)論:

  • 結(jié)論 1:對(duì)于原生類型與 String 來(lái)說(shuō),如果這些變量是用 val 修飾的,則在多個(gè)線程中訪問(wèn)沒(méi)有問(wèn)題,如果是 var
    修飾的變量,則會(huì)拋出異常。對(duì)于其他引用類型的全局變量(不加特殊修飾)來(lái)說(shuō),無(wú)論用 val 還是 var
    修飾,都只能在主線程訪問(wèn)。

這條結(jié)論是官方文檔中沒(méi)有提到的,也算是踩坑的一個(gè)收獲。

在這里有個(gè)插曲,既然 val 修飾的基本類型與 String 一定是不可變的,那對(duì)于局部變量這個(gè)結(jié)論是否也成立?
我們把對(duì)象的傳遞小節(jié)中的第一個(gè)示例修改一下:

fun main() {
    val worker = Worker.start(true, "worker1")
    val testData = "abc"
    val future = worker.execute(TransferMode.SAFE, { testData }) {
        println(it)
        it
    }
    future.consume { println(it) }
}

最主要的變化就是把 testData 換成了一個(gè) String,程序正常,多測(cè)試幾次,對(duì)原生類型也是成立的,因此結(jié)論 1對(duì)局部變量也成立。
其實(shí)仔細(xì)思考一下,對(duì)于 val 修飾的原生類型與 String,從邏輯上確實(shí)可以證明它們一定是不可變。

@ThreadLocal 與 @SharedImmutable 以及單例

修改上面的示例:

@ThreadLocal
val testData = TestData()

fun main() {
    val worker = Worker.start(true, "worker1")
    val future = worker.execute(TransferMode.UNSAFE, {}) {
        println(++testData.index)
    }
    future.consume { println(testData.index) }
}

data class TestData(var index: Int = 0)

輸出如下:

1
0

結(jié)果與官方的相同,如果全局變量使用 @ThreadLocal 修飾,則該變量在每個(gè)線程都有不同的副本,即使修改,也在線程之間不可見(jiàn)。

再修改示例,僅僅把上一個(gè)示例中的 @ThreadLocal 改成 @SharedImmutable,然后程序拋出異常;再把 println(++testData.index)
改成 println(testData.index) 程序運(yùn)行正常,根據(jù)官方的說(shuō)法 @SharedImmutable 的作用是將變量?jī)鼋Y(jié),這樣的話該變量就可以共享了,
但它畢竟只是一個(gè)注解,如果你編寫了修改該變量的代碼,也只能在運(yùn)行時(shí)才能發(fā)現(xiàn)問(wèn)題。

最后看看單例:

object A {
    var index = 1
}

fun main() {
    val worker = Worker.start(true, "worker1")
    val future = worker.execute(TransferMode.UNSAFE, {}) {
        println(A.index)
    }
    future.consume { println(A.index) }
}

如果運(yùn)行程序,我們就發(fā)現(xiàn) object 修飾的單例與使用 @SharedImmutable 修飾的全局變量行為是一致的,不過(guò),
單例也可以使用 @ThreadLocal 來(lái)修飾,這也就不多說(shuō)了。

總結(jié)以及其他

如果說(shuō)還有什么是我沒(méi)有提到的,那應(yīng)該就是對(duì)象子圖分離和原始共享內(nèi)存,不過(guò)這兩部分內(nèi)容主要是用于 C
程序與 Kotlin/Native 交互的情況,例如將 Kotlin/Native 對(duì)象保存到 C 結(jié)構(gòu)體中,在真實(shí)的用例中,我們使用 Kotlin/Native
調(diào)用 C 代碼的情況應(yīng)該占絕大多數(shù),而使用 C 調(diào)用 Kotlin/Native 應(yīng)該極少發(fā)生,因此以后有機(jī)會(huì)再探討這部分內(nèi)容。

開(kāi)篇講過(guò) Worker 是目前 Kotlin/Native 實(shí)現(xiàn)并行計(jì)算的主要工具,不過(guò) Native 版的協(xié)程最近也推出了多線程版本的預(yù)覽版,
關(guān)于這部分內(nèi)容將是下一篇文章要重點(diǎn)探討的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容