我工作三年了,該懂并發(fā)了(干貨)

本文的組織形式如下,主要會(huì)介紹到同步容器類,操作系統(tǒng)的并發(fā)工具,Java 開(kāi)發(fā)工具包(只是簡(jiǎn)單介紹一下,后面會(huì)有源碼分析)。同步工具類有哪些。

下面我們就來(lái)介紹一下 Java 并發(fā)中都涉及哪些模塊,這些并發(fā)模塊都是 Java 并發(fā)類庫(kù)所提供的。

同步容器類

同步容器主要包括兩類,一種是本來(lái)就是線程安全實(shí)現(xiàn)的容器,這類容器有?Vector、Hashtable、Stack,這類容器的方法上都加了?synchronized?鎖,是線程安全的實(shí)現(xiàn)。

Vector、Hashtable、Stack 這些容器我們現(xiàn)在幾乎都不在使用,因?yàn)檫@些容器在多線程環(huán)境下的效率不高。

還有一類是由?Collections.synchronizedxxx?實(shí)現(xiàn)的非線程安全的容器,使用 Collections.synchronized 會(huì)把它們封裝起來(lái)編程線程安全的容器,舉出兩個(gè)例子

Collections.synchronizedList

Collections.synchronizedMap

我們可以通過(guò) Collections 源碼可以看出這些線程安全的實(shí)現(xiàn)

要不為啥要稱 Collections 為集合工具類呢?Collections 會(huì)把這些容器類的狀態(tài)封裝起來(lái),并對(duì)每個(gè)同步方法進(jìn)行同步,使得每次只有一個(gè)線程能夠訪問(wèn)容器的狀態(tài)。

其中每個(gè)?synchronized xxx都是相當(dāng)于創(chuàng)建了一個(gè)靜態(tài)內(nèi)部類。

雖然同步容器類都是線程安全的,但是在某些情況下需要額外的客戶端加鎖來(lái)保證一些復(fù)合操作的安全性,復(fù)合操作就是有兩個(gè)及以上的方法組成的操作,比如最典型的就是?若沒(méi)有則添加,用偽代碼表示則是

if(a?==null){

a?=?get();

}

比如可以用來(lái)判斷 Map 中是否有某個(gè) key,如果沒(méi)有則添加進(jìn) Map 中。這些復(fù)合操作在沒(méi)有客戶端加鎖的情況下是線程安全的,但是當(dāng)多個(gè)線程并發(fā)修改容器時(shí),可能會(huì)表現(xiàn)出意料之外的行為。例如下面這段代碼

publicclassTestVectorimplementsRunnable{

staticVector?vector?=newVector();

staticvoidaddVector(){

for(inti?=0;i?<10000;i++){

vector.add(i);

}

}

staticObjectgetVector(){

intindex?=?vector.size()?-1;

returnvector.get(index);

}

staticvoidremoveVector(){

intindex?=?vector.size()?-1;

vector.remove(index);

}

@Override

publicvoidrun(){

getVector();

}

publicstaticvoidmain(String[]?args){

TestVector?testVector?=newTestVector();

testVector.addVector();

Thread?t1?=newThread(()?->?{

for(inti?=0;i?<?vector.size();i++){

getVector();

}

});

Thread?t2?=newThread(()?->?{

for(inti?=0;i?<?vector.size();i++){

removeVector();

}

});

t1.start();

t2.start();

}

}

這些方法看似沒(méi)有問(wèn)題,因?yàn)?Vector 能夠保證線程安全性,無(wú)論多少個(gè)線程訪問(wèn) Vector 也不會(huì)造成 Vector 的內(nèi)部產(chǎn)生破壞,但是從整個(gè)系統(tǒng)來(lái)說(shuō),是存在線程安全性的,事實(shí)上你運(yùn)行一下,也會(huì)發(fā)現(xiàn)報(bào)錯(cuò)。

會(huì)出現(xiàn)

如果線程 A 在包含這么多元素的基礎(chǔ)上調(diào)用?getVector?方法,會(huì)得到一個(gè)數(shù)值,getVector 只是取得該元素,而并不是從 vector 中移除,removeVector?方法是得到一個(gè)元素進(jìn)行移除,這段代碼的不安全因素就是,因?yàn)榫€程的時(shí)間片是亂序的,而且 getVector 和 removeVector 并不會(huì)保證互斥,所以在 removeVector 方法把某個(gè)值比如 6666 移除后,vector 中就不存在這個(gè) 6666 的元素,此時(shí) getVector 方法取得 6666 ,就會(huì)拋出數(shù)組越界異常。為什么是數(shù)組越界異常呢?可以看一下 vector 的源碼

如果用圖表示的話,則會(huì)是下面這樣。

所以,從系統(tǒng)的層面來(lái)看,上面這段代碼也要保證線程安全性才可以,也就是在客戶端加鎖?實(shí)現(xiàn),只要我們讓復(fù)合操作使用一把鎖,那么這些操作就和其他單獨(dú)的操作一樣都是原子性的。如下面例子所示

staticObjectgetVector(){

synchronized(vector){

intindex?=?vector.size()?-1;

returnvector.get(index);

}

}

staticvoidremoveVector(){

synchronized(vector)?{

intindex?=?vector.size()?-1;

vector.remove(index);

}

}

也可以通過(guò)鎖住?.class?來(lái)保證原子性操作,也能達(dá)到同樣的效果。

staticObjectgetVector(){

synchronized(TestVector.class){

intindex?=?vector.size()?-1;

returnvector.get(index);

}

}

staticvoidremoveVector(){

synchronized(TestVector.class)?{

intindex?=?vector.size()?-1;

vector.remove(index);

}

}

在調(diào)用 size 和 get 之間,Vector 的長(zhǎng)度可能會(huì)發(fā)生變化,這種變化在對(duì) Vector 進(jìn)行排序時(shí)出現(xiàn),如下所示

for(inti?=0;i<?vector.size();i++){

doSomething(vector.get(i));

}

這種迭代的操作正確性取決于運(yùn)氣,即在調(diào)用 size 和 get 之間會(huì)修改 Vector,在單線程環(huán)境中,這種假設(shè)完全成立,但是再有其他線程并發(fā)修改 Vector 時(shí),則可能會(huì)導(dǎo)致麻煩。

我們?nèi)耘f可以通過(guò)客戶端加鎖的方式來(lái)避免這種情況

synchronized(vector){

for(inti?=0;i<?vector.size();i++){

doSomething(vector.get(i));

}

}

這種方式為客戶端的可靠性提供了保證,但是犧牲了伸縮性,而且這種在遍歷過(guò)程中進(jìn)行加鎖,也不是我們所希望看到的。

fail-fast

針對(duì)上面這種情況,很多集合類都提供了一種?fail-fast?機(jī)制,因?yàn)榇蟛糠旨蟽?nèi)部都是使用 Iterator 進(jìn)行遍歷,在循環(huán)中使用同步鎖的開(kāi)銷會(huì)很大,而 Iterator 的創(chuàng)建是輕量級(jí)的,所以在集合內(nèi)部如果有并發(fā)修改的操作,集合會(huì)進(jìn)行快速失敗,也就是?fail-fast。當(dāng)他們發(fā)現(xiàn)容器在迭代過(guò)程中被修改時(shí),會(huì)拋出?ConcurrentModificationException異常,這種快速失敗不是一種完備的處理機(jī)制,而只是?善意的捕獲并發(fā)錯(cuò)誤。

如果查看過(guò) ConcurrentModificationException 的注解,你會(huì)發(fā)現(xiàn),ConcurrentModificationException 拋出的原則由兩種,如下

造成這種異常的原因是由于多個(gè)線程在遍歷集合的同時(shí)對(duì)集合類內(nèi)部進(jìn)行了修改,這也就是 fail-fast 機(jī)制。

該注解還聲明了另外一種方式

這個(gè)問(wèn)題也是很經(jīng)典的一個(gè)問(wèn)題,我們使用 ArrayList 來(lái)舉例子。如下代碼所示

