ConcurrentLinkedQueue 源碼分析 (基于Java 8)

ConcurrentLinkedQueue

通過名字大家就可以知道, 這是一個(gè)通過鏈表實(shí)現(xiàn)的并發(fā)安全的隊(duì)列, 它應(yīng)該是java中并發(fā)環(huán)境下性能最好的隊(duì)列, 為什么呢? 因?yàn)樗牟蛔冃?invariants) 與可變性(non-invariants)

1. 基本原則不變性(fundamental invariants)
1.整個(gè)隊(duì)列中一定會(huì)存在一個(gè) node(node.next = null), 并且僅存在一個(gè), 但tail引用不一定指向它
2. 隊(duì)列中所有 item != null 的節(jié)點(diǎn), head一定能夠到達(dá); cas 設(shè)置 node.item = null, 意味著這個(gè)節(jié)點(diǎn)被刪除
head引用的不變性和可變性
不變性(invariants)
1. 所有的有效節(jié)點(diǎn)通過 succ() 方法都可達(dá)
2. head != null
3. (tmp = head).next != tmp || tmp != head (其實(shí)就是 head.next != head)
 
可變性(Non-invariants)
1. head.item 可能是 null, 也可能不是 null
2. 允許 tail 滯后于 head, 也就是調(diào)用 succ() 方法, 從 head 不可達(dá)tail
tail 引用的不變性和可變性
不變性(invariants)
1. tail 節(jié)點(diǎn)通過succ()方法一定到達(dá)隊(duì)列中的最后一個(gè)節(jié)點(diǎn)(node.next = null)
2. tail != null
 
可變性(Non-invariants)
1. tail.item 可能是 null, 也可能不是 null
2. 允許 tail 滯后于 head, 也就是調(diào)用 succ() 方法, 從 head 不可達(dá)tail
3. tail.next 可能指向 tail

這些不變性(invariants) 和 可變性(Non-invariants) 造成 ConcurrentLinkedQueue 有些異于一般queue的特點(diǎn):

1. head 與 tail 都有可能指向一個(gè) (item = null) 的節(jié)點(diǎn)
2. 如果 queue 是空的, 則所有 node.item = null
3. queue剛剛創(chuàng)建時(shí) head = tail = dummyNode
4. head/tail 的 item/next 的操作都是通過 CAS

暈了, 是哇! 沒事, 這些都是特性, 我們先看代碼, 回頭再回顧這些特性.

2. 內(nèi)部節(jié)點(diǎn) Node
import com.lami.tuomatuo.search.base.concurrent.unsafe.UnSafeClass;
import sun.misc.Unsafe;

/**
 * http://hg.openjdk.java.net/jdk7/jdk7/jdk/file/9b8c96f96a0f/src/share/classes/sun/misc/Unsafe.java
 * http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/9b0ca45cd756/src/share/vm/prims/unsafe.cpp
 * http://mishadoff.com/blog/java-magic-part-4-sun-dot-misc-dot-unsafe/
 *
 * Created by xjk on 1/13/17.
 */
public class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item){
        /**
         * Stores a reference value into a given Java variable.
         * <p>
         * Unless the reference <code>x</code> being stored is either null
         * or matches the field type, the results are undefined.
         * If the reference <code>o</code> is non-null, car marks or
         * other store barriers for that object (if the VM requires them)
         * are updated.
         * @see #putInt(Object, int, int)
         *
         * 將 Node 對(duì)象的指定 itemOffset 偏移量設(shè)置 一個(gè)引用值
         */
        unsafe.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val){
        /**
         * Atomically update Java variable to <tt>x</tt> if it is currently
         * holding <tt>expected</tt>.
         * @return <tt>true</tt> if successful
         * 原子性的更新 item 值
         */
        return unsafe.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val){
        /**
         * Version of {@link #putObjectVolatile(Object, long, Object)}
         * that does not guarantee immediate visibility of the store to
         * other threads. This method is generally only useful if the
         * underlying field is a Java volatile (or if an array cell, one
         * that is otherwise only accessed using volatile accesses).
         *
         * 調(diào)用這個(gè)方法和putObject差不多, 只是這個(gè)方法設(shè)置后對(duì)應(yīng)的值的可見性不一定得到保證,
         * 這個(gè)方法能起這個(gè)作用, 通常是作用在 volatile field上, 也就是說, 下面中的參數(shù) val 是被volatile修飾
         */
        unsafe.putOrderedObject(this, nextOffset, val);
    }

    /**
     * Atomically update Java variable to <tt>x</tt> if it is currently
     * holding <tt>expected</tt>.
     * @return <tt>true</tt> if successful
     *
     * 原子性的更新 nextOffset 上的值
     *
     */
    boolean casNext(Node<E> cmp, Node<E> val){
        return unsafe.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    private static Unsafe unsafe;
    private static long itemOffset;
    private static long nextOffset;

    static {
        try {
            unsafe = UnSafeClass.getInstance();
            Class<?> k = Node.class;
            itemOffset = unsafe.objectFieldOffset(k.getDeclaredField("item"));
            nextOffset = unsafe.objectFieldOffset(k.getDeclaredField("next"));
        }catch (Exception e){

        }
    }
}

