大而全的java并發(fā)編程總結(jié)

我以為,學(xué)習(xí)java編程,關(guān)鍵在于三點,OOP,并發(fā)和JVM。最后兩點其實是聯(lián)系比較緊密的,并且是屬于java語言特有的屬性。之前在學(xué)習(xí)java的時候?qū)Σl(fā)的理解比較粗淺,利用這一段時間,進一步的學(xué)習(xí)了java并發(fā)的定義,原理和例子,寫篇文章總結(jié)一下。

之前看過一篇關(guān)于Java多線程編程的文章,里面有一句話特別好,說的是多線程編程看上去很深奧,在學(xué)習(xí)概念,理解和設(shè)計并發(fā)程序上很容易出錯。但是

從根本上來看,所謂的多線程編程,不過是JVM或者說當前的計算機體系結(jié)構(gòu)無法處理好多線程下資源競爭的情況而人為加上的一些處理方法。這樣的方法導(dǎo)致在實現(xiàn)相同功能時候會產(chǎn)生很多復(fù)雜的,讓開發(fā)者難以理解或者設(shè)計缺陷,僅此而已。

有了這樣的前提,我們可以認為,多線程編程無非是為了更好的壓榨CPU的性能,人為設(shè)計出來的補償機制。不過在宏觀上,我們可以藐視這樣的機制,但是在工作里,還是不能避免要用到它,而且還要用好它。

先說說一些基本定義,這些定義在無數(shù)的博客和書籍上都有解釋,假定讀者已經(jīng)有所了解,這里只是枚舉出最簡潔的幾點。

  • 進程與線程的區(qū)別(面試題常見題,但是一般問出這個問題的面試官要么是真沒實際開發(fā)多線程程序的經(jīng)驗,要么是對面試者比較失望,問個簡單的理論問題湊個數(shù)。。。)

  • 線程的狀態(tài)


    別人總結(jié)的線程狀態(tài)
  • Java中多線程的實現(xiàn)

    • Interface: Runnable, Callable, Future, ExecutorService
    • Class: Thread, FutureTask
  • 說明

    • Thread,實現(xiàn)類,start()方法將線程變?yōu)榭蛇\行狀態(tài),在運行態(tài)的時候調(diào)用定義的run()方法。不過一般不會有人用定義子類的方式定義一個線程
    • Runnable, 接口,通常實現(xiàn)這個接口,然后作為構(gòu)造參數(shù)新建一個線程實例
    • Callable, Java 1.5, java.util.concurrent, 與runnable類似,call()方法可以返回線程運行的狀態(tài),并且可以拋出異常
    • FutureTask,包裝器,處于thread和callable的中間,它通過接受Callable來創(chuàng)建,同時實現(xiàn)了Future和Runnable接口,可以檢查線程的狀態(tài)

Java語法中的多線程機制

synchronized 關(guān)鍵字

synchronized關(guān)鍵字是Java 1.0就有的語法元素。在Java中,所有的object實例(class也是一種object)都可以作為多線程環(huán)境下得競爭資源,所以每個oject上都有一個鎖的標記,在執(zhí)行關(guān)鍵代碼的時候,對非null的object加上synchronize關(guān)鍵字,標記一個代碼塊,可以自動對某個對象加解鎖。

舉個小栗子:

public class SynchronizedTest {

    private int count = 0;

    public synchronized void increaseCount() {
        count++;
    }

    public int getCount() {
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedTest synchronizedTest = new SynchronizedTest();
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i = 0; i < 5000; i++) {
                    synchronizedTest.increaseCount();
                }
            }
        }, "Thread1");

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i = 0; i < 5000; i++) {
                    synchronizedTest.increaseCount();
                }
            }
        }, "Thread2");

        thread1.start();
        thread2.start();
        Thread.currentThread().join(2000);//main thread waiting for sub-threads perform
        System.out.println("count is: " + synchronizedTest.getCount());
    }
}

