Kotlin 的鎖和多線程同步

Synchronized.kt 的源碼:

/**
 * Executes the given function [block] while holding the monitor of the given object [lock].
 */
@kotlin.internal.InlineOnly
public inline fun <R> synchronized(lock: Any, block: () -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }

    @Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")
    monitorEnter(lock)
    try {
        return block()
    }
    finally {
        @Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")
        monitorExit(lock)
    }
}

JvmFlagAnnotations.kt 的源碼:


/**
 * Marks the JVM backing field of the annotated property as `volatile`, meaning that writes to this field
 * are immediately made visible to other threads.
 */
@Target(FIELD)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Volatile

/**
 * Marks the JVM backing field of the annotated property as `transient`, meaning that it is not
 * part of the default serialized form of the object.
 */
@Target(FIELD)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Transient

/**
 * Marks the JVM method generated from the annotated function as `strictfp`, meaning that the precision
 * of floating point operations performed inside the method needs to be restricted in order to
 * achieve better portability.
 */
@Target(FUNCTION, CONSTRUCTOR, PROPERTY_GETTER, PROPERTY_SETTER, CLASS)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Strictfp

/**
 * Marks the JVM method generated from the annotated function as `synchronized`, meaning that the method
 * will be protected from concurrent execution by multiple threads by the monitor of the instance (or,
 * for static methods, the class) on which the method is defined.
 */
@Target(FUNCTION, PROPERTY_GETTER, PROPERTY_SETTER)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Synchronized

如何使用 Synchronized 同步鎖:
在Java中,給一個(gè)方法加鎖 ,需要給方法加 synchronized 關(guān)鍵字

public synchronized void doSomething() {

}

kotlin 中沒有 synchronized 關(guān)鍵之,取而代之的是 @Synchronized 注解

class MyUtil {
    @Synchronized
    fun doSomething() {

    }
}

Kotlin 在方法內(nèi),可以使用 block 塊

class Util {

    val lock = Any()

    fun main() {
        synchronized(lock) {

        }
    }
}

Volatile 關(guān)鍵字

有時(shí)僅僅為了讀寫一個(gè)或者兩個(gè)實(shí)例域就使用同步的話,顯得開銷過(guò)大;而Volatile關(guān)鍵字為實(shí)例域的同步訪問(wèn)提供了免鎖的機(jī)制。

當(dāng)一個(gè)共享變量被vlatile修飾時(shí),其就具備兩個(gè)含義:
1、一個(gè)線程修改了變量的值,變量的新值對(duì)其他線程是立即可見的。
2、禁止使用指令重排序。
什么是指令重排序呢?
重排序通常是編譯器或者運(yùn)行環(huán)境為了優(yōu)化程序性能而采取的對(duì)指令重新排序執(zhí)行的一種手段。重排序分為兩類:編譯期重排序和運(yùn)行期重排序,分別對(duì)應(yīng)著編譯時(shí)和運(yùn)行時(shí)環(huán)境。

在 kotlin 中沒有 volatile 關(guān)鍵字,但是有 @Volatile 注解

class Util {

    @Volatile
    var lock = Any()
}

在 kotlin 中的 Any 和 java 中的 Object 相似,每一個(gè)類都是從 Any 繼承過(guò)來(lái)的,但是 Any 并沒有聲明 wait() , notify() 和 notifyAll() 方法,這就意味著,你不能在kotlin類中調(diào)用這些方法。但是你仍然能夠使用java.lang.Object的實(shí)例作為lock,并且調(diào)用相關(guān)的方法。下面將會(huì)展示一個(gè)使用 Object 做為 lock 解決生產(chǎn)者和消費(fèi)者的問(wèn)題。

    private val lock = Object()

    fun produce() = synchronized(lock) {
          while(items>=maxItems) {
            lock.wait()
          }
          Thread.sleep(Random.nextInt(100).toLong())
          items++
          println("Produced, count is$items:${Thread.currentThread()}")
          lock.notifyAll()
    }

    fun consume() = synchronized(lock) {
        while(items<=0) {
            lock.wait()
        }
        Thread.sleep(Random.nextInt(100).toLong())
        items--
        println("Consumed, count is$items:${Thread.currentThread()}")
        lock.notifyAll()
    }
  • volatile變量具有可見性
  • volatile不保證原子性
  • volatile保證有序性
    volatile能禁止指令的重排序,因此volatile保證有序性。
    volatile禁止指令重排序的含義:當(dāng)程序執(zhí)行到volatile變量的時(shí)候,在其前面的語(yǔ)句已經(jīng)全部執(zhí)行完成,并且結(jié)果對(duì)后面可見,而且該變量后面的語(yǔ)句都沒有執(zhí)行。

