重學(xué)Java-扒一扒Java并發(fā)

關(guān)于Java并發(fā)的書籍和文章已經(jīng)有很多了,但是就我自己的學(xué)習(xí)下來的感受來說,有一些看似簡單的知識點,以至于大神們和文章的作者們都直接忽略了,但是這些知識點卻很重要,如果不搞清楚,很難“徹底理解、融會貫通”,這種似懂非懂的感覺讓我很難受,所以我總結(jié)了這篇文章,可能不會有什么牛X的技術(shù),高深的理論,但是這些思考曾經(jīng)讓我對Java并發(fā)的認(rèn)知更進(jìn)了一步,送給你們。

先提幾個曾經(jīng)困擾過我的問題啊,看似很簡單,而且可能還有很多同學(xué)還存在誤解,我們來一起看一下。

  • 問題一:經(jīng)常聽說“主內(nèi)存“,”工作內(nèi)存“,那它們到底指什么? 或者說它們以何種形式存在?
  • 問題二:我們還經(jīng)常聽說“可見性”,到底什么是可見性?為什么會出現(xiàn)“不可見”的情況?
  • 問題三:你肯定還聽說過“原子性”,那什么是原子性?哪些操作可以認(rèn)為是原子的?
  • 問題四:”有序性“,代碼真的按我們寫的先后順序執(zhí)行嗎?背后有什么玄機(jī)?

如果這些問題也曾困擾過你,那這篇文章最合適你不過了,接下來我們一起進(jìn)入Java的世界扒一扒并發(fā)。

什么是主內(nèi)存,工作內(nèi)存

這2個概念是Java內(nèi)存模型(Java Memory Model)中提出的,關(guān)于內(nèi)存模型的詳細(xì)介紹猛戳這里,我們目前只需要知道內(nèi)存模型是幫我們屏蔽底層硬件細(xì)節(jié)的,程序員只需要按照它的規(guī)則來寫代碼,寫的程序就可以實現(xiàn)跨平臺運行,很巧妙的設(shè)計。

了解了內(nèi)存模型,我們回到主題,我們知道JVM將內(nèi)存劃分了以下幾大塊

  • 堆 (進(jìn)程內(nèi)所有線程共享)
  • 方法區(qū) (進(jìn)程內(nèi)所有線程共享)
  • 虛擬機(jī)棧 (每個線程獨立)
  • native本地方法棧 (每個線程獨立)
  • pc計數(shù)器 (每個線程獨立)

那主內(nèi)存,工作內(nèi)存跟它們的對應(yīng)關(guān)系是怎么樣的呢?
這里直接給出結(jié)論。

  • 主內(nèi)存就是堆 + 方法區(qū)
  • 工作內(nèi)存就是虛擬機(jī)棧 + native本地方法棧 + pc計數(shù)器

這個知識點看似不起眼,但是卻很重要,因為只有有了這個結(jié)論,才能與我們后面的實際代碼例子結(jié)合起來,否則就會感覺理論與實際操作脫節(jié)了,沒法對應(yīng)起來。

舉個例子

需求是這樣的,有一個Counter計數(shù)器類,內(nèi)部有一個count成員變量int類型,記錄當(dāng)前的總數(shù),具體定義如下。

public class Counter {
    private int count;
    public void increment() {
        this.count++;
    }
    public int getCount() {
        return this.count;
    }
}

我們現(xiàn)在的任務(wù)是調(diào)用一億次increment方法,然后打印count的數(shù)量,那么顯然正確的輸出應(yīng)該是一億。

public static void main(String[] args) {
   // 單線程代碼
   Counter counter = new Counter();
   int loopCount = 100000000;
   long startTime = System.currentTimeMillis();
   for (int i = 0; i < loopCount; i++) {
       counter.increment();
   }
   System.out.println("count:" + counter.getCount());  
   System.out.println("take time:" + (System.currentTimeMillis() - startTime) + "ms");
}

// 輸出結(jié)果
count:100000000
take time:577ms