整個(gè)內(nèi)部節(jié)點(diǎn) Node 的代碼比較簡(jiǎn)單, 若不了解 Unsafe 類使用的, 請(qǐng)點(diǎn)擊鏈接 Unsafe 與 LockSupport

3. ConcurrentLinkedQueue 內(nèi)部屬性及構(gòu)造方法
/** head 節(jié)點(diǎn) */
private transient volatile Node<E> head;
/** tail 節(jié)點(diǎn) */
private transient volatile Node<E> tail;

public ConcurrentLinkedList() {
    /** 默認(rèn)會(huì)構(gòu)造一個(gè) dummy 節(jié)點(diǎn)
     * dummy 的存在是防止一些特殊復(fù)雜代碼的出現(xiàn) 
     */
    head = tail = new Node<E>(null);
}

初始化 ConcurrentLinkedQueue時(shí) head = tail = dummy node.

4. 查詢后繼節(jié)點(diǎn)方法 succ()
/**
 * 獲取 p 的后繼節(jié)點(diǎn), 若 p.next = p (updateHead 操作導(dǎo)致的), 則說明 p 已經(jīng) fall off queue, 需要 jump 到 head
 */
final Node<E> succ(Node<E> p){
    Node<E> next = p.next;
    return (p == next)? head : next;
}

獲取一個(gè)節(jié)點(diǎn)的后繼節(jié)點(diǎn)不是 node.next 嗎, No, No, No, 還有特殊情況, 就是tail 指向一個(gè)哨兵節(jié)點(diǎn) (node.next = node); 代碼的注釋中我提到了 哨兵節(jié)點(diǎn)是 updateHead 導(dǎo)致的, 那我們來看 updateHead方法.

5. 特別的更新頭節(jié)點(diǎn)方法 updateHead

為什么說 updateHead 特別呢? 還是看代碼

/**
 * Tries to CAS head to p, If successfully, repoint old head to itself
 * as sentinel for succ(), blew
 *
 * 將節(jié)點(diǎn) p設(shè)置為新的節(jié)點(diǎn)(這是原子操作),
 * 之后將原節(jié)點(diǎn)的next指向自己, 直接變成一個(gè)哨兵節(jié)點(diǎn)(為queue節(jié)點(diǎn)刪除及garbage做準(zhǔn)備)
 *
 * @param h
 * @param p
 */
final void updateHead(Node<E> h, Node<E> p){
    if(h != p && casHead(h, p)){
        h.lazySetNext(h);
    }
}

主要這個(gè) h.lazySetNext(h), 將 h.next -> h 直接變成一個(gè)哨兵節(jié)點(diǎn), 這種lazySetNext主要用于無阻塞數(shù)據(jù)結(jié)構(gòu)的 nulling out, 要了解詳情 點(diǎn)擊 Unsafe 與 LockSupport
有了上面的這些輔助方法, 我們開始進(jìn)入正題

6. 入隊(duì)列操作 offer()

一般我們的思維: 入隊(duì)操作就是 tail.next = newNode; 而這里不同, 為什么呢? 我們?cè)賮砘仡櫼幌?tail 的不變性和可變性