啟兩個線程,并發(fā)的對一個變量做自增操作,這個操作在synchronized標識下變?yōu)榇械倪^程,最后輸出10000,如果不加synchronized,結(jié)果會小于10000。

synchronized可能產(chǎn)生死鎖

public class DeadLock {

    private Object a = new Object();
    private Object b = new Object();

    public static void main(String[] args) throws InterruptedException {
        DeadLock deadLock = new DeadLock();
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (deadLock.a) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    synchronized (deadLock.b) {
                        System.out.println("Thread 1 enter");
                    }
                }
            }
        }, "Thread1");

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (deadLock.b) {
                    synchronized (deadLock.a) {
                        System.out.println("Thread 2 enter");
                    }
                }
            }
        }, "Thread2");

        thread1.start();
        thread2.start();
    }
}

以上,thread1在請求到a的鎖之后會帶著鎖睡一會兒,然后再請求b的鎖,但是這是b的鎖已經(jīng)在thread2手里了,同時thread2還在請求a的鎖,變成了循環(huán)等待并且是無限等待,于是產(chǎn)生了死鎖。運行結(jié)果是兩個線程都無限的等待下去。要想解這樣的死鎖,可以在競爭資源上加上是否被鎖的標記位,然后引入等待超時的機制,使得有一方在請求資源超時之后做出讓步,把手上已有的鎖也釋放了,改變循環(huán)等待的狀態(tài)。但是,即使有了超時機制,也需要注意有過度退讓的情況存在,形象的說,好比在一個只能容納一個人通過的窄巷里,你和另一個人迎面走來,然后你們發(fā)現(xiàn)這樣誰也過不去,于是都高風(fēng)亮節(jié)的往后退出巷子,然后等待一會兒,又很默契的一起走了進去,結(jié)果是悲劇的又發(fā)生了死鎖的情況,而且會持續(xù)下去。這就需要兩個線程之間需要知道對方的情況而不是盲目的退讓。

Synchronize的可重入性

所以可重入性,是指在某個線程得到某個對象的鎖之后,不需要額外申請該對象的鎖也可以進入關(guān)鍵代碼塊。

Synchronized的JVM層實現(xiàn)

Synchronized在設(shè)計之初被實現(xiàn)為一種重量鎖,每次做互斥系統(tǒng)開銷很大。在Java 1.6之后做了優(yōu)化調(diào)整,加入鎖升級的機制去減小每次鎖的開銷。在JVM中,每個object都有一個header,保存object的一些信息,普通對象頭的長度為兩個字,數(shù)組對象頭的長度為三個字(JVM內(nèi)存字長等于虛擬機位數(shù),32位虛擬機即32位一字,64位亦然),其中有兩個bit位記錄了對象的鎖類型:

別人畫的MarkWord結(jié)構(gòu)圖

偏向鎖

鎖對象第一次被線程獲取的時候,虛擬機把對象頭的status設(shè)置為"01",偏向鎖狀態(tài),當發(fā)生鎖重入時,只需要檢查MarkValue中的ThreadID是否與當前線程ID相同即可,相同即可直接重入。偏向鎖的釋放不需要做任何事情,這也就意味著加過偏向鎖的MarkValue會一直保留偏向鎖的狀態(tài),因此即便同一個線程持續(xù)不斷地加鎖解鎖,也是沒有開銷的。

一般偏向鎖是在有不同線程申請鎖時升級為輕量鎖,這也就意味著假如一個對象先被線程1加鎖解鎖,再被線程2加鎖解鎖,這過程中沒有鎖沖突,也一樣會發(fā)生偏向鎖失效,不同的是這回要先退化為無鎖的狀態(tài),再加輕量鎖。