so easy!這樣的代碼我們太熟悉了,但是這次我想從代碼在虛擬機(jī)棧中的具體執(zhí)行過程來加深理解程序是怎么運作的。先通過Javac和Javap命令查看Counter類的increment方法的字節(jié)碼實現(xiàn)。

javac Counter.java
javap -verbose Counter.class

// Counter類 increment方法的字節(jié)碼
  public void increment();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=3, locals=1, args_size=1
         0: aload_0
         1: dup
         2: getfield      #2                  // Field count:I
         5: iconst_1
         6: iadd
         7: putfield      #2                  // Field count:I
        10: return

我們知道Java中方法的調(diào)用是基于棧幀實現(xiàn)的,每個棧幀中主要包含操作數(shù)棧+用到類的運行時常量池引用+本地變量表。我畫了一張示意圖幫助大家理解整個執(zhí)行過程,并且將其中一次count++操作的字節(jié)碼在操作數(shù)棧中的執(zhí)行步驟分解了(以count=10為例),這里要注意,下面這張圖的運行是在工作內(nèi)存中(main方法所在線程的虛擬機(jī)棧中)。

image

過程其實比較簡單,我們寫的代碼在底層就這樣運行著,是不是一點點興奮了。
好,到這里,我要總結(jié)方法論了!上面的代碼之所以在單線程中運行正確,因為滿足了以下三個條件!

  • 循環(huán)從0到一億,是嚴(yán)格按順序執(zhí)行的(有序性)
  • 循環(huán)過程中,前一次對count的修改對后面可見(可見性)
  • 因為是嚴(yán)格按順序執(zhí)行的,所以count++操作中間不會交叉執(zhí)行,所以其實在單線程環(huán)境中,可以認(rèn)為滿足原子性 (原子性)

上面的條件只要有一個被打破,執(zhí)行的結(jié)果就可能不正確,這也就是為什么Java多線程環(huán)境下容易出現(xiàn)并發(fā)問題,原因就是沒有同時滿足這三個條件。

多線程為什么會出現(xiàn)并發(fā)問題?

上面已經(jīng)提到,我們上面的Count類的實例中,需同時滿足 有序性,可見性,原子性,其中有序性和原子性,我們比較容易想到因為多個線程交叉執(zhí)行,如果不加同步控制,有序性和原子性肯定沒法保證,但是這里比較難理解的是可見性,骨頭先撿難啃的啃,所以接下來我們先談?wù)効梢娦浴?/p>

什么是可見性?為什么會出現(xiàn)”不可見“

還是以我們上面的示例來說明。
我們已經(jīng)知道

counter.increment();

編譯成字節(jié)碼為

getfield      #2    
iconst_1
iadd
putfield      #2    

前面已經(jīng)說過,這里的字節(jié)碼的執(zhí)行過程是在工作內(nèi)存中,但是getField和putField這二條指令其實是跟主內(nèi)存有交互的,這里還是以Counter類的increment方法為例。

  • getField指令會從主存中讀取count的值,但是并不是每次都從主存中讀,因為CPU高速cache的存在,我們count值有可能會從cache中讀,導(dǎo)致讀的并不是最新的
  • putField指令會將count新的值寫入主內(nèi)存,但是也不是立即生效,別的CPU的高速cache中的count不會立即更新,CPU會使用緩存一致性協(xié)議來做同步,這個對我們是透明的。

正是因為CPU高速cache的存在,在多核環(huán)境中會有可見性的問題。這里額外提一句 ,之所以有高速cache存在,是為提高運行效率,現(xiàn)代CPU的速度比我們的內(nèi)存快很多,如果每次都鎖總線寫主存,會導(dǎo)致執(zhí)行速度下降很多,這是不可以接受的,木桶理論我們都能理解哈。這里我也畫了一張圖,來幫助大家理解。

image

那有沒有辦法解決可見性帶來的問題呢?當(dāng)然是有的,對于Java,我們可以使用volatile關(guān)鍵字。

volatile

volatile修飾的變量有下面的特性

  • 在寫volatile的時候,有monitor release的語義,會刷新各個cpu中該變量的cache,存入最新的值
  • 在讀volatile的時候,有monitor acquire的語義,會使當(dāng)前cpu的cache中該變量的cache失效,從主存中讀取最新的值
  • volatile擁有禁止指令重排序的語義