不變性(invariants)
1. tail 節(jié)點(diǎn)通過succ()方法一定到達(dá)隊(duì)列中的最后一個(gè)節(jié)點(diǎn)(node.next = null)
2. tail != null

可變性(Non-invariants)
1. tail.item 可能是 null, 也可能不是 null
2. 允許 tail 滯后于 head, 也就是調(diào)用 succ() 方法, 從 head 不可達(dá)tail
3. tail.next 可能指向 tail

主要是這里 tail 會(huì)滯后于 head, 所以呢 要找到正真的 last node (node.next = null)
直接來代碼

/**
 * Inserts the specified element at the tail of this queue
 * As the queue is unbounded, this method will never return {@code false}
 *
 * @param e {@code true} (as specified by {@link Queue#offer(Object)})
 * @return NullPointerException if the specified element is null
 *
 * 在隊(duì)列的末尾插入指定的元素
 */
public boolean offer(E e){
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e); // 1. 構(gòu)建一個(gè) node

    for(Node<E> t = tail, p = t;;){ // 2. 初始化變量 p = t = tail
        Node<E> q = p.next;  // 3. 獲取 p 的next
        if(q == null){      // q == null, 說明 p 是 last Node
            // p is last node
            if(p.casNext(null, newNode)){   // 4. 對(duì) p 進(jìn)行 cas 操作, newNode -> p.next
                // Successful CAS is the linearization point
                // for e to become an element of the queue,
                // and for newNode to become "live"
                if(p != t){ // 5. 每每經(jīng)過一次 p = q 操作(向后遍歷節(jié)點(diǎn)), 則 p != t 成立, 這個(gè)也說明 tail 滯后于 head 的體現(xiàn)
                    casTail(t, newNode); // Failure is OK
                }
                return true;
            }
        }
        else if(p == q){  // 6. (p == q) 成立, 則說明p是pool()時(shí)調(diào)用 "updateHead" 導(dǎo)致的(刪除頭節(jié)點(diǎn)); 此時(shí)說明 tail 指針已經(jīng) fallen off queue, 所以進(jìn)行 jump 操作, 若在t沒變化, 則 jump 到 head, 若 t 已經(jīng)改變(jump操作在另外的線程中執(zhí)行), 則jump到 head 節(jié)點(diǎn), 直到找到 node.next = null 的節(jié)點(diǎn)
            /** 1. 大前提 p 是已經(jīng)被刪除的節(jié)點(diǎn)
             *  2. 判斷 tail 是否已經(jīng)改變
             *      1) tail 已經(jīng)變化, 則說明 tail 已經(jīng)重新定位
             *      2) tail 未變化, 而 tail 指向的節(jié)點(diǎn)是要?jiǎng)h除的節(jié)點(diǎn), 所以讓 p 指向 head
             *  判斷尾節(jié)點(diǎn)是否有變化
             *  1. 尾節(jié)點(diǎn)變化, 則用新的尾節(jié)點(diǎn)
             *  2. 尾節(jié)點(diǎn)沒變化, 將 tail 指向head
             *
             *  public void test(){
             *        String tail = "";
             *        String t = (tail = "oldTail");
             *        tail = "newTail";
             *        boolean isEqual = t != (t = tail); // <- 神奇吧
             *        System.out.println("isEqual : "+isEqual); // isEqual : true
             *  }
             */
            p = (t != (t = tail))? t : head;
        }else{
            // 7. (p != t) -> 說明執(zhí)行過 p = q 操作(向后遍歷操作), "(t != (t = tail)))" -> 說明尾節(jié)點(diǎn)在其他的線程發(fā)生變化
            // 為什么 "(t != (t = tail)))" 一定要滿足呢, 因?yàn)?tail變更, 節(jié)省了 (p = q) 后 loop 中的無畏操作, tail 更新說明 q節(jié)點(diǎn)肯定也是無效的
            p = (p != t && (t != (t = tail))) ? t : q;
        }
    }
}