publicstaticvoidmain(String[]?args){

List?list?=newArrayList<>();

for(inti?=0;?i?<10;?i++?)?{

list.add(i?+"");

}

Iterator?iterator?=?list.iterator();

inti?=0;

while(iterator.hasNext())?{

if(i?==3)?{

list.remove(3);

}

System.out.println(iterator.next());

i?++;

}

}

該段代碼會(huì)發(fā)生異常,因?yàn)樵?ArrayList 內(nèi)部,有兩個(gè)屬性,一個(gè)是?modCount?,一個(gè)是?expectedModCount?,ArrayList 在 remove 等對(duì)集合結(jié)構(gòu)的元素造成數(shù)量上的操作會(huì)有?checkForComodification?的判斷,如下所示,這也是這段代碼的錯(cuò)誤原因。

fail-safe

fail-safe?是 Java 中的一種?安全失敗?機(jī)制,它表示的是在遍歷時(shí)不是直接在原集合上進(jìn)行訪問(wèn),而是先復(fù)制原有集合內(nèi)容,在拷貝的集合上進(jìn)行遍歷。由于迭代時(shí)是對(duì)原集合的拷貝進(jìn)行遍歷,所以在遍歷過(guò)程中對(duì)原集合所作的修改并不能被迭代器檢測(cè)到,所以不會(huì)觸發(fā) ConcurrentModificationException。java.util.concurrent?包下的容器都是安全失敗的,可以在多線程條件下使用,并發(fā)修改。

比如?CopyOnWriteArrayList, 它就是一種 fail-safe 機(jī)制的集合,它就不會(huì)出現(xiàn)異常,例如如下操作

List?integers?=newCopyOnWriteArrayList<>();

integers.add(1);

integers.add(2);

integers.add(3);

Iterator?itr?=?integers.iterator();

while(itr.hasNext())?{

Integer?a?=?itr.next();

integers.remove(a);

}

CopyOnWriteArrayList 就是 ArrayList 的一種線程安全的變體,CopyOnWriteArrayList 中的所有可變操作比如 add 和 set 等等都是通過(guò)對(duì)數(shù)組進(jìn)行全新復(fù)制來(lái)實(shí)現(xiàn)的。

操作系統(tǒng)中的并發(fā)工具

講到并發(fā)容器,就不得不提操作系統(tǒng)級(jí)別實(shí)現(xiàn)了哪些進(jìn)程/線程間的并發(fā)容器,說(shuō)白了其實(shí)就是數(shù)據(jù)結(jié)構(gòu)的設(shè)計(jì)。下面我們就來(lái)一起看一下操作系統(tǒng)級(jí)別的并發(fā)工具

信號(hào)量

信號(hào)量是 E.W.Dijkstra 在 1965 年提出的一種方法,它使用一個(gè)整形變量來(lái)累計(jì)喚醒次數(shù),以供之后使用。在他的觀點(diǎn)中,有一個(gè)新的變量類型稱作?信號(hào)量(semaphore)。一個(gè)信號(hào)量的取值可以是 0 ,或任意正數(shù)。0 表示的是不需要任何喚醒,任意的正數(shù)表示的就是喚醒次數(shù)。

Dijkstra 提出了信號(hào)量有兩個(gè)操作,現(xiàn)在通常使用?down?和?up(分別可以用 sleep 和 wakeup 來(lái)表示)。down 這個(gè)指令的操作會(huì)檢查值是否大于 0 。如果大于 0 ,則將其值減 1 ;若該值為 0 ,則進(jìn)程將睡眠,而且此時(shí) down 操作將會(huì)繼續(xù)執(zhí)行。檢查數(shù)值、修改變量值以及可能發(fā)生的睡眠操作均為一個(gè)單一的、不可分割的?原子操作(atomic action)?完成。

互斥量

如果不需要信號(hào)量的計(jì)數(shù)能力時(shí),可以使用信號(hào)量的一個(gè)簡(jiǎn)單版本,稱為?mutex(互斥量)。互斥量的優(yōu)勢(shì)就在于在一些共享資源和一段代碼中保持互斥。由于互斥的實(shí)現(xiàn)既簡(jiǎn)單又有效,這使得互斥量在實(shí)現(xiàn)用戶空間線程包時(shí)非常有用。

互斥量是一個(gè)處于兩種狀態(tài)之一的共享變量:解鎖(unlocked)?和?加鎖(locked)。這樣,只需要一個(gè)二進(jìn)制位來(lái)表示它,不過(guò)一般情況下,通常會(huì)用一個(gè)?整型(integer)?來(lái)表示。0 表示解鎖,其他所有的值表示加鎖,比 1 大的值表示加鎖的次數(shù)。

mutex 使用兩個(gè)過(guò)程,當(dāng)一個(gè)線程(或者進(jìn)程)需要訪問(wèn)關(guān)鍵區(qū)域時(shí),會(huì)調(diào)用?mutex_lock?進(jìn)行加鎖。如果互斥鎖當(dāng)前處于解鎖狀態(tài)(表示關(guān)鍵區(qū)域可用),則調(diào)用成功,并且調(diào)用線程可以自由進(jìn)入關(guān)鍵區(qū)域。

另一方面,如果 mutex 互斥量已經(jīng)鎖定的話,調(diào)用線程會(huì)阻塞直到關(guān)鍵區(qū)域內(nèi)的線程執(zhí)行完畢并且調(diào)用了?mutex_unlock?。如果多個(gè)線程在 mutex 互斥量上阻塞,將隨機(jī)選擇一個(gè)線程并允許它獲得鎖。

Futexes

隨著并行的增加,有效的同步(synchronization)和鎖定(locking)?對(duì)于性能來(lái)說(shuō)是非常重要的。如果進(jìn)程等待時(shí)間很短,那么自旋鎖(Spin lock)?是非常有效;但是如果等待時(shí)間比較長(zhǎng),那么這會(huì)浪費(fèi) CPU 周期。如果進(jìn)程很多,那么阻塞此進(jìn)程,并僅當(dāng)鎖被釋放的時(shí)候讓內(nèi)核解除阻塞是更有效的方式。不幸的是,這種方式也會(huì)導(dǎo)致另外的問(wèn)題:它可以在進(jìn)程競(jìng)爭(zhēng)頻繁的時(shí)候運(yùn)行良好,但是在競(jìng)爭(zhēng)不是很激烈的情況下內(nèi)核切換的消耗會(huì)非常大,而且更困難的是,預(yù)測(cè)鎖的競(jìng)爭(zhēng)數(shù)量更不容易。

有一種有趣的解決方案是把兩者的優(yōu)點(diǎn)結(jié)合起來(lái),提出一種新的思想,稱為?futex,或者是?快速用戶空間互斥(fast user space mutex),是不是聽(tīng)起來(lái)很有意思?

futex 是?Linux?中的特性實(shí)現(xiàn)了基本的鎖定(很像是互斥鎖)而且避免了陷入內(nèi)核中,因?yàn)閮?nèi)核的切換的開(kāi)銷非常大,這樣做可以大大提高性能。futex 由兩部分組成:內(nèi)核服務(wù)和用戶庫(kù)。內(nèi)核服務(wù)提供了了一個(gè)?等待隊(duì)列(wait queue)?允許多個(gè)進(jìn)程在鎖上排隊(duì)等待。除非內(nèi)核明確的對(duì)他們解除阻塞,否則它們不會(huì)運(yùn)行。

Pthreads 中的互斥量

Pthreads 提供了一些功能用來(lái)同步線程。最基本的機(jī)制是使用互斥量變量,可以鎖定和解鎖,用來(lái)保護(hù)每個(gè)關(guān)鍵區(qū)域。希望進(jìn)入關(guān)鍵區(qū)域的線程首先要嘗試獲取 mutex。如果 mutex 沒(méi)有加鎖,線程能夠馬上進(jìn)入并且互斥量能夠自動(dòng)鎖定,從而阻止其他線程進(jìn)入。如果 mutex 已經(jīng)加鎖,調(diào)用線程會(huì)阻塞,直到 mutex 解鎖。如果多個(gè)線程在相同的互斥量上等待,當(dāng)互斥量解鎖時(shí),只有一個(gè)線程能夠進(jìn)入并且重新加鎖。這些鎖并不是必須的,程序員需要正確使用它們。