其中monitor可以理解為鎖,moniter release就是釋放鎖,monitor acquire就是獲取鎖,這樣就是volatile變量的讀寫都是直接對主存操作的,相當(dāng)于犧牲一部分性能來換取可見性,這一部分犧牲的性能一般是可以忽略不計的,只需要知道有這么回事就行。

volatile實現(xiàn)原理

給count加上volatile修飾符后,查看編譯后的字節(jié)碼后會發(fā)現(xiàn),字節(jié)碼層面唯一的變化是給count添加了ACC_VOLATILE標(biāo)識flag,在運行時會根據(jù)這個flag會自動插入內(nèi)存屏障,保證volatile可見性語義,內(nèi)存屏障一共有四種,分別是:

  • LoadLoad
  • LoadStore
  • StoreStore
  • StoreLoad

這里有個文檔,比較權(quán)威詳細(xì)的說明了內(nèi)存屏障的知識,這一塊知識大家可以自己繼續(xù)深入。這里給出文檔中的一個實例,比較形象的說明了內(nèi)存屏障是怎么插入的。

image

再回到上面的例子,我們給count添加上volatile修飾符之后,是不是就能在多線程中得到正確的累加結(jié)果呢?我們試驗一下,簡單起見,我們只開2個線程,每個線程分配一半的計算量。

// Counter.java
private volatile int count;

// main 方法
    Counter counter = new Counter();
    int loopCount = 100000000;
    int halfCount = loopCount / 2;
    Thread thread1 = new Thread(() -> {
        for (int i = 0; i < halfCount; i++) {
            counter.increment();
        }
    });

    Thread thread2 = new Thread(() -> {
        for (int i = halfCount; i < loopCount; i++) {
            counter.increment();
        }
    });

    thread1.start();
    thread2.start();

    try {
        thread1.join();
        thread2.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("count:" + counter.getCount());
    System.out.println("take time:" + (System.currentTimeMillis() - startTime) + "ms");
    
// 運行結(jié)果
count:51743664
take time:2335ms

結(jié)果顯然還是不對的,而且程序運行的時間長了好幾倍了。這是因為volatile只保證了可見性,卻沒有原子性語義,比如下面這種情況

image

在T1-T6時間內(nèi),初始count=0,經(jīng)過二次++操作,最后count的值還是1,在我們上面的例子中,5千萬次的循環(huán)會出現(xiàn)大量類似的錯誤覆蓋寫入。根據(jù)我們上面分析的volatile的語義,在T5時刻,Thread1對count的修改對Thread2是可見的,這里的可見指的是,如果此時調(diào)用getfield指令,拿到的值會是Thread1修改的最新的1,但是遺憾的是,Thread2對此一無所知,只是按著自己的步驟將錯誤的1寫入了count中。

那我們不妨設(shè)想下,如果在putfield之前,檢查下當(dāng)前棧中存儲的count是不是最新的,如果不是最新的重新讀取count,然后重試,如果是最新的,直接寫入更新值,似乎這樣就能解決我們上面出現(xiàn)的錯誤寫入的問題。看起來似乎是一個不錯的想法,但是一定要注意,整個檢查過程要保證原子性,否則仍然會有并發(fā)問題。事實上JDK中Unsafe包里面的CAS方法就是這個思路,不斷循環(huán)嘗試,這個過程就是自旋,它的底層實現(xiàn)依賴cmpxchglcmpxchgq這二個匯編指令,不同平臺的cpu有不同的實現(xiàn),但是代碼大同小異,我在這里以opekjdk8為例扒一扒CAS的源碼,源碼比較多我只會貼出關(guān)鍵代碼塊。

// Unsafe.class中的三個CAS方法,都是native的
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

它對應(yīng)的native實現(xiàn)在hotspot/src/share/vm/prims/unsafe.cpp

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

篇幅關(guān)系,這里只貼上compareAndSwapInt的實現(xiàn),可以看到又調(diào)用了Atomic::cmpxchg方法,繼續(xù)跟進(jìn)去

unsigned Atomic::cmpxchg(unsigned int exchange_value,
                         volatile unsigned int* dest, unsigned int compare_value) {
  assert(sizeof(unsigned int) == sizeof(jint), "more work to do");
  return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest,
                                       (jint)compare_value);
}

jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {
  assert(sizeof(jbyte) == 1, "assumption.");
  uintptr_t dest_addr = (uintptr_t)dest;
  uintptr_t offset = dest_addr % sizeof(jint);
  volatile jint* dest_int = (volatile jint*)(dest_addr - offset);
  jint cur = *dest_int;
  jbyte* cur_as_bytes = (jbyte*)(&cur);
  jint new_val = cur;
  jbyte* new_val_as_bytes = (jbyte*)(&new_val);
  new_val_as_bytes[offset] = exchange_value;
  while (cur_as_bytes[offset] == compare_value) {
    jint res = cmpxchg(new_val, dest_int, cur);
    if (res == cur) break;
    cur = res;
    new_val = cur;
    new_val_as_bytes[offset] = exchange_value;
  }
  return cur_as_bytes[offset];
}

我們跟蹤到了調(diào)用了cmpxchg這個方法,這個方法不是在atomic.cpp中定義的,查看atomic.hpp,看到了cmpxchg對應(yīng)的內(nèi)聯(lián)函數(shù)的定義

inline static jint     cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value);
// See comment above about using jlong atomics on 32-bit platforms
inline static jlong    cmpxchg    (jlong    exchange_value, volatile jlong*    dest, jlong    compare_value);

這里我們以solaris_x86平臺為例,cmpxchg對應(yīng)的內(nèi)涵函數(shù)定義在hotspot/src/os_cpu/solaris_x86/vm/atomic_solaris_x86.inline.hpp

#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
inline jint _Atomic_cmpxchg(jint exchange_value, volatile jint* dest, jint compare_value, int mp) {
    __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
    return exchange_value;
  }

這個是內(nèi)嵌匯編代碼,實話說,匯編這一塊的知識我也還給老師了。根據(jù)LOCK_IF_MP這個宏定義判斷是不是多核心,如果是多核心需要加鎖,但是這個鎖是cpu總線鎖,它的代價比我們應(yīng)用層中用的Lock代價小得多。同時我們看到cmpxchgl這個關(guān)鍵的指令。追到這一層,我想對于應(yīng)用開發(fā)工程師已經(jīng)足夠了。了解了底層實現(xiàn),我們來現(xiàn)學(xué)現(xiàn)賣實戰(zhàn)一波。

使用CAS改造我們的加法器Counter,使其是線程安全的