正確使用volatile

synchronized可以防止多個(gè)線程同時(shí)執(zhí)行一段代碼,這會(huì)阻塞一部分線程的執(zhí)行,這樣就會(huì)影響程序的執(zhí)行效率。
而volatile在某些情況下的性能是優(yōu)于synchronized的。
但是volatile無(wú)法替代synchronized,因?yàn)関olatile無(wú)法保證操作的原子性。


通常情況下,我們使用volatile關(guān)鍵字要避開兩種場(chǎng)景:
1、對(duì)變量的寫操作依賴當(dāng)前值。
2、該變量包含在具有其他變量的不變式中。

使用volatile的場(chǎng)景有很多,這里介紹兩種常見的場(chǎng)景:

場(chǎng)景1:狀態(tài)標(biāo)志
當(dāng)多線程執(zhí)行該類的時(shí)候,我們需要對(duì)狀態(tài)標(biāo)志stop保持可見性,這樣我們的運(yùn)行才能實(shí)時(shí)保持正確的執(zhí)行。這種情況如果使用sychronized的話顯然要復(fù)雜的多。

class TestVolatile1 {

    @Volatile
    var stop = false;

    fun onInit(){
        if(stop){

        }
    }

    fun onStop(){
        stop = true;
    }

}

場(chǎng)景2: 雙重檢查模式(DCL)

在 Java 中,我們的單例模式經(jīng)常會(huì)這樣寫,第一次判空是為了不必要的同步操作,第二次判斷是只有在MyLock實(shí)例==null的時(shí)候才會(huì)去new一個(gè)實(shí)例出來(lái),當(dāng)多線程調(diào)用時(shí),當(dāng)進(jìn)行這兩次判空時(shí),我們需要保證instance的可見性。

public class MyLock {
    private static volatile MyLock instance = null;

    public static MyLock getInstance() {
        if(instance == null){
            synchronized (MyLock.class){
                if(instance == null){
                    instance = new MyLock();

                }
            }
        }
        return instance;
    }

}

在 Kotlin 中實(shí)現(xiàn)多線程同步的方式

“ 現(xiàn)有 Task1、Task2 等多個(gè)并行任務(wù),如何等待全部執(zhí)行完成后,執(zhí)行 Task3?!?br> 在 Kotlin 中我們有多種實(shí)現(xiàn)方式:

  1. Thread.join
  2. Synchronized
  3. ReentrantLock
  4. BlockingQueue
  5. CountDownLatch
  6. CyclicBarrier
  7. CAS
  8. Future
  9. CompletableFuture
  10. Rxjava
  11. Coroutine
  12. Flow

我們先定義三個(gè)Task,模擬上述場(chǎng)景, Task3 基于 Task1、Task2 返回的結(jié)果拼接字符串,每個(gè) Task 通過(guò) sleep 模擬耗時(shí):

val task1: () -> String = {
    sleep(2000)
    "Hello".also { println("task1 finished: $it") }
}

val task2: () -> String = {
    sleep(2000)
    "World".also { println("task2 finished: $it") }
}

val task3: (String, String) -> String = { p1, p2 ->
    sleep(2000)
    "$p1 $p2".also { println("task3 finished: $it") }
}

1. Thread.join()

Kotlin 兼容 Java,Java 的所有線程工具默認(rèn)都可以使用。其中最簡(jiǎn)單的線程同步方式就是使用 Thread 的 join() :