先瞄一下這段代碼: 發(fā)現(xiàn)有3大疑惑:

  1. 明明 Node<E> q = p.next, 怎么會(huì)有 p = q ?
  2. "p = (t != (t = tail))? t : head" 這段代碼是什么玩意, 是不是讓你直接懷疑自己的java基礎(chǔ)了, 不急我們慢慢來.
  3. 最后就是 "p = (p != t && (t != (t = tail))) ? t : q"

queue 初始化時(shí)是這樣的:

state1.png

整個(gè) queue 中 head = tail = dummyNode, 這時(shí)我們開始 offer 元素
1) 添加元素 a
1. 由于 head = tail = dummyNode, 所以 p.next = null
2. 直接操作步驟4 (p.casNext(null, newNode)), 若操作成功, 接著往下走, 不成功(并發(fā)時(shí) 其他的cas操作成功), 再loop 重試至成功
3. 判斷 p != t, 這時(shí)沒出現(xiàn) tail指向的不是 last node,所以不成立, 直接return
添加元素a后:

state2.png
  1. 添加元素 b
    1. 此時(shí)還是 head = tail = dummyNode, p節(jié)點(diǎn)是 dummyNode, q.item = a, q.item != null 且 q != null, 直接執(zhí)行步驟7 p = q (p != t && (t != (t = tail)) 下面說)
    2. 再次 判斷 q == null, 所以 有執(zhí)行步驟4 p.casNext(), 這時(shí)因?yàn)閳?zhí)行過 p = q, 所以 p != t 成立, 對(duì)tail進(jìn)行cas操作
    3. 最后直接 return
      添加 b 之后:
state3.png
  1. 添加元素c
    1. 這里操作步驟和添加 a 一樣, 所以不說了
      添加c后:
state4.png

解決上面的疑惑(看這里時(shí)最好將下面的 poll也看一遍):

1. "p = q", 這是在poll方法中調(diào)用 updateHead 方法所致的 
2. "p = (t != (t = tail))", 這段代碼的意思是 若 tail 節(jié)點(diǎn)在另外的節(jié)點(diǎn)中有變化 tail != t, 則將 tail 賦值給 p.雖然只有這短短一行代碼, 但是包含非常多的意思:
   i!= 這個(gè)操作符號(hào)不是原子的, 它可以被中斷; 
   ii) 執(zhí)行時(shí) 先獲取t的值, 再 t = tail, 賦值好了之后再與原來的t比較
   iii) 在多線程環(huán)境中 tail 很可能在上面添加元素的過程中被改變, 所以會(huì)出現(xiàn) t != tail, 若tail被修改, 則用新的tail, 不然直接跳到head節(jié)點(diǎn)
3. 多了一個(gè) p != t , 因?yàn)?tail變更, 節(jié)省了 (p = q) 后 loop 中的無畏操作, tail 更新說明 q節(jié)點(diǎn)肯定也是無效的

OK 至此 整個(gè)offer是分析好了, 接下來 poll

7. 出隊(duì)列操作 poll()

因?yàn)檫@個(gè)操作涉及 head 引用, 所以我們?cè)賮砘仡櫼幌耯ead的不變性和可變性:

不變性(invariants)

1. 所有的有效節(jié)點(diǎn)通過 succ() 方法都可達(dá)
2. head != null
3. (tmp = head).next != tmp || tmp != head (其實(shí)就是 head.next != head)

可變性(Non-invariants)
1. head.item 可能是 null, 也可能不是 null
2. 允許 tail 滯后于 head, 也就是調(diào)用 succ() 方法, 從 head 不可達(dá)tail

head主要特點(diǎn) tail 可能之后 head, 且head.item 可能是 null
不廢話了, 直接上代碼