要使用CAS,肯定要使用Unsafe類,我們還是通過反射來獲取Unsafe對象,先看UnsafeUtil類的實現(xiàn)

    // UnsafeUtil.java
    public static Unsafe getUnsafeObject() {
        Class clazz = AtomicInteger.class;
        try {
            Field uFiled = clazz.getDeclaredField("unsafe");
            uFiled.setAccessible(true);
            return (Unsafe) uFiled.get(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    
    public static long getVariableOffset(Object target, String variableName) {
        Object unsafeObject = getUnsafeObject();
        if (unsafeObject != null) {
            try {
                Method method = unsafeObject.getClass().getDeclaredMethod("objectFieldOffset", Field.class);
                method.setAccessible(true);
                Field targetFiled = target.getClass().getDeclaredField(variableName);
                return (long) method.invoke(unsafeObject, targetFiled);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return -1;
    }

再來看Counter類的實現(xiàn)

public class Counter {
    private volatile int count;
    private Unsafe mUnsafe;
    private long countOffset;
    public Counter() {
        mUnsafe = UnsafeUtil.getUnsafeObject();
        countOffset = UnsafeUtil.getVariableOffset(this, "count");
    }
    public void increment() {
        int cur = getCount();
        while (!mUnsafe.compareAndSwapInt(this, countOffset, cur, cur+1)) {
            cur = getCount();
        }
    }
    public int getCount() {
        return this.count;
    }
}

再次開啟二個線程,執(zhí)行我們的累加程序

// 輸出結(jié)果
count:100000000
take time:5781ms

可以看到我們得到正確的累加結(jié)果,但是運行時長更長了,但是還好,時間復(fù)雜度還是在一個數(shù)量級上的。這里要注意一點的是,上述示例代碼中,我給count變量增加了volatile關(guān)鍵字,其實就算不加volatile關(guān)鍵字,在這里CAS也是能夠正確工作的,但是效率會低一點,我測試下來差不多性能會低5%左右,大家可以思考下為什么不加volatile效率會低?

volatile關(guān)鍵字還有一個禁止指令重排序的語義,一個經(jīng)典的應(yīng)用就是DCL單例模式,大家應(yīng)該都很熟了,就不贅述了。

到這里,關(guān)于可見性我們已經(jīng)討論的差不多了,接下來我們來討論”原子性“

原子性,怎么保證原子性?

其實上面我們已經(jīng)提及了一些,比如CAS本身就是原子的,那想一想還有哪些是原子的?我這里還是以Counter類的increment方法為例。

     0: aload_0
     1: dup
     2: getfield      #2                  // Field count:I
     5: iconst_1
     6: iadd
     7: putfield      #2                  // Field count:I
    10: return

這7條字節(jié)碼指令,都是原子的,沒有問題吧,但是我們?nèi)绻偻钐幭氲脑?,還是會有疑問,單個字節(jié)碼指令都是原子的嗎? 如果單條都不是原子的,我想我們前面的所有判斷都是錯誤的,因為我們得出結(jié)論的理論基石被打破了。事實是,單條字節(jié)碼就是原子的,這個原子性由誰來保證呢?由Java內(nèi)存模型JMM來保證,程序員不需要知道具體的細(xì)節(jié)。

雖然單條字節(jié)碼是原子的,但是多條字節(jié)碼組合起來就不是原子的了。這也是很多并發(fā)問題發(fā)生的根源。那我們程序員有哪些手段保證原子性呢?大概有以下三種。

  • CAS + 自旋
  • synchronized關(guān)鍵字
  • concurrent包提供的Lock,具體實現(xiàn)類比如ReentrantLock

CAS我們上面已經(jīng)討論過,這里不贅述了,我們來看看synchronized

synchronized

synchronized算是我們最常用的同步方式,主要有以下三種使用方式

// 普通類方法同步
synchronized publid void invoke() {}
// 類靜態(tài)方法同步
synchronized public static void invoke() {}
// 代碼塊同步
synchronized(object) {
}

這三種方式不同之處在于我們進(jìn)行同步的對象不同,普通類synchronized同步的就是對象本身,靜態(tài)方法同步的是類Class本身,代碼塊同步的是我們在括號內(nèi)部填入的對象。本質(zhì)上它們的原理是相同的,都會有一個monitor的對象與我們要進(jìn)行同步的對象進(jìn)行關(guān)聯(lián),當(dāng)有一個線程持有了monitor的鎖后,其他線程必須等待,一直到該線程釋放了該monitor才能被別的線程重新獲取,hotspot虛擬機(jī)中,它在native層對應(yīng)的實現(xiàn)類是ObjectMonitor.hpp,這個類內(nèi)部維護(hù)了很多同步相關(guān)的變量,我們重點關(guān)注二個變量

void *  volatile _owner;          // pointer to owning thread OR BasicLock
ObjectWaiter * volatile _WaitSet; // LL of threads wait()ing on the monitor

_owner代表當(dāng)前持有鎖的線程,—WaitSet代表等待鎖的線程隊列,通過ObjectWaiter的數(shù)據(jù)結(jié)構(gòu)可以推斷這是一個雙向循環(huán)鏈表。

回到我們的實例,這一次我們通過synchronized來實現(xiàn)我們的Counter類。

public void increment() {
    synchronized (Counter.class) {
        this.count++;
    }
}

// 再運行一次,
// 輸出結(jié)果
count:100000000
take time:4881ms

輸出的結(jié)果是正確的,而且從我們打印的運行時間結(jié)果來看,synchronized加鎖后的執(zhí)行速度比我們上面的CAS還要快,這有一點點反直覺,其實synchronized在Java1.7后引入了偏向鎖,輕量級鎖后,synchronized性能有了較大提升,所以在使用synchronized的時候不需要有太多的心理負(fù)擔(dān),一般情況下,對性能不是極度要求高的話,使用synchronized沒有問題。

這里還是貼下increment方法加上synchronized同步后的字節(jié)碼實現(xiàn)。


image

可以看到有一個monitorenter指令,和2個monitorexit指令,其實是因為編譯器自動幫我們添加了try-finally,為了確保monitor一定會被釋放,就算出現(xiàn)運行時異常,所以會有二個monitorexit指令。

// 我們的實現(xiàn)
synchronized (Counter.class) {
    this.count++;
}

// 編譯后,相當(dāng)于下面的偽代碼
monitorenter  Counter.class;
try {
    this.count++;
    monitorexit  Counter.class;
} finally {
    monitorexit  Counter.class;
}

除了synchronized關(guān)鍵字之外,還有個比較常用的用來做同步類就是ReentrantLock

ReentrantLock

ReentrantLock的使用大家應(yīng)該都很熟了,篇幅關(guān)系,這里只簡單提一下用法,更詳細(xì)的使用文檔大家可以自行查閱相關(guān)資料。

// 參數(shù)表示 是否是公平鎖,公平鎖嚴(yán)格按照等待順序獲取鎖,但是吞吐率低性能差
// 非公平鎖性能高,但是有可能會出現(xiàn)鎖等待饑餓
ReentrantLock reentrantLock = new ReentrantLock(false);
// 一個標(biāo)準(zhǔn)的使用方式
reentrantLock.lock();
try {
    // do something
} finally {
    reentrantLock.unlock();
}

這里要注意的是加鎖的lock方法的調(diào)用,一定要在try-catch-finally的前面,不能在內(nèi)部,因為如果在內(nèi)部調(diào)用lock,如果代碼在lock之前就出現(xiàn)異常了,就會出現(xiàn)我們沒有加鎖就執(zhí)行了finally里面的釋放鎖,肯定會有問題。

為了對比,我們還是使用ReentrantLock實現(xiàn)一遍上面Counter累加。

        Lock lock = new ReentrantLock(false);
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < halfCount; i++) {
                lock.lock();
                try {
                    counter.increment();
                } finally {
                    lock.unlock();
                }
            }
        });

        Thread thread2 = new Thread(() -> {
            for (int i = halfCount; i < loopCount; i++) {
                lock.lock();
                try {
                    counter.increment();
                } finally {
                    lock.unlock();
                }            
            }
        });
        

// 運行程序,輸出結(jié)果
count:100000000
take time:12754ms

輸出結(jié)果也是正確的,但是運行時間卻是最長的,差不多是我們用synchronized的三倍。當(dāng)然我們這里并不是搞性能測試,運行的時間也沒什么參考意義,貼出來只是讓大家有個直觀認(rèn)識,那就是CAS未必就一定性能高,synchronized未必就一定性能差,要具體問題具體分析,一定要有質(zhì)疑的意識。

既然說到了Lock,我們還是扒一扒它的實現(xiàn)源碼,這里以非公平鎖為例。

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
    //......

        public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    }

