Synchronized.kt 的源碼:
/**
* Executes the given function [block] while holding the monitor of the given object [lock].
*/
@kotlin.internal.InlineOnly
public inline fun <R> synchronized(lock: Any, block: () -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")
monitorEnter(lock)
try {
return block()
}
finally {
@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")
monitorExit(lock)
}
}
JvmFlagAnnotations.kt 的源碼:
/**
* Marks the JVM backing field of the annotated property as `volatile`, meaning that writes to this field
* are immediately made visible to other threads.
*/
@Target(FIELD)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Volatile
/**
* Marks the JVM backing field of the annotated property as `transient`, meaning that it is not
* part of the default serialized form of the object.
*/
@Target(FIELD)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Transient
/**
* Marks the JVM method generated from the annotated function as `strictfp`, meaning that the precision
* of floating point operations performed inside the method needs to be restricted in order to
* achieve better portability.
*/
@Target(FUNCTION, CONSTRUCTOR, PROPERTY_GETTER, PROPERTY_SETTER, CLASS)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Strictfp
/**
* Marks the JVM method generated from the annotated function as `synchronized`, meaning that the method
* will be protected from concurrent execution by multiple threads by the monitor of the instance (or,
* for static methods, the class) on which the method is defined.
*/
@Target(FUNCTION, PROPERTY_GETTER, PROPERTY_SETTER)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Synchronized
如何使用 Synchronized 同步鎖:
在Java中,給一個(gè)方法加鎖 ,需要給方法加 synchronized 關(guān)鍵字
public synchronized void doSomething() {
}
kotlin 中沒有 synchronized 關(guān)鍵之,取而代之的是 @Synchronized 注解
class MyUtil {
@Synchronized
fun doSomething() {
}
}
Kotlin 在方法內(nèi),可以使用 block 塊
class Util {
val lock = Any()
fun main() {
synchronized(lock) {
}
}
}
Volatile 關(guān)鍵字
有時(shí)僅僅為了讀寫一個(gè)或者兩個(gè)實(shí)例域就使用同步的話,顯得開銷過(guò)大;而Volatile關(guān)鍵字為實(shí)例域的同步訪問(wèn)提供了免鎖的機(jī)制。
當(dāng)一個(gè)共享變量被vlatile修飾時(shí),其就具備兩個(gè)含義:
1、一個(gè)線程修改了變量的值,變量的新值對(duì)其他線程是立即可見的。
2、禁止使用指令重排序。
什么是指令重排序呢?
重排序通常是編譯器或者運(yùn)行環(huán)境為了優(yōu)化程序性能而采取的對(duì)指令重新排序執(zhí)行的一種手段。重排序分為兩類:編譯期重排序和運(yùn)行期重排序,分別對(duì)應(yīng)著編譯時(shí)和運(yùn)行時(shí)環(huán)境。
在 kotlin 中沒有 volatile 關(guān)鍵字,但是有 @Volatile 注解
class Util {
@Volatile
var lock = Any()
}
在 kotlin 中的 Any 和 java 中的 Object 相似,每一個(gè)類都是從 Any 繼承過(guò)來(lái)的,但是 Any 并沒有聲明 wait() , notify() 和 notifyAll() 方法,這就意味著,你不能在kotlin類中調(diào)用這些方法。但是你仍然能夠使用java.lang.Object的實(shí)例作為lock,并且調(diào)用相關(guān)的方法。下面將會(huì)展示一個(gè)使用 Object 做為 lock 解決生產(chǎn)者和消費(fèi)者的問(wèn)題。
private val lock = Object()
fun produce() = synchronized(lock) {
while(items>=maxItems) {
lock.wait()
}
Thread.sleep(Random.nextInt(100).toLong())
items++
println("Produced, count is$items:${Thread.currentThread()}")
lock.notifyAll()
}
fun consume() = synchronized(lock) {
while(items<=0) {
lock.wait()
}
Thread.sleep(Random.nextInt(100).toLong())
items--
println("Consumed, count is$items:${Thread.currentThread()}")
lock.notifyAll()
}
- volatile變量具有可見性
- volatile不保證原子性
- volatile保證有序性
volatile能禁止指令的重排序,因此volatile保證有序性。
volatile禁止指令重排序的含義:當(dāng)程序執(zhí)行到volatile變量的時(shí)候,在其前面的語(yǔ)句已經(jīng)全部執(zhí)行完成,并且結(jié)果對(duì)后面可見,而且該變量后面的語(yǔ)句都沒有執(zhí)行。
正確使用volatile
synchronized可以防止多個(gè)線程同時(shí)執(zhí)行一段代碼,這會(huì)阻塞一部分線程的執(zhí)行,這樣就會(huì)影響程序的執(zhí)行效率。
而volatile在某些情況下的性能是優(yōu)于synchronized的。
但是volatile無(wú)法替代synchronized,因?yàn)関olatile無(wú)法保證操作的原子性。
通常情況下,我們使用volatile關(guān)鍵字要避開兩種場(chǎng)景:
1、對(duì)變量的寫操作依賴當(dāng)前值。
2、該變量包含在具有其他變量的不變式中。
使用volatile的場(chǎng)景有很多,這里介紹兩種常見的場(chǎng)景:
場(chǎng)景1:狀態(tài)標(biāo)志
當(dāng)多線程執(zhí)行該類的時(shí)候,我們需要對(duì)狀態(tài)標(biāo)志stop保持可見性,這樣我們的運(yùn)行才能實(shí)時(shí)保持正確的執(zhí)行。這種情況如果使用sychronized的話顯然要復(fù)雜的多。
class TestVolatile1 {
@Volatile
var stop = false;
fun onInit(){
if(stop){
}
}
fun onStop(){
stop = true;
}
}
場(chǎng)景2: 雙重檢查模式(DCL)
在 Java 中,我們的單例模式經(jīng)常會(huì)這樣寫,第一次判空是為了不必要的同步操作,第二次判斷是只有在MyLock實(shí)例==null的時(shí)候才會(huì)去new一個(gè)實(shí)例出來(lái),當(dāng)多線程調(diào)用時(shí),當(dāng)進(jìn)行這兩次判空時(shí),我們需要保證instance的可見性。
public class MyLock {
private static volatile MyLock instance = null;
public static MyLock getInstance() {
if(instance == null){
synchronized (MyLock.class){
if(instance == null){
instance = new MyLock();
}
}
}
return instance;
}
}
在 Kotlin 中實(shí)現(xiàn)多線程同步的方式
“ 現(xiàn)有 Task1、Task2 等多個(gè)并行任務(wù),如何等待全部執(zhí)行完成后,執(zhí)行 Task3?!?br> 在 Kotlin 中我們有多種實(shí)現(xiàn)方式:
- Thread.join
- Synchronized
- ReentrantLock
- BlockingQueue
- CountDownLatch
- CyclicBarrier
- CAS
- Future
- CompletableFuture
- Rxjava
- Coroutine
- Flow
我們先定義三個(gè)Task,模擬上述場(chǎng)景, Task3 基于 Task1、Task2 返回的結(jié)果拼接字符串,每個(gè) Task 通過(guò) sleep 模擬耗時(shí):
val task1: () -> String = {
sleep(2000)
"Hello".also { println("task1 finished: $it") }
}
val task2: () -> String = {
sleep(2000)
"World".also { println("task2 finished: $it") }
}
val task3: (String, String) -> String = { p1, p2 ->
sleep(2000)
"$p1 $p2".also { println("task3 finished: $it") }
}
1. Thread.join()
Kotlin 兼容 Java,Java 的所有線程工具默認(rèn)都可以使用。其中最簡(jiǎn)單的線程同步方式就是使用 Thread 的 join() :
@Test
fun test_join() {
lateinit var s1: String
lateinit var s2: String
val t1 = Thread { s1 = task1() }
val t2 = Thread { s2 = task2() }
t1.start()
t2.start()
t1.join()
t2.join()
task3(s1, s2)
}
2. Synchronized
使用 synchronized 鎖進(jìn)行同步
@Test
fun test_synchrnoized() {
lateinit var s1: String
Thread {
synchronized(Unit) {
s1 = task1()
}
}.start()
val s2: String = task2()
synchronized(Unit) {
task3(s1, s2)
}
}
但是如果超過(guò)三個(gè)任務(wù),使用 synchrnoized 這種寫法就比較別扭了,為了同步多個(gè)并行任務(wù)的結(jié)果需要聲明n個(gè)鎖,并嵌套n個(gè) synchronized。
3. ReentrantLock
ReentrantLock 是 JUC 提供的線程鎖,可以替換 synchronized 的使用
@Test
fun test_ReentrantLock() {
lateinit var s1: String
val lock = ReentrantLock()
Thread {
lock.lock()
s1 = task1()
lock.unlock()
}.start()
val s2: String = task2()
lock.lock()
task3(s1, s2)
lock.unlock()
}
ReentrantLock 的好處是,當(dāng)有多個(gè)并行任務(wù)時(shí)是不會(huì)出現(xiàn)嵌套 synchrnoized 的問(wèn)題,但仍然需要?jiǎng)?chuàng)建多個(gè) lock 管理不同的任務(wù)。
4. BlockingQueue
阻塞隊(duì)列內(nèi)部也是通過(guò) Lock 實(shí)現(xiàn)的,所以也可以達(dá)到同步鎖的效果
@Test
fun test_blockingQueue() {
lateinit var s1: String
val queue = SynchronousQueue<Unit>()
Thread {
s1 = task1()
queue.put(Unit)
}.start()
val s2: String = task2()
queue.take()
task3(s1, s2)
}
當(dāng)然,阻塞隊(duì)列更多是使用在生產(chǎn)/消費(fèi)場(chǎng)景中的同步。
5. CountDownLatch
UC 中的鎖大都基于 AQS 實(shí)現(xiàn)的,可以分為獨(dú)享鎖和共享鎖。ReentrantLock 就是一種獨(dú)享鎖。相比之下,共享鎖更適合本場(chǎng)景。 例如 CountDownLatch,它可以讓一個(gè)線程一直處于阻塞狀態(tài),直到其他線程的執(zhí)行全部完成:
@Test
fun test_countdownlatch() {
lateinit var s1: String
lateinit var s2: String
val cd = CountDownLatch(2)
Thread() {
s1 = task1()
cd.countDown()
}.start()
Thread() {
s2 = task2()
cd.countDown()
}.start()
cd.await()
task3(s1, s2)
}
共享鎖的好處是不必為了每個(gè)任務(wù)都創(chuàng)建單獨(dú)的鎖,即使再多并行任務(wù)寫起來(lái)也很輕松。
6. CyclicBarrier
CyclicBarrier 是 JUC 提供的另一種共享鎖機(jī)制,它可以讓一組線程到達(dá)一個(gè)同步點(diǎn)后再一起繼續(xù)運(yùn)行,其中任意一個(gè)線程未達(dá)到同步點(diǎn),其他已到達(dá)的線程均會(huì)被阻塞。
與 CountDownLatch 的區(qū)別在于 CountDownLatch 是一次性的,而 CyclicBarrier 可以被重置后重復(fù)使用,這也正是 Cyclic 的命名由來(lái),可以循環(huán)使用。
@Test
fun test_CyclicBarrier() {
lateinit var s1: String
lateinit var s2: String
val cb = CyclicBarrier(3)
Thread {
s1 = task1()
cb.await()
}.start()
Thread() {
s2 = task1()
cb.await()
}.start()
cb.await()
task3(s1, s2)
}
7. CAS
AQS 內(nèi)部通過(guò)自旋鎖實(shí)現(xiàn)同步,自旋鎖的本質(zhì)是利用 CompareAndSwap 避免線程阻塞的開銷。 因此,我們可以使用基于 CAS 的原子類計(jì)數(shù),達(dá)到實(shí)現(xiàn)無(wú)鎖操作的目的。
@Test
fun test_cas() {
lateinit var s1: String
lateinit var s2: String
val cas = AtomicInteger(2)
Thread {
s1 = task1()
cas.getAndDecrement()
}.start()
Thread {
s2 = task2()
cas.getAndDecrement()
}.start()
while (cas.get() != 0) {}
task3(s1, s2)
}
While 循環(huán)空轉(zhuǎn)看起來(lái)有些浪費(fèi)資源,但是自旋鎖的本質(zhì)就是這樣,所以 CAS 僅僅適用于一些cpu密集型的短任務(wù)同步。
8. Future
上面無(wú)論有鎖操作還是無(wú)鎖操作,都需要定義兩個(gè)變量s1、s2記錄結(jié)果非常不方便。 Java 1.5 開始,提供了 Callable 和 Future ,可以在任務(wù)執(zhí)行結(jié)束時(shí)返回結(jié)果。
@Test
fun test_future() {
val future1 = FutureTask(Callable(task1))
val future2 = FutureTask(Callable(task2))
Executors.newCachedThreadPool().execute(future1)
Executors.newCachedThreadPool().execute(future2)
task3(future1.get(), future2.get())
}
通過(guò) future.get(),可以同步等待結(jié)果返回,寫起來(lái)非常方便。
9. CompletableFuture
future.get() 雖然方便,但是會(huì)阻塞線程。 Java 8 中引入了 CompletableFuture ,他實(shí)現(xiàn)了 Future 接口的同時(shí)實(shí)現(xiàn)了 CompletionStage 接口。 CompletableFuture 可以針對(duì)多個(gè) CompletionStage 進(jìn)行邏輯組合、實(shí)現(xiàn)復(fù)雜的異步編程。 這些邏輯組合的方法以回調(diào)的形式避免了線程阻塞:
@Test
fun test_CompletableFuture() {
CompletableFuture.supplyAsync(task1)
.thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
task3(p1, p2)
}.join()
}
10. RxJava
RxJava 提供的各種操作符以及線程切換能力同樣可以幫助我們實(shí)現(xiàn)需求: zip 操作符可以組合兩個(gè) Observable 的結(jié)果;subscribeOn 用來(lái)啟動(dòng)異步任務(wù)
@Test
fun test_Rxjava() {
Observable.zip(
Observable.fromCallable(Callable(task1))
.subscribeOn(Schedulers.newThread()),
Observable.fromCallable(Callable(task2))
.subscribeOn(Schedulers.newThread()),
BiFunction(task3)
).test().await()
}
11. Coroutine
前面那么多方式,其實(shí)都是 Java 的工具。 Coroutine 終于算得上是 Kotlin 特有的工具了:
@Test
fun test_coroutine() {
runBlocking {
val c1 = async(Dispatchers.IO) {
task1()
}
val c2 = async(Dispatchers.IO) {
task2()
}
task3(c1.await(), c2.await())
}
}
12. Flow
Flow 就是 Coroutine 版的 RxJava,具備很多 RxJava 的操作符,例如 zip:
@Test
fun test_flow() {
val flow1 = flow<String> { emit(task1()) }
val flow2 = flow<String> { emit(task2()) }
runBlocking {
flow1.zip(flow2) { t1, t2 ->
task3(t1, t2)
}.flowOn(Dispatchers.IO).collect()
}
}
FlowOn 使得 Task 在異步計(jì)算并發(fā)射結(jié)果。
總結(jié)
作為結(jié)論,在 Kotlin 上最好用的線程同步方案首推協(xié)程。