引入偏向鎖是為了在無多線程競爭的情況下盡量減少不必要的輕量級鎖執(zhí)行路徑,因為輕量級鎖的獲取及釋放依賴多次CAS原子指令,而偏向鎖只需要在置換ThreadID的時候依賴一次CAS原子指令(由于一旦出現(xiàn)多線程競爭的情況就必須撤銷偏向鎖,所以偏向鎖的撤銷操作的性能損耗必須小于節(jié)省下來的CAS原子指令的性能消耗)。

  • 偏向鎖獲取過程:
      (1) 訪問Mark Word中偏向鎖的標識是否設(shè)置成1,鎖標志位是否為01——確認為可偏向狀態(tài)。
      (2) 如果為可偏向狀態(tài),則測試線程ID是否指向當前線程,如果是,進入步驟(5),否則進入步驟(3)。
      (3) 如果線程ID并未指向當前線程,則通過CAS操作競爭鎖。如果競爭成功,則將Mark Word中線程ID設(shè)置為當前線程ID,然后執(zhí)行(5);如果競爭失敗,執(zhí)行(4)。
      (4)如果CAS獲取偏向鎖失敗,則表示有競爭。當?shù)竭_全局安全點(safepoint)時獲得偏向鎖的線程被掛起,偏向鎖升級為輕量級鎖,然后被阻塞在安全點的線程繼續(xù)往下執(zhí)行同步代碼。
      (5) 執(zhí)行同步代碼。

  • 偏向鎖的釋放:
    偏向鎖的撤銷在上述第四步驟中有提到偏向鎖只有遇到其他線程嘗試競爭偏向鎖時,持有偏向鎖的線程才會釋放鎖,線程不會主動去釋放偏向鎖。偏向鎖的撤銷,需要等待全局安全點(在這個時間點上沒有字節(jié)碼正在執(zhí)行),它會首先暫停擁有偏向鎖的線程,判斷鎖對象是否處于被鎖定狀態(tài),撤銷偏向鎖后恢復(fù)到未鎖定(標志位為“01”)或輕量級鎖(標志位為“00”)的狀態(tài)。

輕量級鎖

“輕量級”是相對于使用操作系統(tǒng)互斥量來實現(xiàn)的傳統(tǒng)鎖而言的。但是,首先需要強調(diào)一點的是,輕量級鎖并不是用來代替重量級鎖的,它的本意是在沒有多線程競爭的前提下,減少傳統(tǒng)的重量級鎖使用產(chǎn)生的性能消耗。在解釋輕量級鎖的執(zhí)行過程之前,先明白一點,輕量級鎖所適應(yīng)的場景是線程交替執(zhí)行同步塊的情況,如果存在同一時間訪問同一鎖的情況,就會導(dǎo)致輕量級鎖膨脹為重量級鎖。

  • 輕量級鎖的加鎖過程
      (1)在代碼進入同步塊的時候,如果同步對象鎖狀態(tài)為無鎖狀態(tài)(鎖標志位為“01”狀態(tài),是否為偏向鎖為“0”),虛擬機首先將在當前線程的棧幀中建立一個名為鎖記錄(Lock Record)的空間,用于存儲鎖對象目前的Mark Word的拷貝,官方稱之為 Displaced Mark Word。這時候線程堆棧與對象頭的狀態(tài)如圖2.1所示。
      (2)拷貝對象頭中的Mark Word復(fù)制到鎖記錄中。
     ?。?)拷貝成功后,虛擬機將使用CAS操作嘗試將對象的Mark Word更新為指向Lock Record的指針,并將Lock record里的owner指針指向object mark word。如果更新成功,則執(zhí)行步驟(3),否則執(zhí)行步驟(4)。
     ?。?)如果這個更新動作成功了,那么這個線程就擁有了該對象的鎖,并且對象Mark Word的鎖標志位設(shè)置為“00”,即表示此對象處于輕量級鎖定狀態(tài),這時候線程堆棧與對象頭的狀態(tài)如圖2.2所示。
     ?。?)如果這個更新操作失敗了,虛擬機首先會檢查對象的Mark Word是否指向當前線程的棧幀,如果是就說明當前線程已經(jīng)擁有了這個對象的鎖,那就可以直接進入同步塊繼續(xù)執(zhí)行。否則說明多個線程競爭鎖,輕量級鎖就要膨脹為重量級鎖,鎖標志的狀態(tài)值變?yōu)椤?0”,Mark Word中存儲的就是指向重量級鎖(互斥量)的指針,后面等待鎖的線程也要進入阻塞狀態(tài)。 而當前線程便嘗試使用自旋來獲取鎖,自旋就是為了不讓線程阻塞,而采用循環(huán)去獲取鎖的過程。


    輕量級鎖CAS操作之前堆棧與對象的狀態(tài)