可以看到,lock方法的實現(xiàn),首先會用CAS嘗試去設(shè)置State為1,如果設(shè)置成功,將exclusiveOwnerThread設(shè)置為當(dāng)前thread,如果設(shè)置不成功調(diào)用acquire方法,又調(diào)用了tryAcquire,我們可以重寫該方法來自定義鎖的邏輯,ReentrantLock中的默認(rèn)實現(xiàn)如下

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

首先嘗試獲取state狀態(tài),如果==0嘗試用CAS獲取鎖,如果!=0,檢查是否是當(dāng)前線程擁有鎖,如果是,將state+1,返回true,如果不是返回false。所以這也就是為什么是可重入鎖的原因,允許鎖的嵌套,如果已經(jīng)獲取了鎖,state+1然后返回true。

// 可重入鎖,允許嵌套重復(fù)獲得鎖,偽代碼如下
lock.lock();
lock.lock();
// do something ...
lock.unlock();
lock.unlock();

總結(jié)一下,ReentrantLock的實現(xiàn)是基于隊列同步器AbstractQueuedSynchronizer(AQS)的,而AQS內(nèi)部也是封裝了CAS來實現(xiàn)的,深究下去還是有很多內(nèi)容的,可以說整個concurrent包都是建立在CAS+AQS這二塊基石上的,篇幅關(guān)系,更多的實現(xiàn)細(xì)節(jié)我們這里就不討論了,大家可以自行參考相關(guān)的源碼分析文章加深理解。