public E poll(){
    restartFromHead:
    for(;;){ // 0. 為啥這里面是兩個(gè) for 循環(huán)? 不防, 你去掉個(gè)試試, 其實(shí)主要是為了在 "continue restartFromHead" 后進(jìn)行第二個(gè) for loop 中的初始化
        for(Node<E> h = head, p = h, q;;){ // 1.進(jìn)行變量的初始化 p = h = head,
            E item = p.item;

            if(item != null && p.casItem(item, null)){  // 2. 若 node.item != null, 則進(jìn)行cas操作, cas成功則返回值
                // Successful CAS is the linearization point
                // for item to be removed from this queue
                if(p != h){ // hop two nodes at a time  // 3. 若此時(shí)的 p != h, 則更新 head(那啥時(shí) p != h, 額, 這個(gè)絕對(duì)坑啊 -> 執(zhí)行第8步后)
                    updateHead(h, ((q = p.next) != null)? q : p); // 4. 進(jìn)行 cas 更新 head ; "(q = p.next) != null" 怕出現(xiàn)p此時(shí)是尾節(jié)點(diǎn)了; 在 ConcurrentLinkedQueue 中正真的尾節(jié)點(diǎn)只有1個(gè)(必須滿足node.next = null)
                }
                return item;
            }
            else if((q = p.next) == null){  // 5. queue是空的, p是尾節(jié)點(diǎn)
                updateHead(h, p); // 6. 這一步除了更新head 外, 還是helpDelete刪除隊(duì)列操作, 刪除 p 之前的節(jié)點(diǎn)(和 ConcurrentSkipListMap.Node 中的 helpDelete 有異曲同工之妙)
                return null;
            }
            else if(p == q){ // 7. p == q -> 說明 p節(jié)點(diǎn)已經(jīng)是刪除了的head節(jié)點(diǎn), 為啥呢?(見updateHead方法)
                continue restartFromHead;
            }else
                p = q; // 8. 將 q -> p, 進(jìn)行下個(gè)節(jié)點(diǎn)的 poll 操作(初始化一個(gè) dummy 節(jié)點(diǎn), 在單線程情況下, 這個(gè) if 判斷是第一個(gè)執(zhí)行的)
        }
    }
}

理解了offer之后我想 poll 應(yīng)該比較簡(jiǎn)單了.
我們?cè)賮砘仡櫼幌聞倓偺砑恿?a, b, c, 之后隊(duì)列的狀態(tài):

state4.png
  1. poll 第一個(gè)元素 a
1. 此時(shí) head指向 dummy, tail 指向 item = b 的節(jié)點(diǎn), 所以在步驟2中 item == null, 而 (q = p.next) != null, 所以直接跳到步驟8, 
2. 這時(shí) p指向a, 且滿足 item != null, 所以執(zhí)行步驟2, 又因?yàn)閳?zhí)行了步驟8, 所以 p != h, 進(jìn)行 head 節(jié)點(diǎn)的更新 (head 指向這時(shí)p.next節(jié)點(diǎn))

poll item = a 后:

state6.png
  1. poll 第二個(gè)元素 b
1. 此時(shí) head = tail = b 節(jié)點(diǎn), 所以 item != null, 直接執(zhí)行 步驟2, 而 p == h , 所以不更新head

poll 節(jié)點(diǎn) b 后:

state7.png
  1. poll 第三個(gè)元素 c
    poll 節(jié)點(diǎn) c 和 poll 節(jié)點(diǎn)啊一樣的, 所以不說了, 直接看結(jié)果圖
state8.png

一目了然, tail 滯后于 head

  1. ok 這時(shí)我們?cè)龠M(jìn)行 offer() 節(jié)點(diǎn) d, 則就會(huì)出現(xiàn) offer 中的步驟 6 (p == q), 所以這時(shí)p直接跳到 head節(jié)點(diǎn), 來進(jìn)行更新, 步驟省略....

結(jié)果如圖 :

state9.png

至此整個(gè) poll 分析結(jié)束

8. 總結(jié)

ConcurrentLinkedQueue 的整個(gè)設(shè)計(jì)十分精妙, 它使用 CAS 處理對(duì)數(shù)據(jù)的操作, 同時(shí)允許隊(duì)列處于不一致的狀態(tài); 這種特性分離了一般 poll/offer時(shí)需要兩個(gè)原子的操作, 對(duì)了尤其是節(jié)點(diǎn)的刪除 (updateHead) 和后繼節(jié)點(diǎn)的訪問 succ(), 而對(duì) ConcurrentLinkedQueue的掌握有助于我們了解 SynchronousQueue, AQS, FutureTask 中的 Queue

參考資料:
Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
vickyqi ConcurrentLinkedQueue
大飛 ConcurrentLinkedQueue

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容