下面是與互斥量有關(guān)的函數(shù)調(diào)用

和我們想象中的一樣,mutex 能夠被創(chuàng)建和銷毀,扮演這兩個(gè)角色的分別是?Phread_mutex_init?和?Pthread_mutex_destroy。mutex 也可以通過(guò)?Pthread_mutex_lock?來(lái)進(jìn)行加鎖,如果互斥量已經(jīng)加鎖,則會(huì)阻塞調(diào)用者。還有一個(gè)調(diào)用Pthread_mutex_trylock?用來(lái)嘗試對(duì)線程加鎖,當(dāng) mutex 已經(jīng)被加鎖時(shí),會(huì)返回一個(gè)錯(cuò)誤代碼而不是阻塞調(diào)用者。這個(gè)調(diào)用允許線程有效的進(jìn)行忙等。最后,Pthread_mutex_unlock?會(huì)對(duì) mutex 解鎖并且釋放一個(gè)正在等待的線程。

除了互斥量以外,Pthreads?還提供了第二種同步機(jī)制:條件變量(condition variables)?。mutex 可以很好的允許或阻止對(duì)關(guān)鍵區(qū)域的訪問(wèn)。條件變量允許線程由于未滿足某些條件而阻塞。絕大多數(shù)情況下這兩種方法是一起使用的。下面我們進(jìn)一步來(lái)研究線程、互斥量、條件變量之間的關(guān)聯(lián)。

下面再來(lái)重新認(rèn)識(shí)一下生產(chǎn)者和消費(fèi)者問(wèn)題:一個(gè)線程將東西放在一個(gè)緩沖區(qū)內(nèi),由另一個(gè)線程將它們?nèi)〕?。如果生產(chǎn)者發(fā)現(xiàn)緩沖區(qū)沒(méi)有空槽可以使用了,生產(chǎn)者線程會(huì)阻塞起來(lái)直到有一個(gè)線程可以使用。生產(chǎn)者使用 mutex 來(lái)進(jìn)行原子性檢查從而不受其他線程干擾。但是當(dāng)發(fā)現(xiàn)緩沖區(qū)已經(jīng)滿了以后,生產(chǎn)者需要一種方法來(lái)阻塞自己并在以后被喚醒。這便是條件變量做的工作。

下面是一些與條件變量有關(guān)的最重要的 pthread 調(diào)用

上表中給出了一些調(diào)用用來(lái)創(chuàng)建和銷毀條件變量。條件變量上的主要屬性是?Pthread_cond_wait?和?Pthread_cond_signal。前者阻塞調(diào)用線程,直到其他線程發(fā)出信號(hào)為止(使用后者調(diào)用)。阻塞的線程通常需要等待喚醒的信號(hào)以此來(lái)釋放資源或者執(zhí)行某些其他活動(dòng)。只有這樣阻塞的線程才能繼續(xù)工作。條件變量允許等待與阻塞原子性的進(jìn)程。Pthread_cond_broadcast?用來(lái)喚醒多個(gè)阻塞的、需要等待信號(hào)喚醒的線程。

需要注意的是,條件變量(不像是信號(hào)量)不會(huì)存在于內(nèi)存中。如果將一個(gè)信號(hào)量傳遞給一個(gè)沒(méi)有線程等待的條件變量,那么這個(gè)信號(hào)就會(huì)丟失,這個(gè)需要注意

管程

為了能夠編寫(xiě)更加準(zhǔn)確無(wú)誤的程序,Brinch Hansen 和 Hoare 提出了一個(gè)更高級(jí)的同步原語(yǔ)叫做?管程(monitor)。管程有一個(gè)很重要的特性,即在任何時(shí)候管程中只能有一個(gè)活躍的進(jìn)程,這一特性使管程能夠很方便的實(shí)現(xiàn)互斥操作。管程是編程語(yǔ)言的特性,所以編譯器知道它們的特殊性,因此可以采用與其他過(guò)程調(diào)用不同的方法來(lái)處理對(duì)管程的調(diào)用。通常情況下,當(dāng)進(jìn)程調(diào)用管程中的程序時(shí),該程序的前幾條指令會(huì)檢查管程中是否有其他活躍的進(jìn)程。如果有的話,調(diào)用進(jìn)程將被掛起,直到另一個(gè)進(jìn)程離開(kāi)管程才將其喚醒。如果沒(méi)有活躍進(jìn)程在使用管程,那么該調(diào)用進(jìn)程才可以進(jìn)入。

進(jìn)入管程中的互斥由編譯器負(fù)責(zé),但是一種通用做法是使用?互斥量(mutex)?和?二進(jìn)制信號(hào)量(binary semaphore)。由于編譯器而不是程序員在操作,因此出錯(cuò)的幾率會(huì)大大降低。在任何時(shí)候,編寫(xiě)管程的程序員都無(wú)需關(guān)心編譯器是如何處理的。他只需要知道將所有的臨界區(qū)轉(zhuǎn)換成為管程過(guò)程即可。絕不會(huì)有兩個(gè)進(jìn)程同時(shí)執(zhí)行臨界區(qū)中的代碼。

即使管程提供了一種簡(jiǎn)單的方式來(lái)實(shí)現(xiàn)互斥,但在我們看來(lái),這還不夠。因?yàn)槲覀冞€需要一種在進(jìn)程無(wú)法執(zhí)行被阻塞。在生產(chǎn)者-消費(fèi)者問(wèn)題中,很容易將針對(duì)緩沖區(qū)滿和緩沖區(qū)空的測(cè)試放在管程程序中,但是生產(chǎn)者在發(fā)現(xiàn)緩沖區(qū)滿的時(shí)候該如何阻塞呢?

解決的辦法是引入條件變量(condition variables)?以及相關(guān)的兩個(gè)操作?wait?和?signal。當(dāng)一個(gè)管程程序發(fā)現(xiàn)它不能運(yùn)行時(shí)(例如,生產(chǎn)者發(fā)現(xiàn)緩沖區(qū)已滿),它會(huì)在某個(gè)條件變量(如 full)上執(zhí)行?wait?操作。這個(gè)操作造成調(diào)用進(jìn)程阻塞,并且還將另一個(gè)以前等在管程之外的進(jìn)程調(diào)入管程。在前面的 pthread 中我們已經(jīng)探討過(guò)條件變量的實(shí)現(xiàn)細(xì)節(jié)了。另一個(gè)進(jìn)程,比如消費(fèi)者可以通過(guò)執(zhí)行?signal?來(lái)喚醒阻塞的調(diào)用進(jìn)程。

通過(guò)臨界區(qū)自動(dòng)的互斥,管程比信號(hào)量更容易保證并行編程的正確性。但是管程也有缺點(diǎn),我們前面說(shuō)到過(guò)管程是一個(gè)編程語(yǔ)言的概念,編譯器必須要識(shí)別管程并用某種方式對(duì)其互斥作出保證。C、Pascal 以及大多數(shù)其他編程語(yǔ)言都沒(méi)有管程,所以不能依靠編譯器來(lái)遵守互斥規(guī)則。

與管程和信號(hào)量有關(guān)的另一個(gè)問(wèn)題是,這些機(jī)制都是設(shè)計(jì)用來(lái)解決訪問(wèn)共享內(nèi)存的一個(gè)或多個(gè) CPU 上的互斥問(wèn)題的。通過(guò)將信號(hào)量放在共享內(nèi)存中并用?TSL?或?XCHG?指令來(lái)保護(hù)它們,可以避免競(jìng)爭(zhēng)。但是如果是在分布式系統(tǒng)中,可能同時(shí)具有多個(gè) CPU 的情況,并且每個(gè) CPU 都有自己的私有內(nèi)存呢,它們通過(guò)網(wǎng)絡(luò)相連,那么這些原語(yǔ)將會(huì)失效。因?yàn)樾盘?hào)量太低級(jí)了,而管程在少數(shù)幾種編程語(yǔ)言之外無(wú)法使用,所以還需要其他方法。

