第9章 輕量級(jí)線程:協(xié)程
《Kotlin極簡(jiǎn)教程》正式上架:
點(diǎn)擊這里 > 去京東商城購(gòu)買閱讀
點(diǎn)擊這里 > 去天貓商城購(gòu)買閱讀
非常感謝您親愛的讀者,大家請(qǐng)多支持!?。∮腥魏螁栴},歡迎隨時(shí)與我交流~
在常用的并發(fā)模型中,多進(jìn)程、多線程、分布式是最普遍的,不過近些年來逐漸有一些語(yǔ)言以first-class或者library的形式提供對(duì)基于協(xié)程的并發(fā)模型的支持。其中比較典型的有Scheme、Lua、Python、Perl、Go等以first-class的方式提供對(duì)協(xié)程的支持。
同樣地,Kotlin也支持協(xié)程。
本章我們主要介紹:
- 什么是協(xié)程
- 協(xié)程的用法實(shí)例
- 掛起函數(shù)
- 通道與管道
- 協(xié)程的實(shí)現(xiàn)原理
- coroutine庫(kù)等
9.1 協(xié)程簡(jiǎn)介
從硬件發(fā)展來看,從最初的單核單CPU,到單核多CPU,多核多CPU,似乎已經(jīng)到了極限了,但是單核CPU性能卻還在不斷提升。如果將程序分為IO密集型應(yīng)用和CPU密集型應(yīng)用,二者的發(fā)展歷程大致如下:
IO密集型應(yīng)用: 多進(jìn)程->多線程->事件驅(qū)動(dòng)->協(xié)程
CPU密集型應(yīng)用:多進(jìn)程-->多線程
如果說多進(jìn)程對(duì)于多CPU,多線程對(duì)應(yīng)多核CPU,那么事件驅(qū)動(dòng)和協(xié)程則是在充分挖掘不斷提高性能的單核CPU的潛力。
常見的有性能瓶頸的API (例如網(wǎng)絡(luò) IO、文件 IO、CPU 或 GPU 密集型任務(wù)等),要求調(diào)用者阻塞(blocking)直到它們完成才能進(jìn)行下一步。后來,我們又使用異步回調(diào)的方式來實(shí)現(xiàn)非阻塞,但是異步回調(diào)代碼寫起來并不簡(jiǎn)單。
協(xié)程提供了一種避免阻塞線程并用更簡(jiǎn)單、更可控的操作替代線程阻塞的方法:協(xié)程掛起。
協(xié)程主要是讓原來要使用“異步+回調(diào)方式”寫出來的復(fù)雜代碼, 簡(jiǎn)化成可以用看似同步的方式寫出來(對(duì)線程的操作進(jìn)一步抽象)。這樣我們就可以按串行的思維模型去組織原本分散在不同上下文中的代碼邏輯,而不需要去處理復(fù)雜的狀態(tài)同步問題。
協(xié)程最早的描述是由Melvin Conway于1958年給出:“subroutines who act as the master program”(與主程序行為類似的子例程)。此后他又在博士論文中給出了如下定義:
- 數(shù)據(jù)在后續(xù)調(diào)用中始終保持( The values of data local to a coroutine persist between successive calls 協(xié)程的局部)
- 當(dāng)控制流程離開時(shí),協(xié)程的執(zhí)行被掛起,此后控制流程再次進(jìn)入這個(gè)協(xié)程時(shí),這個(gè)協(xié)程只應(yīng)從上次離開掛起的地方繼續(xù) (The execution of a coroutine is suspended as control leaves it, only to carry on where it left off when control re-enters the coroutine at some later stage)。
協(xié)程的實(shí)現(xiàn)要維護(hù)一組局部狀態(tài),在重新進(jìn)入?yún)f(xié)程前,保證這些狀態(tài)不被改變,從而能順利定位到之前的位置。
協(xié)程可以用來解決很多問題,比如nodejs的嵌套回調(diào),Erlang以及Golang的并發(fā)模型實(shí)現(xiàn)等。
實(shí)質(zhì)上,協(xié)程(coroutine)是一種用戶態(tài)的輕量級(jí)線程。它由協(xié)程構(gòu)建器(launch coroutine builder)啟動(dòng)。
下面我們通過代碼實(shí)踐來學(xué)習(xí)協(xié)程的相關(guān)內(nèi)容。
9.1.1 搭建協(xié)程代碼工程
首先,我們來新建一個(gè)Kotlin Gradle工程。生成標(biāo)準(zhǔn)gradle工程后,在配置文件build.gradle中,配置kotlinx-coroutines-core依賴:
添加 dependencies :
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.16'
kotlinx-coroutines還提供了下面的模塊:
compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-jdk8', version: '0.16'
compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-nio', version: '0.16'
compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-reactive', version: '0.16'
我們使用Kotlin最新的1.1.3-2 版本:
buildscript {
ext.kotlin_version = '1.1.3-2'
...
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
}
}
其中,kotlin-gradle-plugin是Kotlin集成Gradle的插件。
另外,配置一下JCenter 的倉(cāng)庫(kù):
repositories {
jcenter()
}
9.1.2 簡(jiǎn)單協(xié)程示例
下面我們先來看一個(gè)簡(jiǎn)單的協(xié)程示例。
運(yùn)行下面的代碼:
fun firstCoroutineDemo0() {
launch(CommonPool) {
delay(3000L, TimeUnit.MILLISECONDS)
println("Hello,")
}
println("World!")
Thread.sleep(5000L)
}
你將會(huì)發(fā)現(xiàn)輸出:
World!
Hello,
上面的這段代碼:
launch(CommonPool) {
delay(3000L, TimeUnit.MILLISECONDS)
println("Hello,")
}
等價(jià)于:
launch(CommonPool, CoroutineStart.DEFAULT, {
delay(3000L, TimeUnit.MILLISECONDS)
println("Hello, ")
})
9.1.3 launch函數(shù)
這個(gè)launch函數(shù)定義在kotlinx.coroutines.experimental下面。
public fun launch(
context: CoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.initParentJob(context[Job])
start(block, coroutine, coroutine)
return coroutine
}
launch函數(shù)有3個(gè)入?yún)ⅲ篶ontext、start、block,這些函數(shù)參數(shù)分別說明如下:
| 參數(shù) | 說明 |
|---|---|
| context | 協(xié)程上下文 |
| start | 協(xié)程啟動(dòng)選項(xiàng) |
| block | 協(xié)程真正要執(zhí)行的代碼塊,必須是suspend修飾的掛起函數(shù) |
這個(gè)launch函數(shù)返回一個(gè)Job類型,Job是協(xié)程創(chuàng)建的后臺(tái)任務(wù)的概念,它持有該協(xié)程的引用。Job接口實(shí)際上繼承自CoroutineContext類型。一個(gè)Job有如下三種狀態(tài):
| State | isActive | isCompleted |
|---|---|---|
| New (optional initial state) 新建 (可選的初始狀態(tài)) | false |
false |
| Active (default initial state) 活動(dòng)中(默認(rèn)初始狀態(tài)) | true |
false |
| Completed (final state) 已結(jié)束(最終狀態(tài)) | false |
true |
也就是說,launch函數(shù)它以非阻塞(non-blocking)當(dāng)前線程的方式,啟動(dòng)一個(gè)新的協(xié)程后臺(tái)任務(wù),并返回一個(gè)Job類型的對(duì)象作為當(dāng)前協(xié)程的引用。
另外,這里的delay()函數(shù)類似Thread.sleep()的功能,但更好的是:它不會(huì)阻塞線程,而只是掛起協(xié)程本身。當(dāng)協(xié)程在等待時(shí),線程將返回到池中, 當(dāng)?shù)却瓿蓵r(shí), 協(xié)同將在池中的空閑線程上恢復(fù)。
9.1.4 CommonPool:共享線程池
我們?cè)賮砜匆幌?code>launch(CommonPool) {...}這段代碼。
首先,這個(gè)CommonPool是代表共享線程池,它的主要作用是來調(diào)度計(jì)算密集型任務(wù)的協(xié)程的執(zhí)行。它的實(shí)現(xiàn)使用的是java.util.concurrent包下面的API。它首先嘗試創(chuàng)建一個(gè)java.util.concurrent.ForkJoinPool (ForkJoinPool是一個(gè)可以執(zhí)行ForkJoinTask的ExcuteService,它采用了work-stealing模式:所有在池中的線程嘗試去執(zhí)行其他線程創(chuàng)建的子任務(wù),這樣很少有線程處于空閑狀態(tài),更加高效);如果不可用,就使用java.util.concurrent.Executors來創(chuàng)建一個(gè)普通的線程池:Executors.newFixedThreadPool。相關(guān)代碼在kotlinx/coroutines/experimental/CommonPool.kt中:
private fun createPool(): ExecutorService {
val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
?: return createPlainPool()
if (!usePrivatePool) {
Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
?.let { return it }
}
Try { fjpClass.getConstructor(Int::class.java).newInstance(defaultParallelism()) as? ExecutorService }
?. let { return it }
return createPlainPool()
}
private fun createPlainPool(): ExecutorService {
val threadId = AtomicInteger()
return Executors.newFixedThreadPool(defaultParallelism()) {
Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
}
}
這個(gè)CommonPool對(duì)象類是CoroutineContext的子類型。它們的類型集成層次結(jié)構(gòu)如下:

9.1.5 掛起函數(shù)
代碼塊中的delay(3000L, TimeUnit.MILLISECONDS)函數(shù),是一個(gè)用suspend關(guān)鍵字修飾的函數(shù),我們稱之為掛起函數(shù)。掛起函數(shù)只能從協(xié)程代碼內(nèi)部調(diào)用,普通的非協(xié)程的代碼不能調(diào)用。
掛起函數(shù)只允許由協(xié)程或者另外一個(gè)掛起函數(shù)里面調(diào)用, 例如我們?cè)趨f(xié)程代碼中調(diào)用一個(gè)掛起函數(shù),代碼示例如下:
suspend fun runCoroutineDemo() {
run(CommonPool) {
delay(3000L, TimeUnit.MILLISECONDS)
println("suspend,")
}
println("runCoroutineDemo!")
Thread.sleep(5000L)
}
fun callSuspendFun() {
launch(CommonPool) {
runCoroutineDemo()
}
}
如果我們用Java中的Thread類來寫類似功能的代碼,上面的代碼可以寫成這樣:
fun threadDemo0() {
Thread({
Thread.sleep(3000L)
println("Hello,")
}).start()
println("World!")
Thread.sleep(5000L)
}
輸出結(jié)果也是:
World!
Hello,
另外, 我們不能使用Thread來啟動(dòng)協(xié)程代碼。例如下面的寫法編譯器會(huì)報(bào)錯(cuò):
/**
* 錯(cuò)誤反例:用線程調(diào)用協(xié)程 error
*/
fun threadCoroutineDemo() {
Thread({
delay(3000L, TimeUnit.MILLISECONDS) // error, Suspend functions are only allowed to be called from a coroutine or another suspend function
println("Hello,")
})
println("World!")
Thread.sleep(5000L)
}
9.2 橋接 阻塞和非阻塞
上面的例子中,我們給出的是使用非阻塞的delay函數(shù),同時(shí)有使用了阻塞的Thread.sleep函數(shù),這樣代碼寫在一起可讀性不是那么地好。讓我們來使用純的Kotlin的協(xié)程代碼來實(shí)現(xiàn)上面的 阻塞+非阻塞 的例子(不用Thread)。
9.2.1 runBlocking函數(shù)
Kotlin中提供了runBlocking函數(shù)來實(shí)現(xiàn)類似主協(xié)程的功能:
fun main(args: Array<String>) = runBlocking<Unit> {
// 主協(xié)程
println("${format(Date())}: T0")
// 啟動(dòng)主協(xié)程
launch(CommonPool) {
//在common thread pool中創(chuàng)建協(xié)程
println("${format(Date())}: T1")
delay(3000L)
println("${format(Date())}: T2 Hello,")
}
println("${format(Date())}: T3 World!") // 當(dāng)子協(xié)程被delay,主協(xié)程仍然繼續(xù)運(yùn)行
delay(5000L)
println("${format(Date())}: T4")
}
運(yùn)行結(jié)果:
14:37:59.640: T0
14:37:59.721: T1
14:37:59.721: T3 World!
14:38:02.763: T2 Hello,
14:38:04.738: T4
可以發(fā)現(xiàn),運(yùn)行結(jié)果跟之前的是一樣的,但是我們沒有使用Thread.sleep,我們只使用了非阻塞的delay函數(shù)。如果main函數(shù)不加 = runBlocking<Unit> , 那么我們是不能在main函數(shù)體內(nèi)調(diào)用delay(5000L)的。
如果這個(gè)阻塞的線程被中斷,runBlocking拋出InterruptedException異常。
該runBlocking函數(shù)不是用來當(dāng)做普通協(xié)程函數(shù)使用的,它的設(shè)計(jì)主要是用來橋接普通阻塞代碼和掛起風(fēng)格的(suspending style)的非阻塞代碼的, 例如用在 main 函數(shù)中,或者用于測(cè)試用例代碼中。
@RunWith(JUnit4::class)
class RunBlockingTest {
@Test fun testRunBlocking() = runBlocking<Unit> {
// 這樣我們就可以在這里調(diào)用任何suspend fun了
launch(CommonPool) {
delay(3000L)
}
delay(5000L)
}
}
9.3 等待一個(gè)任務(wù)執(zhí)行完畢
我們先來看一段代碼:
fun firstCoroutineDemo() {
launch(CommonPool) {
delay(3000L, TimeUnit.MILLISECONDS)
println("[firstCoroutineDemo] Hello, 1")
}
launch(CommonPool, CoroutineStart.DEFAULT, {
delay(3000L, TimeUnit.MILLISECONDS)
println("[firstCoroutineDemo] Hello, 2")
})
println("[firstCoroutineDemo] World!")
}
運(yùn)行這段代碼,我們會(huì)發(fā)現(xiàn)只輸出:
[firstCoroutineDemo] World!
這是為什么?
為了弄清上面的代碼執(zhí)行的內(nèi)部過程,我們打印一些日志看下:
fun testJoinCoroutine() = runBlocking<Unit> {
// Start a coroutine
val c1 = launch(CommonPool) {
println("C1 Thread: ${Thread.currentThread()}")
println("C1 Start")
delay(3000L)
println("C1 World! 1")
}
val c2 = launch(CommonPool) {
println("C2 Thread: ${Thread.currentThread()}")
println("C2 Start")
delay(5000L)
println("C2 World! 2")
}
println("Main Thread: ${Thread.currentThread()}")
println("Hello,")
println("Hi,")
println("c1 is active: ${c1.isActive} ${c1.isCompleted}")
println("c2 is active: ${c2.isActive} ${c2.isCompleted}")
}
再次運(yùn)行:
C1 Thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
C1 Start
C2 Thread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
C2 Start
Main Thread: Thread[main,5,main]
Hello,
Hi,
c1 is active: true false
c2 is active: true false
我們可以看到,這里的C1、C2代碼也開始執(zhí)行了,使用的是ForkJoinPool.commonPool-worker線程池中的worker線程。但是,我們?cè)诖a執(zhí)行到最后打印出這兩個(gè)協(xié)程的狀態(tài)isCompleted都是false,這表明我們的C1、C2的代碼,在Main Thread結(jié)束的時(shí)刻(此時(shí)的運(yùn)行main函數(shù)的Java進(jìn)程也退出了),還沒有執(zhí)行完畢,然后就跟著主線程一起退出結(jié)束了。
所以我們可以得出結(jié)論:運(yùn)行 main () 函數(shù)的主線程, 必須要等到我們的協(xié)程完成之前結(jié)束 , 否則我們的程序在 打印Hello, 1和Hello, 2之前就直接結(jié)束掉了。
我們?cè)鯓幼屵@兩個(gè)協(xié)程參與到主線程的時(shí)間順序里呢?我們可以使用join, 讓主線程一直等到當(dāng)前協(xié)程執(zhí)行完畢再結(jié)束, 例如下面的這段代碼
fun testJoinCoroutine() = runBlocking<Unit> {
// Start a coroutine
val c1 = launch(CommonPool) {
println("C1 Thread: ${Thread.currentThread()}")
println("C1 Start")
delay(3000L)
println("C1 World! 1")
}
val c2 = launch(CommonPool) {
println("C2 Thread: ${Thread.currentThread()}")
println("C2 Start")
delay(5000L)
println("C2 World! 2")
}
println("Main Thread: ${Thread.currentThread()}")
println("Hello,")
println("c1 is active: ${c1.isActive} isCompleted: ${c1.isCompleted}")
println("c2 is active: ${c2.isActive} isCompleted: ${c2.isCompleted}")
c1.join() // the main thread will wait until child coroutine completes
println("Hi,")
println("c1 is active: ${c1.isActive} isCompleted: ${c1.isCompleted}")
println("c2 is active: ${c2.isActive} isCompleted: ${c2.isCompleted}")
c2.join() // the main thread will wait until child coroutine completes
println("c1 is active: ${c1.isActive} isCompleted: ${c1.isCompleted}")
println("c2 is active: ${c2.isActive} isCompleted: ${c2.isCompleted}")
}
將會(huì)輸出:
C1 Thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
C1 Start
C2 Thread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
C2 Start
Main Thread: Thread[main,5,main]
Hello,
c1 is active: true isCompleted: false
c2 is active: true isCompleted: false
C1 World! 1
Hi,
c1 is active: false isCompleted: true
c2 is active: true isCompleted: false
C2 World! 2
c1 is active: false isCompleted: true
c2 is active: false isCompleted: true
通常,良好的代碼風(fēng)格我們會(huì)把一個(gè)單獨(dú)的邏輯放到一個(gè)獨(dú)立的函數(shù)中,我們可以重構(gòu)上面的代碼如下:
fun testJoinCoroutine2() = runBlocking<Unit> {
// Start a coroutine
val c1 = launch(CommonPool) {
fc1()
}
val c2 = launch(CommonPool) {
fc2()
}
...
}
private suspend fun fc2() {
println("C2 Thread: ${Thread.currentThread()}")
println("C2 Start")
delay(5000L)
println("C2 World! 2")
}
private suspend fun fc1() {
println("C1 Thread: ${Thread.currentThread()}")
println("C1 Start")
delay(3000L)
println("C1 World! 1")
}
可以看出,我們這里的fc1, fc2函數(shù)是suspend fun。
9.4 協(xié)程是輕量級(jí)的
直接運(yùn)行下面的代碼:
fun testThread() {
val jobs = List(100_1000) {
Thread({
Thread.sleep(1000L)
print(".")
})
}
jobs.forEach { it.start() }
jobs.forEach { it.join() }
}
我們應(yīng)該會(huì)看到輸出報(bào)錯(cuò):
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at com.easy.kotlin.LightWeightCoroutinesDemo.testThread(LightWeightCoroutinesDemo.kt:30)
at com.easy.kotlin.LightWeightCoroutinesDemoKt.main(LightWeightCoroutinesDemo.kt:40)
...........................................................................................
我們這里直接啟動(dòng)了100,000個(gè)線程,并join到一起打印".", 不出意外的我們收到了java.lang.OutOfMemoryError。
這個(gè)異常問題本質(zhì)原因是我們創(chuàng)建了太多的線程,而能創(chuàng)建的線程數(shù)是有限制的,導(dǎo)致了異常的發(fā)生。在Java中, 當(dāng)我們創(chuàng)建一個(gè)線程的時(shí)候,虛擬機(jī)會(huì)在JVM內(nèi)存創(chuàng)建一個(gè)Thread對(duì)象同時(shí)創(chuàng)建一個(gè)操作系統(tǒng)線程,而這個(gè)系統(tǒng)線程的內(nèi)存用的不是JVMMemory,而是系統(tǒng)中剩下的內(nèi)存(MaxProcessMemory - JVMMemory - ReservedOsMemory)。 能創(chuàng)建的線程數(shù)的具體計(jì)算公式如下:
Number of Threads = (MaxProcessMemory - JVMMemory - ReservedOsMemory) / (ThreadStackSize)
其中,參數(shù)說明如下:
| 參數(shù) | 說明 |
|---|---|
| MaxProcessMemory | 指的是一個(gè)進(jìn)程的最大內(nèi)存 |
| JVMMemory | JVM內(nèi)存 |
| ReservedOsMemory | 保留的操作系統(tǒng)內(nèi)存 |
| ThreadStackSize | 線程棧的大小 |
我們通常在優(yōu)化這種問題的時(shí)候,要么是采用減小thread stack的大小的方法,要么是采用減小heap或permgen初始分配的大小方法等方式來臨時(shí)解決問題。
在協(xié)程中,情況完全就不一樣了。我們看一下實(shí)現(xiàn)上面的邏輯的協(xié)程代碼:
fun testLightWeightCoroutine() = runBlocking {
val jobs = List(100_000) {
// create a lot of coroutines and list their jobs
launch(CommonPool) {
delay(1000L)
print(".")
}
}
jobs.forEach { it.join() } // wait for all jobs to complete
}
運(yùn)行上面的代碼,我們將看到輸出:
START: 21:22:28.913
.....................
.....................(100000個(gè))
.....END: 21:22:30.956
上面的程序在2s左右的時(shí)間內(nèi)正確執(zhí)行完畢。
9.5 協(xié)程 vs 守護(hù)線程
在Java中有兩類線程:用戶線程 (User Thread)、守護(hù)線程 (Daemon Thread)。
所謂守護(hù) 線程,是指在程序運(yùn)行的時(shí)候在后臺(tái)提供一種通用服務(wù)的線程,比如垃圾回收線程就是一個(gè)很稱職的守護(hù)者,并且這種線程并不屬于程序中不可或缺的部分。因此,當(dāng)所有的非守護(hù)線程結(jié)束時(shí),程序也就終止了,同時(shí)會(huì)殺死進(jìn)程中的所有守護(hù)線程。
我們來看一段Thread的守護(hù)線程的代碼:
fun testDaemon2() {
val t = Thread({
repeat(100) { i ->
println("I'm sleeping $i ...")
Thread.sleep(500L)
}
})
t.isDaemon = true // 必須在啟動(dòng)線程前調(diào)用,否則會(huì)報(bào)錯(cuò):Exception in thread "main" java.lang.IllegalThreadStateException
t.start()
Thread.sleep(2000L) // just quit after delay
}
這段代碼啟動(dòng)一個(gè)線程,并設(shè)置為守護(hù)線程。線程內(nèi)部是間隔500ms 重復(fù)打印100次輸出。外部主線程睡眠2s。
運(yùn)行這段代碼,將會(huì)輸出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
I'm sleeping 3 ...
協(xié)程跟守護(hù)線程很像,用協(xié)程來寫上面的邏輯,代碼如下:
fun testDaemon1() = runBlocking {
launch(CommonPool) {
repeat(100) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(2000L) // just quit after delay
}
運(yùn)行這段代碼,我們發(fā)現(xiàn)也輸出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
I'm sleeping 3 ...
我們可以看出,活動(dòng)的協(xié)程不會(huì)使進(jìn)程保持活動(dòng)狀態(tài)。它們的行為就像守護(hù)程序線程。
9.6 協(xié)程執(zhí)行的取消
我們知道,啟動(dòng)函數(shù)launch返回一個(gè)Job引用當(dāng)前協(xié)程,該Job引用可用于取消正在運(yùn)行協(xié)程:
fun testCancellation() = runBlocking<Unit> {
val job = launch(CommonPool) {
repeat(1000) { i ->
println("I'm sleeping $i ... CurrentThread: ${Thread.currentThread()}")
delay(500L)
}
}
delay(1300L)
println("CurrentThread: ${Thread.currentThread()}")
println("Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
val b1 = job.cancel() // cancels the job
println("job cancel: $b1")
delay(1300L)
println("Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
val b2 = job.cancel() // cancels the job, job already canceld, return false
println("job cancel: $b2")
println("main: Now I can quit.")
}
運(yùn)行上面的代碼,將會(huì)輸出:
I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 2 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
CurrentThread: Thread[main,5,main]
Job is alive: true Job is completed: false
job cancel: true
Job is alive: false Job is completed: true
job cancel: false
main: Now I can quit.
我們可以看出,當(dāng)job還在運(yùn)行時(shí),isAlive是true,isCompleted是false。當(dāng)調(diào)用job.cancel取消該協(xié)程任務(wù),cancel函數(shù)本身返回true, 此時(shí)協(xié)程的打印動(dòng)作就停止了。此時(shí),job的狀態(tài)是isAlive是false,isCompleted是true。 如果,再次調(diào)用job.cancel函數(shù),我們將會(huì)看到cancel函數(shù)返回的是false。
9.6.1 計(jì)算代碼的協(xié)程取消失效
kotlinx 協(xié)程的所有suspend函數(shù)都是可以取消的。我們可以通過job的isActive狀態(tài)來判斷協(xié)程的狀態(tài),或者檢查手否有拋出 CancellationException 時(shí)取消。
例如,協(xié)程正工作在循環(huán)計(jì)算中,并且不檢查協(xié)程當(dāng)前的狀態(tài), 那么調(diào)用cancel來取消協(xié)程將無法停止協(xié)程的運(yùn)行, 如下面的示例所示:
fun testCooperativeCancellation1() = runBlocking<Unit> {
val job = launch(CommonPool) {
var nextPrintTime = 0L
var i = 0
while (i < 20) { // computation loop
val currentTime = System.currentTimeMillis()
if (currentTime >= nextPrintTime) {
println("I'm sleeping ${i++} ... CurrentThread: ${Thread.currentThread()}")
nextPrintTime = currentTime + 500L
}
}
}
delay(3000L)
println("CurrentThread: ${Thread.currentThread()}")
println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
val b1 = job.cancel() // cancels the job
println("job cancel1: $b1")
println("After Cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
delay(30000L)
val b2 = job.cancel() // cancels the job, job already canceld, return false
println("job cancel2: $b2")
println("main: Now I can quit.")
}
運(yùn)行上面的代碼,輸出:
I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
...
I'm sleeping 6 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
CurrentThread: Thread[main,5,main]
Before cancel, Job is alive: true Job is completed: false
job cancel1: true
After Cancel, Job is alive: false Job is completed: true
I'm sleeping 7 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
...
I'm sleeping 18 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 19 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
job cancel2: false
main: Now I can quit.
我們可以看出,即使我們調(diào)用了cancel函數(shù),當(dāng)前的job狀態(tài)isAlive是false了,但是協(xié)程的代碼依然一直在運(yùn)行,并沒有停止。
9.6.2 計(jì)算代碼協(xié)程的有效取消
有兩種方法可以使計(jì)算代碼取消成功。
方法一: 顯式檢查取消狀態(tài)isActive
我們直接給出實(shí)現(xiàn)的代碼:
fun testCooperativeCancellation2() = runBlocking<Unit> {
val job = launch(CommonPool) {
var nextPrintTime = 0L
var i = 0
while (i < 20) { // computation loop
if (!isActive) {
return@launch
}
val currentTime = System.currentTimeMillis()
if (currentTime >= nextPrintTime) {
println("I'm sleeping ${i++} ... CurrentThread: ${Thread.currentThread()}")
nextPrintTime = currentTime + 500L
}
}
}
delay(3000L)
println("CurrentThread: ${Thread.currentThread()}")
println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
val b1 = job.cancel() // cancels the job
println("job cancel1: $b1")
println("After Cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
delay(3000L)
val b2 = job.cancel() // cancels the job, job already canceld, return false
println("job cancel2: $b2")
println("main: Now I can quit.")
}
運(yùn)行這段代碼,輸出:
I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 2 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 3 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 4 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 5 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 6 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
CurrentThread: Thread[main,5,main]
Before cancel, Job is alive: true Job is completed: false
job cancel1: true
After Cancel, Job is alive: false Job is completed: true
job cancel2: false
main: Now I can quit.
正如您所看到的, 現(xiàn)在這個(gè)循環(huán)可以被取消了。這里的isActive屬性是CoroutineScope中的屬性。這個(gè)接口的定義是:
public interface CoroutineScope {
public val isActive: Boolean
public val context: CoroutineContext
}
該接口用于通用協(xié)程構(gòu)建器的接收器,以便協(xié)程中的代碼可以方便的訪問其isActive狀態(tài)值(取消狀態(tài)),以及其上下文CoroutineContext信息。
方法二: 循環(huán)調(diào)用一個(gè)掛起函數(shù)yield()
該方法實(shí)質(zhì)上是通過job的isCompleted狀態(tài)值來捕獲CancellationException完成取消功能。
我們只需要在while循環(huán)體中循環(huán)調(diào)用yield()來檢查該job的取消狀態(tài),如果已經(jīng)被取消,那么isCompleted值將會(huì)是true,yield函數(shù)就直接拋出CancellationException異常,從而完成取消的功能:
val job = launch(CommonPool) {
var nextPrintTime = 0L
var i = 0
while (i < 20) { // computation loop
yield()
val currentTime = System.currentTimeMillis()
if (currentTime >= nextPrintTime) {
println("I'm sleeping ${i++} ... CurrentThread: ${Thread.currentThread()}")
nextPrintTime = currentTime + 500L
}
}
}
運(yùn)行上面的代碼,輸出:
I'm sleeping 0 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
I'm sleeping 1 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
I'm sleeping 2 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
I'm sleeping 3 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-3,5,main]
I'm sleeping 4 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-3,5,main]
I'm sleeping 5 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-3,5,main]
I'm sleeping 6 ... CurrentThread: Thread[ForkJoinPool.commonPool-worker-2,5,main]
CurrentThread: Thread[main,5,main]
Before cancel, Job is alive: true Job is completed: false
job cancel1: true
After Cancel, Job is alive: false Job is completed: true
job cancel2: false
main: Now I can quit.
如果我們想看看yield函數(shù)拋出的異常,我們可以加上try catch打印出日志:
try {
yield()
} catch (e: Exception) {
println("$i ${e.message}")
}
我們可以看到類似:Job was cancelled 這樣的信息。
這個(gè)yield函數(shù)的實(shí)現(xiàn)是:
suspend fun yield(): Unit = suspendCoroutineOrReturn sc@ { cont ->
val context = cont.context
val job = context[Job]
if (job != null && job.isCompleted) throw job.getCompletionException()
if (cont !is DispatchedContinuation<Unit>) return@sc Unit
if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
cont.dispatchYield(job, Unit)
COROUTINE_SUSPENDED
}
如果調(diào)用此掛起函數(shù)時(shí),當(dāng)前協(xié)程的Job已經(jīng)完成 (isActive = false, isCompleted = true),當(dāng)前協(xié)程將以CancellationException取消。
9.6.3 在finally中的協(xié)程代碼
當(dāng)我們?nèi)∠粋€(gè)協(xié)程任務(wù)時(shí),如果有try {...} finally {...}代碼塊,那么finally {...}中的代碼會(huì)被正常執(zhí)行完畢:
fun finallyCancelDemo() = runBlocking {
val job = launch(CommonPool) {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
println("I'm running finally")
}
}
delay(2000L)
println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
job.cancel()
println("After cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
delay(2000L)
println("main: Now I can quit.")
}
運(yùn)行這段代碼,輸出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
I'm sleeping 3 ...
Before cancel, Job is alive: true Job is completed: false
I'm running finally
After cancel, Job is alive: false Job is completed: true
main: Now I can quit.
我們可以看出,在調(diào)用cancel之后,就算當(dāng)前協(xié)程任務(wù)Job已經(jīng)結(jié)束了,finally{...}中的代碼依然被正常執(zhí)行。
但是,如果我們?cè)?code>finally{...}中放入掛起函數(shù):
fun finallyCancelDemo() = runBlocking {
val job = launch(CommonPool) {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
println("I'm running finally")
delay(1000L)
println("And I've delayed for 1 sec ?")
}
}
delay(2000L)
println("Before cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
job.cancel()
println("After cancel, Job is alive: ${job.isActive} Job is completed: ${job.isCompleted}")
delay(2000L)
println("main: Now I can quit.")
}
運(yùn)行上述代碼,我們將會(huì)發(fā)現(xiàn)只輸出了一句:I'm running finally。因?yàn)橹骶€程在掛起函數(shù)delay(1000L)以及后面的打印邏輯還沒執(zhí)行完,就已經(jīng)結(jié)束退出。
} finally {
println("I'm running finally")
delay(1000L)
println("And I've delayed for 1 sec ?")
}
9.6.4 協(xié)程執(zhí)行不可取消的代碼塊
如果我們想要上面的例子中的finally{...}完整執(zhí)行,不被取消函數(shù)操作所影響,我們可以使用 run 函數(shù)和 NonCancellable 上下文將相應(yīng)的代碼包裝在 run (NonCancellable) {...} 中, 如下面的示例所示:
fun testNonCancellable() = runBlocking {
val job = launch(CommonPool) {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
run(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(2000L)
println("main: I'm tired of waiting!")
job.cancel()
delay(2000L)
println("main: Now I can quit.")
}
運(yùn)行輸出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
I'm sleeping 3 ...
main: I'm tired of waiting!
I'm running finally
And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.
9.7 設(shè)置協(xié)程超時(shí)時(shí)間
我們通常取消協(xié)同執(zhí)行的原因給協(xié)程的執(zhí)行時(shí)間設(shè)定一個(gè)執(zhí)行時(shí)間上限。我們也可以使用 withTimeout 函數(shù)來給一個(gè)協(xié)程任務(wù)的執(zhí)行設(shè)定最大執(zhí)行時(shí)間,超出這個(gè)時(shí)間,就直接終止掉。代碼示例如下:
fun testTimeouts() = runBlocking {
withTimeout(3000L) {
repeat(100) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}
運(yùn)行上述代碼,我們將會(huì)看到如下輸出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
I'm sleeping 3 ...
I'm sleeping 4 ...
I'm sleeping 5 ...
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutException: Timed out waiting for 3000 MILLISECONDS
at kotlinx.coroutines.experimental.TimeoutExceptionCoroutine.run(Scheduled.kt:110)
at kotlinx.coroutines.experimental.EventLoopImpl$DelayedRunnableTask.invoke(EventLoop.kt:199)
at kotlinx.coroutines.experimental.EventLoopImpl$DelayedRunnableTask.invoke(EventLoop.kt:195)
at kotlinx.coroutines.experimental.EventLoopImpl.processNextEvent(EventLoop.kt:111)
at kotlinx.coroutines.experimental.BlockingCoroutine.joinBlocking(Builders.kt:205)
at kotlinx.coroutines.experimental.BuildersKt.runBlocking(Builders.kt:150)
at kotlinx.coroutines.experimental.BuildersKt.runBlocking$default(Builders.kt:142)
at com.easy.kotlin.CancellingCoroutineDemo.testTimeouts(CancellingCoroutineDemo.kt:169)
at com.easy.kotlin.CancellingCoroutineDemoKt.main(CancellingCoroutineDemo.kt:193)
由 withTimeout 拋出的 TimeoutException 是 CancellationException 的一個(gè)子類。這個(gè)TimeoutException類型定義如下:
private class TimeoutException(
time: Long,
unit: TimeUnit,
@JvmField val coroutine: Job
) : CancellationException("Timed out waiting for $time $unit")
如果您需要在超時(shí)時(shí)執(zhí)行一些附加操作, 則可以把邏輯放在 try {...} catch (e: CancellationException) {...} 代碼塊中。例如:
try {
ccd.testTimeouts()
} catch (e: CancellationException) {
println("I am timed out!")
}
Kotlin 開發(fā)者社區(qū)
國(guó)內(nèi)第一Kotlin 開發(fā)者社區(qū)公眾號(hào),主要分享、交流 Kotlin 編程語(yǔ)言、Spring Boot、Android、React.js/Node.js、函數(shù)式編程、編程思想等相關(guān)主題。