@Test
fun test_join() {
    lateinit var s1: String
    lateinit var s2: String

    val t1 = Thread { s1 = task1() }
    val t2 = Thread { s2 = task2() }
    t1.start()
    t2.start()

    t1.join()
    t2.join()
    
    task3(s1, s2)
}

2. Synchronized

使用 synchronized 鎖進(jìn)行同步

    @Test
    fun test_synchrnoized() {
        lateinit var s1: String

        Thread {
            synchronized(Unit) {
                s1 = task1()
            }
        }.start()
        val s2: String = task2()

        synchronized(Unit) {
            task3(s1, s2)
        }

    }

但是如果超過(guò)三個(gè)任務(wù),使用 synchrnoized 這種寫法就比較別扭了,為了同步多個(gè)并行任務(wù)的結(jié)果需要聲明n個(gè)鎖,并嵌套n個(gè) synchronized。

3. ReentrantLock

ReentrantLock 是 JUC 提供的線程鎖,可以替換 synchronized 的使用

    @Test
    fun test_ReentrantLock() {

        lateinit var s1: String

        val lock = ReentrantLock()
        Thread {
            lock.lock()
            s1 = task1()
            lock.unlock()
        }.start()
        val s2: String = task2()

        lock.lock()
        task3(s1, s2)
        lock.unlock()

    }

ReentrantLock 的好處是,當(dāng)有多個(gè)并行任務(wù)時(shí)是不會(huì)出現(xiàn)嵌套 synchrnoized 的問(wèn)題,但仍然需要?jiǎng)?chuàng)建多個(gè) lock 管理不同的任務(wù)。

4. BlockingQueue

阻塞隊(duì)列內(nèi)部也是通過(guò) Lock 實(shí)現(xiàn)的,所以也可以達(dá)到同步鎖的效果

    @Test
    fun test_blockingQueue() {

        lateinit var s1: String

        val queue = SynchronousQueue<Unit>()

        Thread {
            s1 = task1()
            queue.put(Unit)
        }.start()

        val s2: String = task2()

        queue.take()
        task3(s1, s2)
    }

當(dāng)然,阻塞隊(duì)列更多是使用在生產(chǎn)/消費(fèi)場(chǎng)景中的同步。

5. CountDownLatch

UC 中的鎖大都基于 AQS 實(shí)現(xiàn)的,可以分為獨(dú)享鎖和共享鎖。ReentrantLock 就是一種獨(dú)享鎖。相比之下,共享鎖更適合本場(chǎng)景。 例如 CountDownLatch,它可以讓一個(gè)線程一直處于阻塞狀態(tài),直到其他線程的執(zhí)行全部完成:

    @Test
    fun test_countdownlatch() {

        lateinit var s1: String
        lateinit var s2: String
        val cd = CountDownLatch(2)
        Thread() {
            s1 = task1()
            cd.countDown()
        }.start()

        Thread() {
            s2 = task2()
            cd.countDown()
        }.start()

        cd.await()
        task3(s1, s2)
    }

共享鎖的好處是不必為了每個(gè)任務(wù)都創(chuàng)建單獨(dú)的鎖,即使再多并行任務(wù)寫起來(lái)也很輕松。

6. CyclicBarrier

CyclicBarrier 是 JUC 提供的另一種共享鎖機(jī)制,它可以讓一組線程到達(dá)一個(gè)同步點(diǎn)后再一起繼續(xù)運(yùn)行,其中任意一個(gè)線程未達(dá)到同步點(diǎn),其他已到達(dá)的線程均會(huì)被阻塞。
與 CountDownLatch 的區(qū)別在于 CountDownLatch 是一次性的,而 CyclicBarrier 可以被重置后重復(fù)使用,這也正是 Cyclic 的命名由來(lái),可以循環(huán)使用。

    @Test
    fun test_CyclicBarrier() {

        lateinit var s1: String
        lateinit var s2: String
        val cb = CyclicBarrier(3)

        Thread {
            s1 = task1()
            cb.await()
        }.start()

        Thread() {
            s2 = task1()
            cb.await()
        }.start()

        cb.await()
        task3(s1, s2)

    }