消息傳遞

上面提到的其他方法就是?消息傳遞(messaage passing)。這種進(jìn)程間通信的方法使用兩個(gè)原語(yǔ)?send?和?receive?,它們像信號(hào)量而不像管程,是系統(tǒng)調(diào)用而不是語(yǔ)言級(jí)別。示例如下

send(destination,?&message);

receive(source,?&message);

send 方法用于向一個(gè)給定的目標(biāo)發(fā)送一條消息,receive 從一個(gè)給定的源接收一條消息。如果沒(méi)有消息,接受者可能被阻塞,直到接收一條消息或者帶著錯(cuò)誤碼返回。

消息傳遞系統(tǒng)現(xiàn)在面臨著許多信號(hào)量和管程所未涉及的問(wèn)題和設(shè)計(jì)難點(diǎn),尤其對(duì)那些在網(wǎng)絡(luò)中不同機(jī)器上的通信狀況。例如,消息有可能被網(wǎng)絡(luò)丟失。為了防止消息丟失,發(fā)送方和接收方可以達(dá)成一致:一旦接受到消息后,接收方馬上回送一條特殊的?確認(rèn)(acknowledgement)?消息。如果發(fā)送方在一段時(shí)間間隔內(nèi)未收到確認(rèn),則重發(fā)消息。

現(xiàn)在考慮消息本身被正確接收,而返回給發(fā)送著的確認(rèn)消息丟失的情況。發(fā)送者將重發(fā)消息,這樣接受者將收到兩次相同的消息。

對(duì)于接收者來(lái)說(shuō),如何區(qū)分新的消息和一條重發(fā)的老消息是非常重要的。通常采用在每條原始消息中嵌入一個(gè)連續(xù)的序號(hào)來(lái)解決此問(wèn)題。如果接受者收到一條消息,它具有與前面某一條消息一樣的序號(hào),就知道這條消息是重復(fù)的,可以忽略。

消息系統(tǒng)還必須處理如何命名進(jìn)程的問(wèn)題,以便在發(fā)送或接收調(diào)用中清晰的指明進(jìn)程。身份驗(yàn)證(authentication)?也是一個(gè)問(wèn)題,比如客戶端怎么知道它是在與一個(gè)真正的文件服務(wù)器通信,從發(fā)送方到接收方的信息有可能被中間人所篡改。

屏障

最后一個(gè)同步機(jī)制是準(zhǔn)備用于進(jìn)程組而不是進(jìn)程間的生產(chǎn)者-消費(fèi)者情況的。在某些應(yīng)用中劃分了若干階段,并且規(guī)定,除非所有的進(jìn)程都就緒準(zhǔn)備著手下一個(gè)階段,否則任何進(jìn)程都不能進(jìn)入下一個(gè)階段,可以通過(guò)在每個(gè)階段的結(jié)尾安裝一個(gè)?屏障(barrier)?來(lái)實(shí)現(xiàn)這種行為。當(dāng)一個(gè)進(jìn)程到達(dá)屏障時(shí),它會(huì)被屏障所攔截,直到所有的屏障都到達(dá)為止。屏障可用于一組進(jìn)程同步,如下圖所示

在上圖中我們可以看到,有四個(gè)進(jìn)程接近屏障,這意味著每個(gè)進(jìn)程都在進(jìn)行運(yùn)算,但是還沒(méi)有到達(dá)每個(gè)階段的結(jié)尾。過(guò)了一段時(shí)間后,A、B、D 三個(gè)進(jìn)程都到達(dá)了屏障,各自的進(jìn)程被掛起,但此時(shí)還不能進(jìn)入下一個(gè)階段呢,因?yàn)檫M(jìn)程 B 還沒(méi)有執(zhí)行完畢。結(jié)果,當(dāng)最后一個(gè) C 到達(dá)屏障后,這個(gè)進(jìn)程組才能夠進(jìn)入下一個(gè)階段。

避免鎖:讀-復(fù)制-更新

最快的鎖是根本沒(méi)有鎖。問(wèn)題在于沒(méi)有鎖的情況下,我們是否允許對(duì)共享數(shù)據(jù)結(jié)構(gòu)的并發(fā)讀寫(xiě)進(jìn)行訪問(wèn)。答案當(dāng)然是不可以。假設(shè)進(jìn)程 A 正在對(duì)一個(gè)數(shù)字?jǐn)?shù)組進(jìn)行排序,而進(jìn)程 B 正在計(jì)算其平均值,而此時(shí)你進(jìn)行 A 的移動(dòng),會(huì)導(dǎo)致 B 會(huì)多次讀到重復(fù)值,而某些值根本沒(méi)有遇到過(guò)。

然而,在某些情況下,我們可以允許寫(xiě)操作來(lái)更新數(shù)據(jù)結(jié)構(gòu),即便還有其他的進(jìn)程正在使用。竅門(mén)在于確保每個(gè)讀操作要么讀取舊的版本,要么讀取新的版本,例如下面的樹(shù)

上面的樹(shù)中,讀操作從根部到葉子遍歷整個(gè)樹(shù)。加入一個(gè)新節(jié)點(diǎn) X 后,為了實(shí)現(xiàn)這一操作,我們要讓這個(gè)節(jié)點(diǎn)在樹(shù)中可見(jiàn)之前使它"恰好正確":我們對(duì)節(jié)點(diǎn) X 中的所有值進(jìn)行初始化,包括它的子節(jié)點(diǎn)指針。然后通過(guò)原子寫(xiě)操作,使 X 稱為 A 的子節(jié)點(diǎn)。所有的讀操作都不會(huì)讀到前后不一致的版本

在上面的圖中,我們接著移除 B 和 D。首先,將 A 的左子節(jié)點(diǎn)指針指向 C 。所有原本在 A 中的讀操作將會(huì)后續(xù)讀到節(jié)點(diǎn) C ,而永遠(yuǎn)不會(huì)讀到 B 和 D。也就是說(shuō),它們將只會(huì)讀取到新版數(shù)據(jù)。同樣,所有當(dāng)前在 B 和 D 中的讀操作將繼續(xù)按照原始的數(shù)據(jù)結(jié)構(gòu)指針并且讀取舊版數(shù)據(jù)。所有操作均能正確運(yùn)行,我們不需要鎖住任何東西。而不需要鎖住數(shù)據(jù)就能夠移除 B 和 D 的主要原因就是?讀-復(fù)制-更新(Ready-Copy-Update,RCU),將更新過(guò)程中的移除和再分配過(guò)程分離開(kāi)。

Java 并發(fā)工具包

JDK 1.5 提供了許多種并發(fā)容器來(lái)改進(jìn)同步容器的性能,同步容器將所有對(duì)容器狀態(tài)的訪問(wèn)都串行化,以實(shí)現(xiàn)他們之間的線程安全性。這種方法的代價(jià)是嚴(yán)重降低了并發(fā)性能,當(dāng)多個(gè)線程爭(zhēng)搶容器鎖的同時(shí),嚴(yán)重降低吞吐量。

下面我們就來(lái)一起認(rèn)識(shí)一下 Java 中都用了哪些并發(fā)工具

Java 并發(fā)工具綜述

在 Java 5.0 中新增加了?ConcurrentHashMap?用來(lái)替代基于散列的 Map 容器;新增加了CopyOnWriteArrayList?和?CopyOnWriteArraySet?來(lái)分別替代 ArrayList 和 Set 接口實(shí)現(xiàn)類;還新增加了兩種容器類型,分別是?Queue?和?BlockingQueue, Queue 是隊(duì)列的意思,它有一些實(shí)現(xiàn)分別是傳統(tǒng)的先進(jìn)先出隊(duì)列?ConcurrentLinkedQueue以及并發(fā)優(yōu)先級(jí)隊(duì)列?PriorityQueue。Queue 是一個(gè)先入先出的隊(duì)列,它的操作不會(huì)阻塞,如果隊(duì)列為空那么獲取元素的操作會(huì)返回空值。PriorityQueue 擴(kuò)展了 Queue,增加了可阻塞的插入和獲取等操作。如果隊(duì)列為空,那么獲取元素的操作將一直阻塞,直到隊(duì)列中出現(xiàn)一個(gè)可用的元素為止。如果隊(duì)列已滿,那么插入操作則一直阻塞,直到隊(duì)列中有可用的空間為止。