輕量級鎖CAS操作之后堆棧與對象的狀態(tài)
  • 輕量級鎖的解鎖過程:
     ?。?)通過CAS操作嘗試把線程中復(fù)制的Displaced Mark Word對象替換當前的Mark Word。
      (2)如果替換成功,整個同步過程就完成了。
      (3)如果替換失敗,說明有其他線程嘗試過獲取該鎖(此時鎖已膨脹),那就要在釋放鎖的同時,喚醒被掛起的線程。

重量級鎖

TBD

重量級鎖、輕量級鎖和偏向鎖之間轉(zhuǎn)換

三種鎖的轉(zhuǎn)換圖

volatile 關(guān)鍵字

又是面試中經(jīng)常會被問到的一個Java關(guān)鍵字,如果用volatile聲明一個變量為共享的,一個線程修改了某個變量的值,這個更新的值會立即寫入內(nèi)存中,從而對其他線程來說是立即可見的。

然而比較坑的結(jié)果,volatile只能在很小的范圍內(nèi)保證互斥性,如果對volatile變量本身的操作不是線程安全的,比如++,那么同樣是有問題的。

volatile僅僅用來保證該變量對所有線程的可見性,但不保證原子性

具體的使用場景參照Java 理論與實踐: 正確使用 Volatile 變量。不過我膚淺的總結(jié)起來就是盡量不要用volatile來實現(xiàn)互斥。。。。

還有個典型的使用volatile的場景實在多線程環(huán)境下lazy load的單例模式,參考Java 單例真的寫對了么?

volatile底層實現(xiàn)

雖然volatile不建議使用,但是還是有必要探究一下它在底層是如何實現(xiàn)的,因為有助于更好的理解JVM的編譯機制。

為了優(yōu)化性能,編譯器和CPU可能對某些指令進行重排。大家都知道java代碼最終會被編譯成匯編指令,而一條java語句可能對應(yīng)多條匯編指令。為了優(yōu)化性能,CPU和編譯器會對這些指令重排,volatile的變量在進行操作只會在尾部添加一個內(nèi)存屏障(Memory Barrier),lock addl $0x0,(%rsp)。它可以:a) 確保一些特定操作執(zhí)行的順序; b) 影響一些數(shù)據(jù)的可見性(可能是某些指令執(zhí)行后的結(jié)果)。編譯器和CPU可以在保證輸出結(jié)果一樣的情況下對指令重排序,使性能得到優(yōu)化。插入一個內(nèi)存屏障,相當于告訴CPU和編譯器先于這個命令的必須先執(zhí)行,后于這個命令的必須后執(zhí)行。內(nèi)存屏障另一個作用是強制更新一次不同CPU的緩存。例如,一個寫屏障會把這個屏障前寫入的數(shù)據(jù)刷新到緩存,這樣任何試圖讀取該數(shù)據(jù)的線程將得到最新值,而不用考慮到底是被哪個cpu核心或者哪顆CPU執(zhí)行的。所以一旦你完成寫入,任何訪問這個變量的線程將會得到最新的值。而且在你寫入前,會保證所有之前發(fā)生的事已經(jīng)發(fā)生,并且任何更新過的數(shù)據(jù)值也是可見的,因為內(nèi)存屏障會把之前的寫入值都刷新到緩存。

