Kotlin 語言中的協(xié)程 Coroutine 極大地幫助了開發(fā)者更加容易地處理異步編程。就 JVM 的角度而言,協(xié)程一定程度上減少了 “回調(diào)地獄” 的問題,切實地改進了異步處理的編碼方式。Coroutine 中封裝的諸多高效 API,可以確保開發(fā)者花費更小的精力去完成并發(fā)任務。今天就來對比一下 Coroutine 中的 delay() 和 Java 語言中的 sleep()。
delay()
如果使用過協(xié)程,對于 delay() 必然不陌生,先來看一下官方描述:
Delays coroutine for a given time without blocking a thread and resumes it after a specified time. If the given timeMillis is non-positive, this function returns immediately.
delay() 用來延遲協(xié)程一段時間,但不阻塞線程,并且能在指定的時間后恢復協(xié)程的執(zhí)行。
用法也很簡單,delay() 是 suspend 函數(shù),直接在 CoroutineScope 里調(diào)用即可:
lifecycleScope.launch {
Log.d(TAG, "1")
delay(1000)
Log.d(TAG, "2")
}
lifecycleScope.launch {
Log.d(TAG, "3")
}
上述代碼創(chuàng)建了兩個協(xié)程,且在第一個協(xié)程中使用了 delay(),但是這并不影響第二個協(xié)程。因此日志輸出結(jié)果為:1,3,2,其中1和2兩個日志輸出時間間隔1秒。
總結(jié)一下關(guān)于 delay() 的特點:
- 用于延遲當前協(xié)程
- 不會阻塞當前運行的線程
- 允許其他協(xié)程在同線程運行
- 當延遲的時間到了,協(xié)程會被恢復并繼續(xù)執(zhí)行
sleep()
sleep() 是 Java 語言中標準的多線程處理 API:促使當前執(zhí)行的線程進入休眠,并持續(xù)指定的一段時間。該方法一般用來告知 CPU 讓出處理時間給 App 的其他線程或者其他 App 的線程。
如果在協(xié)程里使用該函數(shù),它會導致當前運行的線程被阻塞,同時也會導致該線程的其他協(xié)程被阻塞,直到指定的阻塞時間完成。
對比 delay() 和 sleep()
假使在單線程里執(zhí)行并發(fā)任務。
下面的代碼分別啟動兩個協(xié)程,并各自調(diào)用了 1000ms 的 delay() 或 sleep()。
lifecycleScope.launch {
val totalTime = measureTimeMillis {
supervisorScope {
launch {
Log.d(TAG, "1")
delay(1000)
// Thread.sleep(1000)
Log.d(TAG, "2")
}
launch {
Log.d(TAG, "3")
delay(1000)
// Thread.sleep(1000)
Log.d(TAG, "4")
}
}
}
Log.d(TAG, "totalTime:$totalTime")
}
當調(diào)用 delay() 時,兩個協(xié)程在同一時間執(zhí)行,先輸出日志1和3,經(jīng)過1秒后,再輸出日志2和4,兩個協(xié)程一共花了 1144 ms。
當調(diào)用 sleep() 時,先執(zhí)行第一個協(xié)程輸出日志1,經(jīng)過1秒后,輸入日志2,同時執(zhí)行第二個協(xié)程,輸出日志3,再經(jīng)過1秒后,輸入之日4,兩個協(xié)程一共花了 2152 ms。
這也印證了上面提到的特性差異:delay() 只是掛起當前協(xié)程、同時允許其他協(xié)程運行該線程,而 sleep() 則在一段時間內(nèi)直接阻塞了整個線程。
再來看一下 delay() 的其他特點
下面先定義了一個最大創(chuàng)建 2 個線程的線程池 context 示例,然后創(chuàng)建兩個協(xié)程,并在第一個協(xié)程中使用了 delay(),日志輸出當前線程名字:
val duetContext = newFixedThreadPoolContext(2, "Duet")
runBlocking(duetContext) {
launch {
Log.d(TAG, "1-${Thread.currentThread().name}")
delay(1000)
Log.d(TAG, "2-${Thread.currentThread().name}")
}
launch {
Log.d(TAG, "3-${Thread.currentThread().name}")
}
}
我們已經(jīng)知道日志輸出結(jié)果為:1,3,2,其中1和2兩個日志輸出時間間隔1秒。每一個日志輸出的當前線程名字是什么呢?
1-Duet-2
3-Duet-1
2-Duet-1
一開始第一個協(xié)程在 delay 函數(shù)執(zhí)行前是運行在Duet-2線程的,但當 delay 完成后,它卻恢復到了另一個線程:Duet-1。這就是 delay() 的另一個特點:協(xié)程可以掛起一個 thread 并且恢復到另一個 thread!
delay() 原理
delay() 會先在協(xié)程上下文里找到 Delay 的實現(xiàn),接著執(zhí)行具體的延時處理。
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}
Delay 是 interface 類型,其定義了延時之后調(diào)度協(xié)程的方法 scheduleResumeAfterDelay() 等。開發(fā)者直接調(diào)用的 delay()、withTimeout() 正是 Delay 接口提供的支持。
public interface Delay {
public fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>)
public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
DefaultDelay.invokeOnTimeout(timeMillis, block, context)
}
事實上,Delay 接口由運行協(xié)程的各 CoroutineDispatcher 實現(xiàn)。
CoroutineDispatcher 是抽象類,Dispatchers 類會利用線程相關(guān) API 來實現(xiàn)它。比如:
-
Dispatchers.Default、Dispatchers.IO使用 java.util.concurrent 包下的 Executor API 來實現(xiàn)。 -
Dispatchers.Main使用 Android 平臺上特有的 Handler API 來實現(xiàn)。
各 Dispatcher 需要實現(xiàn) Delay 接口,主要就是實現(xiàn) scheduleResumeAfterDelay() ,去返回指定毫秒時間之后執(zhí)行協(xié)程的 Continuation 實例。
以下是 ExecutorCoroutineDispatcherImpl 類實現(xiàn)該方法的具體代碼:
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
ResumeUndispatchedRunnable(this, continuation),
continuation.context,
timeMillis
)
// Other implementation
}
private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
return try {
schedule(block, timeMillis, TimeUnit.MILLISECONDS)
} catch (e: RejectedExecutionException) {
cancelJobOnRejection(context, e)
null
}
}
可以看到借助了 Java 包 ScheduledExecutorService 的 schedule() 來調(diào)度了 Continuation 的恢復。
Dispatchers.Main 使用 HandlerDispatcher,看一下 HandlerDispatcher 又是如何實現(xiàn) scheduleResumeAfterDelay 方法的,具體實現(xiàn)在 HandlerContext 里:
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
} else {
cancelOnRejection(continuation.context, block)
}
}
可以看到直截了當?shù)厥褂昧?Handler 的 postDelayed() post 了 Continuation 恢復的 Runnable 對象。這也解釋了 delay() 沒有阻塞線程的原因。
所以假使在 Android 主線程的協(xié)程里執(zhí)行了 delay() 邏輯,其效果等同于調(diào)用了 Handler 的 postDelayed。
這種實現(xiàn)非常有趣:在 Android 平臺上調(diào)用 delay(),實際上相當于通過 Handler post 一個 delayed runnable;而在 JVM 平臺上則是利用 Executor API 這種類似的思路。