Java 6.0 還引入了?ConcurrentSkipListMap?和?ConcurrentSkipListSet?分別作為同步的 SortedMap 和 SortedSet 的并發(fā)替代品。下面我們就展開(kāi)探討了,設(shè)計(jì)不到底層源碼,因?yàn)楸酒恼轮饕康木褪菫榱嗣枋鲆幌掠心男〇|西以及用了哪些東西。

ConcurrentHashMap

我們先來(lái)看一下 ConcurrentHashMap 在并發(fā)集合中的位置

可以看到,ConcurrentHashMap 繼承了?AbstractMap?接口并實(shí)現(xiàn)了 ConcurrentMap 和 Serializable 接口,AbstractMap 和 ConcurrentMap 都是 Map 的實(shí)現(xiàn)類,只不過(guò) AbstractMap 是抽象實(shí)現(xiàn)。

ConcurrentHashMap 和 Hashtable 的構(gòu)造非常相似,只不過(guò) Hashtable 容器在激烈競(jìng)爭(zhēng)的場(chǎng)景中會(huì)表現(xiàn)出效率低下的現(xiàn)象,這是因?yàn)樗性L問(wèn) Hashtable 的線程都想獲取同一把鎖,如果容器里面有多把鎖,并且每一把鎖都只用來(lái)鎖定一段數(shù)據(jù),那么當(dāng)多個(gè)線程訪問(wèn)不同的數(shù)據(jù)段時(shí),就不存在競(jìng)爭(zhēng)關(guān)系。這就是 ConcurreentHashMap 采用的?分段鎖?實(shí)現(xiàn)。在這種鎖實(shí)現(xiàn)中,任意數(shù)量的讀取線程可以并發(fā)的訪問(wèn) Map,執(zhí)行讀取操作的線程和執(zhí)行寫(xiě)入的線程可以并發(fā)的訪問(wèn) Map,并且在讀取的同時(shí)也可以并發(fā)修改 Map。

ConcurrentHashMap 分段鎖實(shí)現(xiàn)帶來(lái)的結(jié)果是,在并發(fā)環(huán)境下可以實(shí)現(xiàn)更高的吞吐量,在單線程環(huán)境下只損失非常小的性能。

你知道 HashMap 是具有 fail-fast 機(jī)制的,也就是說(shuō)它是一種強(qiáng)一致性的集合,在數(shù)據(jù)不一致的情況下會(huì)拋出?ConcurrentModificationException?異常,而 ConcurrentHashMap 是一種?弱一致性?的集合,在并發(fā)修改其內(nèi)部結(jié)構(gòu)時(shí),它不會(huì)拋出 ConcurrentModificationException 異常,弱一致性能夠容忍并發(fā)修改。

在 HashMap 中,我們一般使用的 size、empty、containsKey 等方法都是標(biāo)準(zhǔn)方法,其返回的結(jié)果是一定的,包含就是包含,不包含就是不包含,可以作為判斷條件;而 ConcurrentHashMap 中的這些方法只是參考方法,它不是一個(gè)?精確值,像是 size、empty 這些方法在并發(fā)場(chǎng)景下用處很小,因?yàn)樗麄兊姆祷刂悼偸窃诓粩嘧兓?,所以這些操作的需求就被弱化了。

在 ConcurrentHashMap 中沒(méi)有實(shí)現(xiàn)對(duì) Map 加鎖從而實(shí)現(xiàn)獨(dú)占訪問(wèn)。在線程安全的 Map 實(shí)現(xiàn)?Hashtable?和?Collections.synchronizedMap?中都實(shí)現(xiàn)了獨(dú)占訪問(wèn),因此只能單個(gè)線程修改 Map 。ConcurrentHashMap 與這些 Map 容器相比,具有更多的優(yōu)勢(shì)和更少的劣勢(shì),只有當(dāng)需要獨(dú)占訪問(wèn)的需求時(shí)才會(huì)使用 Hashtable 或者是 Collections.synchronizedMap ,否則其他并發(fā)場(chǎng)景下,應(yīng)該使用 ConcurrentHashMap。

ConcurrentMap

ConcurrentMap 是一個(gè)接口,它繼承了 Map 接口并提供了 Map 接口中四個(gè)新的方法,這四個(gè)方法都是?原子性?方法,進(jìn)一步擴(kuò)展了 Map 的功能。

publicinterfaceConcurrentMapextendsMap{

//?僅當(dāng)?key?沒(méi)有相應(yīng)的映射值時(shí)才插入

VputIfAbsent(K?key,?V?value);

//?僅當(dāng)?key?被映射到?value?時(shí)才移除

booleanremove(Object?key,?Object?value);

//?僅當(dāng)?key?被映射到?value?時(shí)才移除

Vreplace(K?key,?V?value);

//?僅當(dāng)?key?被映射到?oldValue?時(shí)才替換為?newValue

booleanreplace(K?key,?V?oldValue,?V?newValue);

}

ConcurrentNavigableMap

java.util.concurrent.ConcurrentNavigableMap?類是?java.util.NavigableMap?的子類,它支持并發(fā)訪問(wèn),并且允許其視圖的并發(fā)訪問(wèn)。

什么是視圖呢?視圖就是集合中的一段數(shù)據(jù)序列,ConcurrentNavigableMap 中支持使用?headMap、subMap、tailMap?返回的視圖。與其重新解釋一下 NavigableMap 中找到的所有方法,不如看一下 ConcurrentNavigableMap 中添加的方法

headMap 方法:headMap 方法返回一個(gè)嚴(yán)格小于給定鍵的視圖

tailMap 方法:tailMap 方法返回包含大于或等于給定鍵的視圖。

subMap 方法:subMap 方法返回給定兩個(gè)參數(shù)的視圖

ConcurrentNavigableMap 接口包含一些可能有用的其他方法

descendingKeySet()

descendingMap()

navigableKeySet()

更多關(guān)于方法的描述這里就不再贅述了,讀者朋友們可自行查閱 javadoc

ConcurrentSkipListMap

ConcurrentSkipListMap?是線程安全的有序的哈希表,適用于高并發(fā)的場(chǎng)景。

ConcurrentSkipListMap 的底層數(shù)據(jù)結(jié)構(gòu)是基于跳表實(shí)現(xiàn)的。ConcurrentSkipListMap 可以提供 Comparable 內(nèi)部排序或者是 Comparator 外部排序,具體取決于使用哪個(gè)構(gòu)造函數(shù)。

ConcurrentSkipListSet

ConcurrentSkipListSet?是線程安全的有序的集合,適用于高并發(fā)的場(chǎng)景。ConcurrentSkipListSet 底層是通過(guò) ConcurrentNavigableMap 來(lái)實(shí)現(xiàn)的,它是一個(gè)有序的線程安全的集合。

ConcurrentSkipListSet有序的,基于元素的自然排序或者通過(guò)比較器確定的順序;

ConcurrentSkipListSet是線程安全的;

CopyOnWriteArrayList

CopyOnWriteArrayList 是 ArrayList 的變體,在 CopyOnWriteArrayList 中,所有可變操作比如 add、set 其實(shí)都是重新創(chuàng)建了一個(gè)副本,通過(guò)對(duì)數(shù)組進(jìn)行復(fù)制而實(shí)現(xiàn)的。

CopyOnWriteArrayList 其內(nèi)部有一個(gè)指向數(shù)組的引用,數(shù)組是不會(huì)被修改的,每次并發(fā)修改 CopyOnWriteArrayList 都相當(dāng)于重新創(chuàng)建副本,CopyOnWriteArrayList 是一種?fail-safe?機(jī)制的,它不會(huì)拋出 ConcurrentModificationException 異常,并且返回元素與迭代器創(chuàng)建時(shí)的元素相同。