更多volatile的細節(jié),請看深入理解Java內(nèi)存模型(四)——volatile

concurrent包

java.util.concurrent在java 1.5以后提供了另一種多線程編程的方式,主要提供了各種鎖,線程池和基本類型的atomic版本。

大致的讀了一下代碼,以下通過一些源碼探索底層的實現(xiàn)。

ReentrentLock

ReentrentLock和synchronized關(guān)鍵字類似,也是實現(xiàn)一種可重入的鎖。

ReentrentLock實現(xiàn)了Lock接口,內(nèi)部定義了兩個Sync類,F(xiàn)airSync和NonFairSync,分別實現(xiàn)公平鎖和非公平鎖。 該類又繼承自AbstractQueuedSynchronizer類。類圖如下:

自己畫的一個簡單的類圖

AbstractQueuedSynchronizer實際上在內(nèi)部維護了一個列表形式的等待隊列,每個node都記錄了一個線程和等待的狀態(tài)。

關(guān)鍵代碼:

ReentrantLock

public void lock() {
    sync.lock();
}

Sync

abstract void lock();

抽象方法,在FairSync和NonFairSync里都有相應(yīng)的實現(xiàn),先看FairSync

final void lock() {
    acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

關(guān)鍵的地方來了,tryAcquire里有個compareAndState方法。

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

它會調(diào)用unsafe的compareAndSwapInt,查看一下Unsafe類,這是一系列基于JNI的API定義類,其中有一些compareAndSwap方法,縮寫為CAS。這個方法會在CPU級別來支持原子性。

C++代碼

// Adding a lock prefix to an instruction on MP machine
// VC++ doesn't like the lock prefix to be on a single line
// so we can't insert a label after the lock prefix.
// By emitting a lock prefix, we can define a label after it.
#define LOCK_IF_MP(mp) __asm cmp mp, 0  \
                   __asm je L0      \
                   __asm _emit 0xF0 \
                   __asm L0:

inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
  // alternative for InterlockedCompareExchange
  int mp = os::is_MP();
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}

程序會根據(jù)當前處理器的類型來決定是否為cmpxchg指令添加lock前綴。如果程序是在多處理器上運行,就為cmpxchg指令加上lock前綴(lock cmpxchg)。反之,如果程序是在單處理器上運行,就省略lock前綴(單處理器自身會維護單處理器內(nèi)的順序一致性,不需要lock前綴提供的內(nèi)存屏障效果)。

更多CAS的原理,可以參考 JAVA CAS原理深度分析

回到FairSync中,tryAcquire方法會檢查:

  • 如果鎖數(shù)量為0,如果當前線程是等待隊列中的頭節(jié)點,基于CAS嘗試將state(鎖數(shù)量)從0設(shè)置為1一次,如果設(shè)置成功,設(shè)置當前線程為獨占鎖的線程
  • 如果鎖數(shù)量不為0或者當前線程不是等待隊列中的頭節(jié)點或者上邊的嘗試又失敗了,查看當前線程是不是已經(jīng)是獨占鎖的線程了,如果是,則將當前的鎖數(shù)量+1(可重入)。如果不是, 則將該線程封裝在一個Node內(nèi),并加入到等待隊列中去, 等待被其前一個線程節(jié)點喚醒。

如果第一時間沒有獲取到鎖,沒關(guān)系,接著

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

這種死循環(huán)的方式申請鎖有個好聽的名字,叫“自旋”。

lock看完了再看看unlock,原理類似:
Reentrant

public void unlock() {
    sync.release(1);
}

Sync

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

LockSupport.park()、LockSupport.unpark()底層還是調(diào)用Unsafe累的park/unpark方法,作用分別是阻塞線程和解除阻塞線程,且park()和unpark()不會遇到“Thread.suspend ()和 Thread.resume所可能引發(fā)的死鎖”問題。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容