到這里,我們討論了Java中的”原子性“,以及如果保證”原子性“的三種常規(guī)手段,原子性的討論就結(jié)束了。接下來我們進(jìn)入最后一個問題,Java的”有序性“。

有序性

為了方便說明,我們首先舉一個簡單的例子

int a = 1;   ①
int b = 2;   ②
int c = a;   ③

按我們的預(yù)期,執(zhí)行的順序應(yīng)該是①②③,但是
真實的執(zhí)行順序可能是②①③或者①③②,這是因為指令重排序的原因,基本有二種重排序

  • CPU級的指令重排序
  • 編譯器級的指令重排序

重排序的目的是優(yōu)化我們的程序運行速度,但是優(yōu)化的前提是不能破壞as-if-serial語義,簡單來說,以上面的例子為例,③由于有依賴①的結(jié)果,所以它需要永遠(yuǎn)排在①后面執(zhí)行。
在單線程有序性還比較容易保證,但是在多線程情況就會變得復(fù)雜起來。所以JMM中抽象出了一個happen-before原則,這個原則是JMM給我們開發(fā)者的承諾,讓我們寫代碼時對多線程情況下的有序性有一個正確的預(yù)期。這個原則有下面5條。

  • 同一個線程中,程序中前面的代碼happen-before后面的代碼
  • 對一個monitor的解鎖happen-before對這個monitor的加鎖
  • 對一個volatile變量的寫happen-before對這個volatile變量的讀
  • 線程start方法調(diào)用happen-before線程內(nèi)的所有action
  • 在A線程調(diào)用了B線程的join,則B線程內(nèi)的操作happen-before于A線程后續(xù)的操作

當(dāng)然happen-before具有傳遞性,如果A happen-before B, B happen-before C,則A 也 happen-before C。
需要注意的是,happen-before并不完全等同于時間意義上的先執(zhí)行,比如上面的例子中,根據(jù)第一條happen-before原則,int a = 1; 這條語句 happen-before int b = 2; 這條語句,但是由于二者之間沒有依賴關(guān)系,可以指令重排,所以可以是 int b = 2;先執(zhí)行,這是合法的,并不違背happen-before原則。

理解這幾條happen-before原則后,很多我們平時經(jīng)常寫的并發(fā)代碼就有了理論依據(jù),比如第二條,加鎖happen-before解鎖,所以保證了鎖的同步范圍內(nèi)的代碼,具有原子性和有序性,同時加鎖和解鎖都會插入內(nèi)存屏障,可見性也得到保障,所以加鎖后的代碼是線程安全的。再比如第三條,volatile的寫happen-before于volatile的讀,有了這一條,多線程之間volatile修飾的共享變量的可見性得到保證。

另外幾條原則比較好理解,大家可以自行結(jié)合實際代碼加深理解,這里就不贅述了。

總結(jié)

Java并發(fā)算是一個比較高級的主題,但是這一塊的知識又是高級工程師必須掌握的,骨頭再難啃也得啃,希望本文的一些總結(jié)能幫助到希望深入了解Java并發(fā)的同學(xué),哪怕是其中能有一點,能讓你在閱讀中有豁然開朗的感覺,我的目的就達(dá)到了。

最后,來針雞血,”怕什么真理無窮,進(jìn)一寸有一寸的歡喜“。

碼字不易,如果喜歡點個贊唄!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容