每次并發(fā)寫(xiě)操作都會(huì)創(chuàng)建新的副本,這個(gè)過(guò)程存在一定的開(kāi)銷,所以,一般在規(guī)模很大時(shí),讀操作要遠(yuǎn)遠(yuǎn)多于寫(xiě)操作時(shí),為了保證線程安全性,會(huì)使用 CopyOnWriteArrayList。

類似的,CopyOnWriteArraySet 的作用也相當(dāng)于替代了 Set 接口。

BlockingQueue

BlockingQueue 譯為?阻塞隊(duì)列,它是 JDK 1.5 添加的新的工具類,它繼承于 Queue隊(duì)列,并擴(kuò)展了 Queue 的功能。

BlockingQueue 在檢索元素時(shí)會(huì)等待隊(duì)列變成非空,并在存儲(chǔ)元素時(shí)會(huì)等待隊(duì)列變?yōu)榭捎谩lockingQueue 的方法有四種實(shí)現(xiàn)形式,以不同的方式來(lái)處理。

第一種是拋出異常

特殊值:第二種是根據(jù)情況會(huì)返回 null 或者 false

阻塞:第三種是無(wú)限期的阻塞當(dāng)前線程直到操作變?yōu)榭捎煤?/p>

超時(shí):第四種是給定一個(gè)最大的超時(shí)時(shí)間,超過(guò)后才會(huì)放棄

BlockingQueue 不允許添加 null 元素,在其實(shí)現(xiàn)類的方法?add、put 或者 offer?后時(shí)添加 null 會(huì)拋出空指針異常。BlockingQueue 會(huì)有容量限制。在任意時(shí)間內(nèi),它都會(huì)有一個(gè) remainCapacity,超過(guò)該值之前,任意 put 元素都會(huì)阻塞。

BlockingQueue 一般用于實(shí)現(xiàn)生產(chǎn)者 - 消費(fèi)者?隊(duì)列,如下圖所示

BlockingQueue 有多種實(shí)現(xiàn),下面我們一起來(lái)認(rèn)識(shí)一下這些容器。

其中?LinkedBlockingQueue?和?ArrayBlockingQueue?是 FIFO 先入先出隊(duì)列,二者分別和LinkedList和?ArrayList?對(duì)應(yīng),比同步 List 具有更好的并發(fā)性能。PriorityBlockingQueue?是一個(gè)優(yōu)先級(jí)排序的阻塞隊(duì)列,如果你希望按照某種順序而不是 FIFO 處理元素時(shí)這個(gè)隊(duì)列將非常有用。正如其他有序的容器一樣,PriorityBlockingQueue 既可以按照自然順序來(lái)比較元素,也可以使用?Comparator?比較器進(jìn)行外部元素比較。SynchronousQueue?它維護(hù)的是一組線程而不是一組隊(duì)列,實(shí)際上它不是一個(gè)隊(duì)列,它的每個(gè) insert 操作必須等待其他相關(guān)線程的 remove 方法后才能執(zhí)行,反之亦然。

LinkedBlockingQueue

LinkedBlockingQueue?是一種 BlockingQueue 的實(shí)現(xiàn)。

它是一種基于鏈表的構(gòu)造、先入先出的有界阻塞隊(duì)列。隊(duì)列的?head?也就是頭元素是在隊(duì)列中等待時(shí)間最長(zhǎng)的元素;隊(duì)列的?tail也就是隊(duì)尾元素是隊(duì)列中等待時(shí)間最短的元素。新的元素會(huì)被插入到隊(duì)尾中,檢索操作將獲取隊(duì)列中的頭部元素。鏈表隊(duì)列通常比基于數(shù)組的隊(duì)列具有更高的吞吐量,但是在大多數(shù)并發(fā)應(yīng)用程序中,可預(yù)測(cè)的性能較差。

ArrayBlockingQueue

ArrayBlockingQueue?是一個(gè)用數(shù)組實(shí)現(xiàn)的有界隊(duì)列,此隊(duì)列順序按照先入先出的原則對(duì)元素進(jìn)行排序。

默認(rèn)情況下不保證線程公平的訪問(wèn)隊(duì)列,所謂公平訪問(wèn)隊(duì)列指的是阻塞的線程,可以按照阻塞的先后順序訪問(wèn),即先阻塞線程先訪問(wèn)隊(duì)列。非公平性是對(duì)先等待的線程是非公平的。有可能先阻塞的線程最后才訪問(wèn)隊(duì)列。

PriorityBlockingQueue

PriorityBlockingQueue?是一個(gè)支持優(yōu)先級(jí)的阻塞隊(duì)列,默認(rèn)情況下的元素采取自然順序生序或者降序,也可以自己定義 Comparator 進(jìn)行外部排序。但需要注意的是不能保證同優(yōu)先級(jí)元素的順序。

DelayQueue

DelayQueue?是一個(gè)支持延時(shí)獲取元素的無(wú)阻塞隊(duì)列,其中的元素只能在延遲到期后才能使用,DelayQueue 中的隊(duì)列頭是延遲最長(zhǎng)時(shí)間的元素,如果沒(méi)有延遲,則沒(méi)有 head 頭元素,poll 方法會(huì)返回 null。判斷的依據(jù)就是?getDelay(TimeUnit.NANOSECONDS)?方法返回一個(gè)值小于或者等于 0 就會(huì)發(fā)生過(guò)期。

TransferQueue

TransferQueue 繼承于 BlockingQueue,它是一個(gè)接口,一個(gè) BlockingQueue 是一個(gè)生產(chǎn)者可能等待消費(fèi)者接受元素,TransferQueue 則更進(jìn)一步,生產(chǎn)者會(huì)一直阻塞直到所添加到隊(duì)列的元素被某一個(gè)消費(fèi)者所消費(fèi),新添加的transfer 方法用來(lái)實(shí)現(xiàn)這種約束。

TransferQueue 有下面這些方法:兩個(gè)?tryTransfer?方法,一個(gè)是非阻塞的,另一個(gè)是帶有 timeout 參數(shù)設(shè)置超時(shí)時(shí)間的。還有兩個(gè)輔助方法?hasWaitingConsumer?和?getWaitingConcusmerCount。

LinkedTransferQueue

一個(gè)無(wú)界的基于鏈表的 TransferQueue。這個(gè)隊(duì)列對(duì)任何給定的生產(chǎn)者進(jìn)行 FIFO 排序,head 是隊(duì)列中存在時(shí)間最長(zhǎng)的元素。tail 是隊(duì)列中存在時(shí)間最短的元素。

BlockingDeque

與 BlockingQueue 相對(duì)的還有 BlockingDeque 和 Deque,它們是 JDK1.6 被提出的,分別對(duì) Queue 和 BlockingQueue 做了擴(kuò)展。

Deque?是一個(gè)雙端隊(duì)列,分別實(shí)現(xiàn)了在隊(duì)列頭和隊(duì)列尾的插入。Deque 的實(shí)現(xiàn)有?ArrayDeque、ConcurrentLinkedDeque,BlockingDeque 的實(shí)現(xiàn)有?LinkedBlockingDeque?。

阻塞模式一般用于生產(chǎn)者 - 消費(fèi)者隊(duì)列,而雙端隊(duì)列適用于工作密取。在工作密取的設(shè)計(jì)中,每個(gè)消費(fèi)者都有各自的雙端隊(duì)列,如果一個(gè)消費(fèi)者完成了自己雙端隊(duì)列的任務(wù),就會(huì)去其他雙端隊(duì)列的末尾進(jìn)行消費(fèi)。密取方式要比傳統(tǒng)的生產(chǎn)者 - 消費(fèi)者隊(duì)列具有更高的可伸縮性,這是因?yàn)槊總€(gè)工作密取的工作者都有自己的雙端隊(duì)列,不存在競(jìng)爭(zhēng)的情況。

ArrayDeque