7. CAS

AQS 內(nèi)部通過(guò)自旋鎖實(shí)現(xiàn)同步,自旋鎖的本質(zhì)是利用 CompareAndSwap 避免線程阻塞的開銷。 因此,我們可以使用基于 CAS 的原子類計(jì)數(shù),達(dá)到實(shí)現(xiàn)無(wú)鎖操作的目的。

    @Test
    fun test_cas() {

        lateinit var s1: String
        lateinit var s2: String

        val cas = AtomicInteger(2)

        Thread {
            s1 = task1()
            cas.getAndDecrement()
        }.start()

        Thread {
            s2 = task2()
            cas.getAndDecrement()
        }.start()

        while (cas.get() != 0) {}

        task3(s1, s2)

    }

While 循環(huán)空轉(zhuǎn)看起來(lái)有些浪費(fèi)資源,但是自旋鎖的本質(zhì)就是這樣,所以 CAS 僅僅適用于一些cpu密集型的短任務(wù)同步。

8. Future

上面無(wú)論有鎖操作還是無(wú)鎖操作,都需要定義兩個(gè)變量s1、s2記錄結(jié)果非常不方便。 Java 1.5 開始,提供了 Callable 和 Future ,可以在任務(wù)執(zhí)行結(jié)束時(shí)返回結(jié)果。

    @Test
    fun test_future() {

        val future1 = FutureTask(Callable(task1))
        val future2 = FutureTask(Callable(task2))

        Executors.newCachedThreadPool().execute(future1)
        Executors.newCachedThreadPool().execute(future2)

        task3(future1.get(), future2.get())

    }

通過(guò) future.get(),可以同步等待結(jié)果返回,寫起來(lái)非常方便。

9. CompletableFuture

future.get() 雖然方便,但是會(huì)阻塞線程。 Java 8 中引入了 CompletableFuture ,他實(shí)現(xiàn)了 Future 接口的同時(shí)實(shí)現(xiàn)了 CompletionStage 接口。 CompletableFuture 可以針對(duì)多個(gè) CompletionStage 進(jìn)行邏輯組合、實(shí)現(xiàn)復(fù)雜的異步編程。 這些邏輯組合的方法以回調(diào)的形式避免了線程阻塞:

    @Test
    fun test_CompletableFuture() {
        CompletableFuture.supplyAsync(task1)
            .thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
                task3(p1, p2)
            }.join()
    }

10. RxJava

RxJava 提供的各種操作符以及線程切換能力同樣可以幫助我們實(shí)現(xiàn)需求: zip 操作符可以組合兩個(gè) Observable 的結(jié)果;subscribeOn 用來(lái)啟動(dòng)異步任務(wù)

    @Test
    fun test_Rxjava() {

        Observable.zip(
            Observable.fromCallable(Callable(task1))
                .subscribeOn(Schedulers.newThread()),
            Observable.fromCallable(Callable(task2))
                .subscribeOn(Schedulers.newThread()),
            BiFunction(task3)
        ).test().await()

    }

11. Coroutine

前面那么多方式,其實(shí)都是 Java 的工具。 Coroutine 終于算得上是 Kotlin 特有的工具了:

    @Test
    fun test_coroutine() {

        runBlocking {
            val c1 = async(Dispatchers.IO) {
                task1()
            }

            val c2 = async(Dispatchers.IO) {
                task2()
            }

            task3(c1.await(), c2.await())
        }
    }

12. Flow

Flow 就是 Coroutine 版的 RxJava,具備很多 RxJava 的操作符,例如 zip:

    @Test
    fun test_flow() {

        val flow1 = flow<String> { emit(task1()) }
        val flow2 = flow<String> { emit(task2()) }

        runBlocking {
            flow1.zip(flow2) { t1, t2 ->
                task3(t1, t2)
            }.flowOn(Dispatchers.IO).collect()

        }

    }

FlowOn 使得 Task 在異步計(jì)算并發(fā)射結(jié)果。

總結(jié)

作為結(jié)論,在 Kotlin 上最好用的線程同步方案首推協(xié)程。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容