Java并發(fā)編程實戰(zhàn)基礎(chǔ)構(gòu)建模塊

同步容器類

同步容器類包括Vector和Hashtable以及由Collections.synchronizedXxx等工廠方法創(chuàng)建的同步封裝器類。這些類實現(xiàn)線程安全的方式是:將它們的狀態(tài)封裝起來,并對每個公有方法都進行同步,使得每次只有一個線程能訪問容器的狀態(tài)。同步容器對所有容器狀態(tài)的訪問都串行化,嚴重降低了并發(fā)性;當(dāng)多個線程競爭鎖時,吞吐量嚴重下降。

同步容器類存在的問題

同步容器類都是線程安全的,但是在某些情況下可能需要額外的客戶端加鎖來保護復(fù)合操作。

比如,在Vecotr中,getLast()和deleteLast()操作,如果是在多線程的環(huán)境下運行,如果不加鎖,會產(chǎn)生異常情況。一個線程在getLast()后,另一個線程deleteLast(),然后該線程繼續(xù)執(zhí)行,進行deleteLast()操作,此時會拋出下標越界的異常。

又比如,在迭代的過程中,使用get(index)的操作,如果有多個線程運行,可能會刪除其中元素,同樣會造成異常。

對于如上的情況,我們需要通過客戶端加鎖來解決線程安全的問題。如在迭代時加鎖:

synchronized(vector){for(inti=0;i

迭代器

在迭代或者for-each循環(huán)語法時,對容器類進行迭代的標準方式都是使用Iterator。然而,在設(shè)計同步容器類的迭代器時并沒有考慮到并發(fā)修改的問題,并且它們表現(xiàn)出的行為時“及時失敗”的,也就是當(dāng)它們發(fā)現(xiàn)容器在迭代過程中被修改時,就會拋出ConcurrentModificationException。

如果在迭代期間,對容器加鎖,首先會降低效率,提高線程的等待時間;然后還可能會產(chǎn)生死鎖;降低了吞吐量和CPU的利用率。

如果不希望在迭代期間加鎖,可以使用克隆容器的方法,并在克隆副本上進行迭代。

加鎖可以防止迭代器拋出ConcurrentModificationException,但是要在所有對容器進行迭代的地方都要加鎖。如hashCode,equals,containsAll,removeAll,retainAll等方法,在以容器為參數(shù)時,都會對容器進行迭代。這些間接的迭代操作可能拋出ConcurrentModificationException。

并發(fā)容器

Java 5.0提供了多種并發(fā)容器類來改進同步容器的性能。同步容器對所有容器狀態(tài)的訪問都串行化,嚴重降低了并發(fā)性;當(dāng)多個線程競爭鎖時,吞吐量嚴重下降。

并發(fā)容器是針對多個線程并發(fā)訪問設(shè)計的。通過并發(fā)容器來替代同步容器,可以極大地提高伸縮性并降低風(fēng)險。并發(fā)容器包括ConcurrentHashMap(替代Map),CopyOnWriteArrayList(替代List),ConcurrentLinkedQueue,BlockingQueue等等。

ConcurrentHashMap

同步容器類在執(zhí)行每個操作期間都持有一個鎖。ConcurrentHashMap采用了不同的加鎖策略來提供更高的并發(fā)性和伸縮性。它并不是將每個方法都在同一個鎖上同步,而是使用一種粒度更細的加鎖機制來實現(xiàn)更大程度的共享,這種機制稱為分段鎖。

分段鎖機制使得任意數(shù)量的讀取線程可以并發(fā)訪問Map,執(zhí)行讀取操作的線程和執(zhí)行寫入操作的線程可以并發(fā)訪問Map,并且一定數(shù)量的寫入線程可以并發(fā)地修改Map,因此提高了并發(fā)訪問的吞吐量。

并發(fā)容器增強了同步容器類,它們提供的迭代器不會拋出ConcurrentModificationException,因此不需要在迭代過程中對容器加鎖。其迭代器具有弱一致性,可以容忍并發(fā)的修改,在創(chuàng)建迭代器時會遍歷已有元素,并可以(但是不保證)在迭代器被構(gòu)造后將修改操作反映給容器。size(),isEmpty()等方法返回的是一個近似值。

由于ConcurrentHashMap與Hashtable和synchronizedMap有更多的優(yōu)勢,因此大多數(shù)情況應(yīng)該使用并發(fā)容器類,至于當(dāng)需要對整個容器加鎖進行獨占訪問時,才應(yīng)該放棄使用并發(fā)容器。

注意,此時不能再通過客戶端加鎖新建新的原子操作了,客戶端只能對并發(fā)容器自身加鎖,但并發(fā)容器內(nèi)部使用的并不是自身鎖。

CopyOnWriteArrayList

寫入時復(fù)制容器,在每次修改時都會加鎖并創(chuàng)建和重新發(fā)布一個新的容器副本,直接修改容器引用,從而實現(xiàn)可見性。

寫操作在一個復(fù)制的數(shù)組上進行,讀操作還是在原始數(shù)組中進行,讀寫分離,互不影響。寫操作需要加鎖,防止并發(fā)寫入時導(dǎo)致寫入數(shù)據(jù)丟失。寫操作結(jié)束之后需要把原始數(shù)組指向新的復(fù)制數(shù)組。

CopyOnWriteArrayList 在寫操作的同時允許讀操作,大大提高了讀操作的性能,因此很適合讀多寫少的應(yīng)用場景。

但是 CopyOnWriteArrayList 有其缺陷:

內(nèi)存占用:在寫操作時需要復(fù)制一個新的數(shù)組,使得內(nèi)存占用為原來的兩倍左右;

數(shù)據(jù)不一致:讀操作不能讀取實時性的數(shù)據(jù),因為部分寫操作的數(shù)據(jù)還未同步到讀數(shù)組中。

阻塞隊列

阻塞隊列支持生產(chǎn)者-消費者模式。簡化了開發(fā)過程,消除了生產(chǎn)者和消費者之間的代碼依賴性。阻塞隊列簡化了生產(chǎn)者-消費者設(shè)計的實現(xiàn)過程。一種常見的生產(chǎn)者-消費者設(shè)計模式就是線程池與工作隊列的組合。

阻塞隊列提供了四種處理方法:

拋出異常,使用add(e)插入,remove()刪除,element()查詢。當(dāng)阻塞隊列滿時,插入元素;當(dāng)隊列空,刪除元素都會拋出異常。

返回特殊值,使用offer(e)插入,poll()刪除,peek()查詢。插入時,如果成功返回true,移除時,如果沒有對應(yīng)的元素返回null。

阻塞,使用put(e)插入,take()刪除。隊列滿,插入元素時會阻塞;隊列空,取元素會阻塞。

超時退出:使用offer(e,time,unit)插入,poll(time,unit)刪除。當(dāng)隊列滿時,會阻塞,超過一定的時間,線程會退出。

阻塞隊列有多種實現(xiàn)。

ArrayBlokcingQueue和LinkedBlockingQueue分別是數(shù)組和鏈表結(jié)構(gòu)組成的有界的FIFO阻塞隊列。

PriorityBlockingQueue是一個支持優(yōu)先級排序的無界阻塞隊列。

SynchronousQueue是一個不存儲元素的阻塞隊列,它不會為隊列中元素維護存儲空間。

LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列。

LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。

雙端隊列與工作密取

Java 6提供了Dqueue和BlockingDeque,是雙端隊列,實現(xiàn)了在隊列頭和隊列尾的高效插入和移除。雙端隊列適用于工作密取模式。在工作密取中,每個消費者都有各自的雙端隊列。如果一個消費者完成了自己的雙端隊列的全部工作,可以從其他消費者雙端隊列末尾秘密的獲取工作。因為工作者線程不會再單個共享的任務(wù)隊列上發(fā)生競爭。適用于既是生產(chǎn)者又是消費者問題。

阻塞方法與中斷方法

線程會阻塞或暫停執(zhí)行。被阻塞的線程必須等待某個不受它控制的事件發(fā)生后才能繼續(xù)執(zhí)行。當(dāng)在代碼中調(diào)用一個可以拋出InterruptedException的方法時,自己的方法就編程了阻塞方法,必須處理中斷的響應(yīng)。如果這個方法被中斷,那么它將努力提前結(jié)束狀態(tài)。

處理中斷的響應(yīng)有兩種基本選擇:

傳遞InterruptedException,把該異常拋出給方法的調(diào)用者。

恢復(fù)中斷,捕獲異常,并調(diào)用當(dāng)前線程的interrupt方法恢復(fù)中斷,引發(fā)更高層的代碼中斷。

publicvoidrun(){try{? ? ? ? something();? ? }catch(InterruptedException e){? ? ? ? Thread.currentThread().interrupt();? ? }}

同步工具類

同步工具類可以是任何一個對象,只要它根據(jù)其自身的狀態(tài)來協(xié)調(diào)線程的控制流。包括阻塞隊列,信號量,柵欄以及閉鎖。

閉鎖

閉鎖用來確保某些活動直到其他活動都完成了才繼續(xù)執(zhí)行。如果有多個線程,其中一個線程需要等到其他所有線程活動結(jié)束后才繼續(xù)執(zhí)行,使用閉鎖。

CountDownLatch是一種閉鎖的實現(xiàn),可以使得一個或者多個線程等待一組事情發(fā)生。包括一個計數(shù)器,表示需要等待的事件數(shù)量;countDown方法用來遞減計數(shù)器,表示有一個事件已經(jīng)發(fā)生了;await方法等待計數(shù)器為0,表示所有需要等待的事情已經(jīng)發(fā)生。

// 初始化閉鎖,并設(shè)置資源個數(shù)CountDownLatch latch =newCountDownLatch(2);Thread t1 =newThread(newRunnable(){publicvoidrun(){// 加載資源1加載資源的代碼……// 本資源加載完后,閉鎖-1latch.countDown();? ? }} ).start();Thread t2 =newThread(newRunnable(){publicvoidrun(){// 加載資源2資源加載代碼……// 本資源加載完后,閉鎖-1latch.countDown();? ? }} ).start();Thread t3 =newThread(newRunnable(){publicvoidrun(){// 本線程必須等待所有資源加載完后才能執(zhí)行l(wèi)atch.await();// 當(dāng)閉鎖數(shù)量為0時,await返回,執(zhí)行接下來的任務(wù)任務(wù)代碼……? ? }} ).start();復(fù)制代碼

柵欄(同步屏障)

閉鎖是一次性對象,一旦進入終止狀態(tài),就不能被重置。柵欄類似于閉鎖,能阻塞一組進程直到某個時間發(fā)生。柵欄與閉鎖的區(qū)別在于,所有線程必須同時到達柵欄位置,才能繼續(xù)執(zhí)行。

若有多條線程,他們到達屏障時將會被阻塞,只有當(dāng)所有線程都到達屏障時才能打開屏障,所有線程同時執(zhí)行,若有這樣的需求可以使用同步屏障。此外,當(dāng)屏障打開的同時還能指定執(zhí)行的任務(wù)。

閉鎖只會阻塞一條線程,目的是為了讓該條任務(wù)線程滿足條件后執(zhí)行;

而同步屏障會阻塞所有線程,目的是為了讓所有線程同時執(zhí)行(實際上并不會同時執(zhí)行,而是盡量把線程啟動的時間間隔降為最少)。

// 創(chuàng)建同步屏障對象,并制定需要等待的線程個數(shù) 和 打開屏障時需要執(zhí)行的任務(wù)CyclicBarrier barrier =newCyclicBarrier(3,newRunnable(){publicvoidrun(){//當(dāng)所有線程準備完畢后觸發(fā)此任務(wù)}});// 啟動三條線程for(inti=0; i<3; i++ ){newThread(newRunnable(){publicvoidrun(){// 等待,(每執(zhí)行一次barrier.await,同步屏障數(shù)量-1,直到為0時,打開屏障)barrier.await();// 任務(wù)任務(wù)代碼……? ? ? ? }? ? } ).start();}復(fù)制代碼

信號量

信號量用于控制同時訪問某個特定資源的操作數(shù)量,或者執(zhí)行某個指定操作的數(shù)量。計數(shù)信號量還可以用來實現(xiàn)某種資源池,或者對容器施加邊界。

信號量可以用于實現(xiàn)資源池,也可以用于將容器變?yōu)橛薪缱枞萜?。信號量管理著一組虛擬的許可,在執(zhí)行操作時首先獲取許可,并在使用以后釋放許可。如果沒有許可,將阻塞直到有許可或被中斷,超時。

信號量的使用場景是,有m個資源,n個線程,且n>m,同一時刻只能允許m條線程訪問資源。

// 創(chuàng)建信號量對象,并給予3個資源Semaphore semaphore =newSemaphore(3);// 開啟10條線程for(inti=0; i<10; i++ ) {newThread(newRunnbale(){publicvoidrun(){// 獲取資源,若此時資源被用光,則阻塞,直到有線程歸還資源semaphore.acquire();// 任務(wù)代碼……// 釋放資源semaphore.release();? ? ? ? }? ? } ).start();}

FutureTask

可以用作閉鎖,是一種可以生成結(jié)果的Runnable,可以處于以下三種狀態(tài):等待運行,正在運行和運行完成。當(dāng)FutureTask進入完成狀態(tài)后,它會停止在這個狀態(tài)上。

FutureTask在Executor框架中表示異步任務(wù),此外還可以用來表示一些時間較長的運算,這些計算可以在使用計算結(jié)構(gòu)之前啟動。

實戰(zhàn):構(gòu)建緩存

首先,使用HashMap和同步機制來初始化緩存。

publicinterfaceComputable{Vcompute(A arg)throwsInterruptedException;}publicclassExpensiveFuncimplementsComputable{@OverridepublicBigIntegercompute(String arg)throwsInterruptedException{returnnewBigInteger(arg);? ? }}publicclassMemoizer1implementsComputable{privatefinalMap cache=newHashMap<>();privatefinalComputable c;publicMemoizer1(Computable<A,V> c){this.c=c;? ? }@OverridepublicsynchronizedVcompute(A arg)throwsInterruptedException{? ? ? ? V result=cache.get(arg);if(result==null){? ? ? ? ? ? result=c.compute(arg);? ? ? ? ? ? cache.put(arg,result);? ? ? ? }returnresult;? ? }}

在這種實現(xiàn)方法中,使用HashMap保存之前計算的結(jié)果。首先檢查需要的結(jié)果是否已經(jīng)在緩存中,如果存在則返回之前計算,否則將計算結(jié)果緩存到HashMap再返回。

為了確保線程安全,將整個compute方法進行同步。但是這樣伸縮性差,緩存的性能并沒有得到提升。

下面使用ConcurrentHashMap替換HashMap。但是,這種方法存在一些不足,當(dāng)兩個線程同時調(diào)用compute時,可能會導(dǎo)致計算得到相同的值。這樣是低效的,因為緩存的作用就是避免相同的數(shù)據(jù)被計算多次。其問題在于,如果某個線程啟動了一個計算,而其他線程并不知道這個計算正在進行,很可能會重復(fù)這個計算。

針對如上問題,我們考慮可以使用FutureTask來解決。使用該類來表示計算的過程,如果有結(jié)果可用,則返回結(jié)果,否則一直阻塞。

publicclassMemo2implementsComputable{privatefinalMap> cache=newConcurrentHashMap<>();privatefinalComputablec;publicMemo2(Computable<A,V>c){this.c=c;? ? }@OverridepublicVcompute(A arg)throwsInterruptedException{? ? ? ? Future future=cache.get(arg);if(future==null){? ? ? ? ? ? Callable eval=newCallable() {@OverridepublicVcall()throwsException{returnc.compute(arg);? ? ? ? ? ? ? ? }? ? ? ? ? ? };? ? ? ? ? ? FutureTask ft=newFutureTask<>(eval);? ? ? ? ? ? future=cache.putIfAbsent(arg,ft);if(future==null){? ? ? ? ? ? ? ? future=ft;? ? ? ? ? ? ? ? ft.run();? ? ? ? ? ? }? ? ? ? }try{returnfuture.get();? ? ? ? }catch(ExecutionException e){? ? ? ? ? ? e.printStackTrace();? ? ? ? }returnnull;? ? }}

在此我向大家推薦一個架構(gòu)學(xué)習(xí)交流群。交流學(xué)習(xí)群號:938837867 暗號:555 里面會分享一些資深架構(gòu)師錄制的視頻錄像:有Spring,MyBatis,Netty源碼分析,高并發(fā)、高性能、分布式、微服務(wù)架構(gòu)的原理,JVM性能優(yōu)化、分布式架構(gòu)等這些成為架構(gòu)師必備

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容