ArrayDeque 是 Deque 的可動(dòng)態(tài)調(diào)整大小的數(shù)組實(shí)現(xiàn),其內(nèi)部沒(méi)有容量限制,他們會(huì)根據(jù)需要進(jìn)行增長(zhǎng)。ArrayDeque 不是線程安全的,如果沒(méi)有外部加鎖的情況下,不支持多線程訪問(wèn)。ArrayDeque 禁止空元素,這個(gè)類作為棧使用時(shí)要比 Stack 快,作為 queue 使用時(shí)要比 LinkedList 快。

除了 remove、removeFirstOccurrence、removeLastOccurrence、contains、interator.remove 外,大部分的 ArrayDeque 都以恒定的開(kāi)銷運(yùn)行。

注意:ArrayDeque 是 fail-fast 的,如果創(chuàng)建了迭代器之后,卻使用了迭代器外部的 remove 等修改方法,那么這個(gè)類將會(huì)拋出 ConcurrentModificationException 異常。

ConcurrentLinkedDeque

ConcurrentLinkedDeque?是 JDK1.7 引入的雙向鏈表的無(wú)界并發(fā)隊(duì)列。它與 ConcurrentLinkedQueue 的區(qū)別是 ConcurrentLinkedDeque 同時(shí)支持 FIFO 和 FILO 兩種操作方式,即可以從隊(duì)列的頭和尾同時(shí)操作(插入/刪除)。ConcurrentLinkedDeque 也支持?happen-before?原則。ConcurrentLinkedDeque 不允許空元素。

LinkedBlockingDeque

LinkedBlockingDeque?是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列,即可以從隊(duì)列的兩端插入和移除元素。雙向隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競(jìng)爭(zhēng)。LinkedBlockingDeque 把初始容量和構(gòu)造函數(shù)綁定,這樣能夠有效過(guò)度拓展。初始容量如果沒(méi)有指定,就取的是?Integer.MAX_VALUE,這也是 LinkedBlockingDeque 的默認(rèn)構(gòu)造函數(shù)。

同步工具類

同步工具類可以是任何一個(gè)對(duì)象,只要它根據(jù)自身狀態(tài)來(lái)協(xié)調(diào)線程的控制流。阻塞隊(duì)列可以作為同步控制類,其他類型的同步工具類還包括?信號(hào)量(Semaphore)、柵欄(Barrier)和?閉鎖(Latch)。下面我們就來(lái)一起認(rèn)識(shí)一下這些工具類

Semaphore

Semaphore 翻譯過(guò)來(lái)就是?信號(hào)量,信號(hào)量是什么?它其實(shí)就是一種信號(hào),在操作系統(tǒng)中,也有信號(hào)量的這個(gè)概念,在進(jìn)程間通信的時(shí)候,我們就會(huì)談到信號(hào)量進(jìn)行通信。還有在 Linux 操作系統(tǒng)采取中斷時(shí),也會(huì)向進(jìn)程發(fā)出中斷信號(hào),根據(jù)進(jìn)程的種類和信號(hào)的類型判斷是否應(yīng)該結(jié)束進(jìn)程。

在 Java 中,Semaphore(信號(hào)量)是用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù)量,它通過(guò)協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。

Semaphore 管理著一組許可(permit),許可的初始數(shù)量由構(gòu)造函數(shù)來(lái)指定。在獲取某個(gè)資源之前,應(yīng)該先從信號(hào)量獲取許可(permit),以確保資源是否可用。當(dāng)線程完成對(duì)資源的操作后,會(huì)把它放在池中并向信號(hào)量返回一個(gè)許可,從而允許其他線程訪問(wèn)資源,這叫做釋放許可。如果沒(méi)有許可的話,那么?acquire?將會(huì)阻塞直到有許可(中斷或者操作超時(shí))為止。release?方法將返回一個(gè)許可信號(hào)量。

Semaphore 可以用來(lái)實(shí)現(xiàn)流量控制,例如常用的數(shù)據(jù)庫(kù)連接池,線程請(qǐng)求資源時(shí),如果數(shù)據(jù)庫(kù)連接池為空則阻塞線程,直接返回失敗,如果連接池不為空時(shí)解除阻塞。

CountDownLatch

閉鎖(Latch)?是一種同步工具類,它可以延遲線程的進(jìn)度以直到其到達(dá)終止?fàn)顟B(tài)。閉鎖的作用相當(dāng)于是一扇門(mén),在閉鎖達(dá)到結(jié)束狀態(tài)前,門(mén)是一直關(guān)著的,沒(méi)有任何線程能夠通過(guò)。當(dāng)閉鎖到達(dá)結(jié)束狀態(tài)后,這扇門(mén)會(huì)打開(kāi)并且允許任何線程通過(guò),然后就一直保持打開(kāi)狀態(tài)。

CountDownLatch?就是閉鎖的一種實(shí)現(xiàn)。它可以使一個(gè)或者多個(gè)線程等待一組事件的發(fā)生。閉鎖有一個(gè)計(jì)數(shù)器,閉鎖需要對(duì)計(jì)數(shù)器進(jìn)行初始化,表示需要等待的次數(shù),閉鎖在調(diào)用?await?處進(jìn)行等待,其他線程在調(diào)用 countDown 把閉鎖 count 次數(shù)進(jìn)行遞減,直到遞減為 0 ,喚醒 await。如下代碼所示

publicclassTCountDownLatch{

publicstaticvoidmain(String[]?args){

CountDownLatch?latch?=newCountDownLatch(5);

Increment?increment?=newIncrement(latch);

Decrement?decrement?=newDecrement(latch);

newThread(increment).start();

newThread(decrement).start();

try{

Thread.sleep(6000);

}catch(InterruptedException?e)?{

e.printStackTrace();

}

}

}

classDecrementimplementsRunnable{

CountDownLatch?countDownLatch;

publicDecrement(CountDownLatch?countDownLatch){

this.countDownLatch?=?countDownLatch;

}

@Override

publicvoidrun(){

try{

for(longi?=?countDownLatch.getCount();i?>0;i--){

Thread.sleep(1000);

System.out.println("countdown");

this.countDownLatch.countDown();

}

}catch(InterruptedException?e)?{

e.printStackTrace();

}

}

}

classIncrementimplementsRunnable{

CountDownLatch?countDownLatch;

publicIncrement(CountDownLatch?countDownLatch){

this.countDownLatch?=?countDownLatch;

}

@Override

publicvoidrun(){

try{

System.out.println("await");

countDownLatch.await();

}catch(InterruptedException?e)?{

e.printStackTrace();

}

System.out.println("Waiter?Released");

}

}

Future

我們常見(jiàn)的創(chuàng)建多線程的方式有兩種,一種是繼承 Thread 類,一種是實(shí)現(xiàn) Runnable 接口。這兩種方式都沒(méi)有返回值。相對(duì)的,創(chuàng)建多線程還有其他三種方式,那就是使用?Callable接口、?Future?接口和?FutureTask?類。Callable 我們之前聊過(guò),這里就不再描述了,我們主要來(lái)描述一下 Future 和 FutureTask 接口。

Future 就是對(duì)具體的 Runnable 或者 Callable 任務(wù)的執(zhí)行結(jié)果進(jìn)行一系列的操作,必要時(shí)可通過(guò)?get?方法獲取執(zhí)行結(jié)果,這個(gè)方法會(huì)阻塞直到執(zhí)行結(jié)束。Future 中的主要方法有

publicinterfaceFuture{

booleancancel(booleanmayInterruptIfRunning);

booleanisCancelled();

booleanisDone();

Vget()throwsInterruptedException,?ExecutionException;

Vget(longtimeout,?TimeUnit?unit)

throwsInterruptedException,?ExecutionException,?TimeoutException

;

}

cancel(boolean mayInterruptIfRunning)?: 嘗試取消任務(wù)的執(zhí)行。如果任務(wù)已經(jīng)完成、已經(jīng)被取消或者由于某些原因而無(wú)法取消,那么這個(gè)嘗試會(huì)失敗。如果取消成功,或者在調(diào)用 cancel 時(shí)此任務(wù)尚未開(kāi)始,那么此任務(wù)永遠(yuǎn)不會(huì)執(zhí)行。如果任務(wù)已經(jīng)開(kāi)始,那么 mayInterruptIfRunning 參數(shù)會(huì)確定是否中斷執(zhí)行任務(wù)以便于嘗試停止該任務(wù)。這個(gè)方法返回后,會(huì)對(duì)?isDone?的后續(xù)調(diào)用也返回 true,如果 cancel 返回 true,那么后續(xù)的調(diào)用?isCancelled?也會(huì)返回 true。

boolean isCancelled():如果此任務(wù)在正常完成之前被取消,則返回 true。

boolean isDone():如果任務(wù)完成,返回 true。

V get() throws InterruptedException, ExecutionException:等待必要的計(jì)算完成,然后檢索其結(jié)果

V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException?:必要時(shí)最多等待給定時(shí)間以完成計(jì)算,然后檢索其結(jié)果。

因?yàn)镕uture只是一個(gè)接口,所以是無(wú)法直接用來(lái)創(chuàng)建對(duì)象使用的,因此就有了下面的FutureTask。

FutureTask

FutureTask 實(shí)現(xiàn)了?RunnableFuture?接口,RunnableFuture 接口是什么呢?

RunnableFuture 接口又繼承了?Runnable?接口和?Future?接口。納尼?在 Java 中不是只允許單繼承么,是的,單繼承更多的是說(shuō)的類與類之間的繼承關(guān)系,子類繼承父類,擴(kuò)展父類的接口,這個(gè)過(guò)程是單向的,就是為了解決多繼承引起的過(guò)渡引用問(wèn)題。而接口之間的繼承是接口的擴(kuò)展,在 Java 編程思想中也印證了這一點(diǎn)

對(duì) RunnableFuture 接口的解釋是:成功執(zhí)行的 run 方法會(huì)使 Future 接口的完成并允許訪問(wèn)其結(jié)果。所以它既可以作為 Runnable 被線程執(zhí)行,又可以作為 Future 得到 Callable 的返回值。

FutureTask 也可以用作閉鎖,它可以處于以下三種狀態(tài)

等待運(yùn)行

正在運(yùn)行

運(yùn)行完成

FutureTask 在?Executor?框架中表示異步任務(wù),此外還可以表示一些時(shí)間較長(zhǎng)的計(jì)算,這些計(jì)算可以在使用計(jì)算結(jié)果之前啟動(dòng)。

FutureTask 具體的源碼我后面會(huì)單獨(dú)出文章進(jìn)行描述。

Barrier

我們上面聊到了通過(guò)閉鎖來(lái)啟動(dòng)一組相關(guān)的操作,使用閉鎖來(lái)等待一組事件的執(zhí)行。閉鎖是一種一次性對(duì)象,一旦進(jìn)入終止?fàn)顟B(tài)后,就不能被?重置。

Barrier?的特點(diǎn)和閉鎖也很類似,它也是阻塞一組線程直到某個(gè)事件發(fā)生。柵欄與閉鎖的區(qū)別在于,所有線程必須同時(shí)到達(dá)柵欄的位置,才能繼續(xù)執(zhí)行,就像我們上面操作系統(tǒng)給出的這幅圖一樣。

ABCD 四條線程,必須同時(shí)到達(dá) Barrier,然后?手牽手一起走過(guò)幸福的殿堂。

當(dāng)線程到達(dá) Barrier 的位置時(shí)會(huì)調(diào)用?await?方法,這個(gè)方法會(huì)阻塞直到所有線程都到達(dá) Barrier 的位置,如果所有線程都到達(dá) Barrier 的位置,那么 Barrier 將會(huì)打開(kāi)使所有線程都被釋放,而 Barrier 將被重置以等待下次使用。如果調(diào)用 await 方法導(dǎo)致超時(shí),或者 await 阻塞的線程被中斷,那么 Barrier 就被認(rèn)為被打破,所有阻塞的 await 都會(huì)拋出?BrokenBarrierException?。如果成功通過(guò)柵欄后,await 方法返回一個(gè)唯一索引號(hào),可以利用這些索引號(hào)選舉一個(gè)新的 leader,來(lái)處理一下其他工作。

publicclassTCyclicBarrier{

publicstaticvoidmain(String[]?args){

Runnable?runnable?=?()?->?System.out.println("Barrier?1?開(kāi)始...");

Runnable?runnable2?=?()?->?System.out.println("Barrier?2?開(kāi)始...");

CyclicBarrier?barrier1?=newCyclicBarrier(2,runnable);

CyclicBarrier?barrier2?=newCyclicBarrier(2,runnable2);

CyclicBarrierRunnable?b1?=newCyclicBarrierRunnable(barrier1,barrier2);

CyclicBarrierRunnable?b2?=newCyclicBarrierRunnable(barrier1,barrier2);

newThread(b1).start();

newThread(b2).start();

}

}

classCyclicBarrierRunnableimplementsRunnable{

CyclicBarrier?barrier1;

CyclicBarrier?barrier2;

publicCyclicBarrierRunnable(CyclicBarrier?barrier1,CyclicBarrier?barrier2){

this.barrier1?=?barrier1;

this.barrier2?=?barrier2;

}

@Override

publicvoidrun(){

try{

Thread.sleep(1000);

System.out.println(Thread.currentThread().getName()?+"等待?barrier1");

barrier1.await();

Thread.sleep(1000);

System.out.println(Thread.currentThread().getName()?+"等待?barrier2");

barrier2.await();

}catch(InterruptedException?|?BrokenBarrierException?e)?{

e.printStackTrace();

}

System.out.println(Thread.currentThread().getName()?+

"?做完了!");

}

}

Exchanger

與 Barrier 相關(guān)聯(lián)的還有一個(gè)工具類就是?Exchanger, Exchanger 是一個(gè)用于線程間協(xié)作的工具類。Exchanger用于進(jìn)行線程間的數(shù)據(jù)交換。

它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn)兩個(gè)線程可以交換彼此的數(shù)據(jù)。這兩個(gè)線程通過(guò)exchange 方法交換數(shù)據(jù), 如果第一個(gè)線程先執(zhí)行 exchange方法,它會(huì)一直等待第二個(gè)線程也執(zhí)行 exchange,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí),這兩個(gè)線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來(lái)的數(shù)據(jù)傳遞給對(duì)方。因此使用Exchanger 的重點(diǎn)是成對(duì)的線程使用 exchange() 方法,當(dāng)有一對(duì)線程達(dá)到了同步點(diǎn),就會(huì)進(jìn)行交換數(shù)據(jù)。因此該工具類的線程對(duì)象是成對(duì)的。

下面通過(guò)一段例子代碼來(lái)講解一下

publicclassTExchanger{

publicstaticvoidmain(String[]?args){

Exchanger?exchanger?=newExchanger();

ExchangerRunnable?exchangerRunnable?=newExchangerRunnable(exchanger,"A");

ExchangerRunnable?exchangerRunnable2?=newExchangerRunnable(exchanger,"B");

newThread(exchangerRunnable).start();

newThread(exchangerRunnable2).start();

}

}

classExchangerRunnableimplementsRunnable{

Exchanger?exchanger;

Object?object;

publicExchangerRunnable(Exchanger?exchanger,Object?object){

this.exchanger?=?exchanger;

this.object?=?object;

}

@Override

publicvoidrun(){

Object?previous?=?object;

try{

object?=this.exchanger.exchange(object);

System.out.println(

Thread.currentThread().getName()?+"改變前是"+?previous?+"改變后是"+?object);

}catch(InterruptedException?e)?{

e.printStackTrace();

}

}

}

總結(jié)

本篇文章我們從同步容器類入手,主要講了?fail-fast?和?fail-safe?機(jī)制,這兩個(gè)機(jī)制在并發(fā)編程中非常重要。然后我們從操作系統(tǒng)的角度,聊了聊操作系統(tǒng)層面實(shí)現(xiàn)安全性的幾種方式,然后從操作系統(tǒng) -> 并發(fā)我們聊了聊 Java 中的并發(fā)工具包有哪些,以及構(gòu)建并發(fā)的幾種工具類。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容