ConcurrentLinkedQueue
通過名字大家就可以知道, 這是一個(gè)通過鏈表實(shí)現(xiàn)的并發(fā)安全的隊(duì)列, 它應(yīng)該是java中并發(fā)環(huán)境下性能最好的隊(duì)列, 為什么呢? 因?yàn)樗牟蛔冃?invariants) 與可變性(non-invariants)
1. 基本原則不變性(fundamental invariants)
1.整個(gè)隊(duì)列中一定會(huì)存在一個(gè) node(node.next = null), 并且僅存在一個(gè), 但tail引用不一定指向它
2. 隊(duì)列中所有 item != null 的節(jié)點(diǎn), head一定能夠到達(dá); cas 設(shè)置 node.item = null, 意味著這個(gè)節(jié)點(diǎn)被刪除
head引用的不變性和可變性
不變性(invariants)
1. 所有的有效節(jié)點(diǎn)通過 succ() 方法都可達(dá)
2. head != null
3. (tmp = head).next != tmp || tmp != head (其實(shí)就是 head.next != head)
可變性(Non-invariants)
1. head.item 可能是 null, 也可能不是 null
2. 允許 tail 滯后于 head, 也就是調(diào)用 succ() 方法, 從 head 不可達(dá)tail
tail 引用的不變性和可變性
不變性(invariants)
1. tail 節(jié)點(diǎn)通過succ()方法一定到達(dá)隊(duì)列中的最后一個(gè)節(jié)點(diǎn)(node.next = null)
2. tail != null
可變性(Non-invariants)
1. tail.item 可能是 null, 也可能不是 null
2. 允許 tail 滯后于 head, 也就是調(diào)用 succ() 方法, 從 head 不可達(dá)tail
3. tail.next 可能指向 tail
這些不變性(invariants) 和 可變性(Non-invariants) 造成 ConcurrentLinkedQueue 有些異于一般queue的特點(diǎn):
1. head 與 tail 都有可能指向一個(gè) (item = null) 的節(jié)點(diǎn)
2. 如果 queue 是空的, 則所有 node.item = null
3. queue剛剛創(chuàng)建時(shí) head = tail = dummyNode
4. head/tail 的 item/next 的操作都是通過 CAS
暈了, 是哇! 沒事, 這些都是特性, 我們先看代碼, 回頭再回顧這些特性.
2. 內(nèi)部節(jié)點(diǎn) Node
import com.lami.tuomatuo.search.base.concurrent.unsafe.UnSafeClass;
import sun.misc.Unsafe;
/**
* http://hg.openjdk.java.net/jdk7/jdk7/jdk/file/9b8c96f96a0f/src/share/classes/sun/misc/Unsafe.java
* http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/9b0ca45cd756/src/share/vm/prims/unsafe.cpp
* http://mishadoff.com/blog/java-magic-part-4-sun-dot-misc-dot-unsafe/
*
* Created by xjk on 1/13/17.
*/
public class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item){
/**
* Stores a reference value into a given Java variable.
* <p>
* Unless the reference <code>x</code> being stored is either null
* or matches the field type, the results are undefined.
* If the reference <code>o</code> is non-null, car marks or
* other store barriers for that object (if the VM requires them)
* are updated.
* @see #putInt(Object, int, int)
*
* 將 Node 對(duì)象的指定 itemOffset 偏移量設(shè)置 一個(gè)引用值
*/
unsafe.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val){
/**
* Atomically update Java variable to <tt>x</tt> if it is currently
* holding <tt>expected</tt>.
* @return <tt>true</tt> if successful
* 原子性的更新 item 值
*/
return unsafe.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val){
/**
* Version of {@link #putObjectVolatile(Object, long, Object)}
* that does not guarantee immediate visibility of the store to
* other threads. This method is generally only useful if the
* underlying field is a Java volatile (or if an array cell, one
* that is otherwise only accessed using volatile accesses).
*
* 調(diào)用這個(gè)方法和putObject差不多, 只是這個(gè)方法設(shè)置后對(duì)應(yīng)的值的可見性不一定得到保證,
* 這個(gè)方法能起這個(gè)作用, 通常是作用在 volatile field上, 也就是說, 下面中的參數(shù) val 是被volatile修飾
*/
unsafe.putOrderedObject(this, nextOffset, val);
}
/**
* Atomically update Java variable to <tt>x</tt> if it is currently
* holding <tt>expected</tt>.
* @return <tt>true</tt> if successful
*
* 原子性的更新 nextOffset 上的值
*
*/
boolean casNext(Node<E> cmp, Node<E> val){
return unsafe.compareAndSwapObject(this, nextOffset, cmp, val);
}
private static Unsafe unsafe;
private static long itemOffset;
private static long nextOffset;
static {
try {
unsafe = UnSafeClass.getInstance();
Class<?> k = Node.class;
itemOffset = unsafe.objectFieldOffset(k.getDeclaredField("item"));
nextOffset = unsafe.objectFieldOffset(k.getDeclaredField("next"));
}catch (Exception e){
}
}
}
整個(gè)內(nèi)部節(jié)點(diǎn) Node 的代碼比較簡(jiǎn)單, 若不了解 Unsafe 類使用的, 請(qǐng)點(diǎn)擊鏈接 Unsafe 與 LockSupport
3. ConcurrentLinkedQueue 內(nèi)部屬性及構(gòu)造方法
/** head 節(jié)點(diǎn) */
private transient volatile Node<E> head;
/** tail 節(jié)點(diǎn) */
private transient volatile Node<E> tail;
public ConcurrentLinkedList() {
/** 默認(rèn)會(huì)構(gòu)造一個(gè) dummy 節(jié)點(diǎn)
* dummy 的存在是防止一些特殊復(fù)雜代碼的出現(xiàn)
*/
head = tail = new Node<E>(null);
}
初始化 ConcurrentLinkedQueue時(shí) head = tail = dummy node.
4. 查詢后繼節(jié)點(diǎn)方法 succ()
/**
* 獲取 p 的后繼節(jié)點(diǎn), 若 p.next = p (updateHead 操作導(dǎo)致的), 則說明 p 已經(jīng) fall off queue, 需要 jump 到 head
*/
final Node<E> succ(Node<E> p){
Node<E> next = p.next;
return (p == next)? head : next;
}
獲取一個(gè)節(jié)點(diǎn)的后繼節(jié)點(diǎn)不是 node.next 嗎, No, No, No, 還有特殊情況, 就是tail 指向一個(gè)哨兵節(jié)點(diǎn) (node.next = node); 代碼的注釋中我提到了 哨兵節(jié)點(diǎn)是 updateHead 導(dǎo)致的, 那我們來看 updateHead方法.
5. 特別的更新頭節(jié)點(diǎn)方法 updateHead
為什么說 updateHead 特別呢? 還是看代碼
/**
* Tries to CAS head to p, If successfully, repoint old head to itself
* as sentinel for succ(), blew
*
* 將節(jié)點(diǎn) p設(shè)置為新的節(jié)點(diǎn)(這是原子操作),
* 之后將原節(jié)點(diǎn)的next指向自己, 直接變成一個(gè)哨兵節(jié)點(diǎn)(為queue節(jié)點(diǎn)刪除及garbage做準(zhǔn)備)
*
* @param h
* @param p
*/
final void updateHead(Node<E> h, Node<E> p){
if(h != p && casHead(h, p)){
h.lazySetNext(h);
}
}
主要這個(gè) h.lazySetNext(h), 將 h.next -> h 直接變成一個(gè)哨兵節(jié)點(diǎn), 這種lazySetNext主要用于無阻塞數(shù)據(jù)結(jié)構(gòu)的 nulling out, 要了解詳情 點(diǎn)擊 Unsafe 與 LockSupport
有了上面的這些輔助方法, 我們開始進(jìn)入正題
6. 入隊(duì)列操作 offer()
一般我們的思維: 入隊(duì)操作就是 tail.next = newNode; 而這里不同, 為什么呢? 我們?cè)賮砘仡櫼幌?tail 的不變性和可變性
不變性(invariants)
1. tail 節(jié)點(diǎn)通過succ()方法一定到達(dá)隊(duì)列中的最后一個(gè)節(jié)點(diǎn)(node.next = null)
2. tail != null
可變性(Non-invariants)
1. tail.item 可能是 null, 也可能不是 null
2. 允許 tail 滯后于 head, 也就是調(diào)用 succ() 方法, 從 head 不可達(dá)tail
3. tail.next 可能指向 tail
主要是這里 tail 會(huì)滯后于 head, 所以呢 要找到正真的 last node (node.next = null)
直接來代碼
/**
* Inserts the specified element at the tail of this queue
* As the queue is unbounded, this method will never return {@code false}
*
* @param e {@code true} (as specified by {@link Queue#offer(Object)})
* @return NullPointerException if the specified element is null
*
* 在隊(duì)列的末尾插入指定的元素
*/
public boolean offer(E e){
checkNotNull(e);
final Node<E> newNode = new Node<E>(e); // 1. 構(gòu)建一個(gè) node
for(Node<E> t = tail, p = t;;){ // 2. 初始化變量 p = t = tail
Node<E> q = p.next; // 3. 獲取 p 的next
if(q == null){ // q == null, 說明 p 是 last Node
// p is last node
if(p.casNext(null, newNode)){ // 4. 對(duì) p 進(jìn)行 cas 操作, newNode -> p.next
// Successful CAS is the linearization point
// for e to become an element of the queue,
// and for newNode to become "live"
if(p != t){ // 5. 每每經(jīng)過一次 p = q 操作(向后遍歷節(jié)點(diǎn)), 則 p != t 成立, 這個(gè)也說明 tail 滯后于 head 的體現(xiàn)
casTail(t, newNode); // Failure is OK
}
return true;
}
}
else if(p == q){ // 6. (p == q) 成立, 則說明p是pool()時(shí)調(diào)用 "updateHead" 導(dǎo)致的(刪除頭節(jié)點(diǎn)); 此時(shí)說明 tail 指針已經(jīng) fallen off queue, 所以進(jìn)行 jump 操作, 若在t沒變化, 則 jump 到 head, 若 t 已經(jīng)改變(jump操作在另外的線程中執(zhí)行), 則jump到 head 節(jié)點(diǎn), 直到找到 node.next = null 的節(jié)點(diǎn)
/** 1. 大前提 p 是已經(jīng)被刪除的節(jié)點(diǎn)
* 2. 判斷 tail 是否已經(jīng)改變
* 1) tail 已經(jīng)變化, 則說明 tail 已經(jīng)重新定位
* 2) tail 未變化, 而 tail 指向的節(jié)點(diǎn)是要?jiǎng)h除的節(jié)點(diǎn), 所以讓 p 指向 head
* 判斷尾節(jié)點(diǎn)是否有變化
* 1. 尾節(jié)點(diǎn)變化, 則用新的尾節(jié)點(diǎn)
* 2. 尾節(jié)點(diǎn)沒變化, 將 tail 指向head
*
* public void test(){
* String tail = "";
* String t = (tail = "oldTail");
* tail = "newTail";
* boolean isEqual = t != (t = tail); // <- 神奇吧
* System.out.println("isEqual : "+isEqual); // isEqual : true
* }
*/
p = (t != (t = tail))? t : head;
}else{
// 7. (p != t) -> 說明執(zhí)行過 p = q 操作(向后遍歷操作), "(t != (t = tail)))" -> 說明尾節(jié)點(diǎn)在其他的線程發(fā)生變化
// 為什么 "(t != (t = tail)))" 一定要滿足呢, 因?yàn)?tail變更, 節(jié)省了 (p = q) 后 loop 中的無畏操作, tail 更新說明 q節(jié)點(diǎn)肯定也是無效的
p = (p != t && (t != (t = tail))) ? t : q;
}
}
}
先瞄一下這段代碼: 發(fā)現(xiàn)有3大疑惑:
- 明明 Node<E> q = p.next, 怎么會(huì)有 p = q ?
- "p = (t != (t = tail))? t : head" 這段代碼是什么玩意, 是不是讓你直接懷疑自己的java基礎(chǔ)了, 不急我們慢慢來.
- 最后就是 "p = (p != t && (t != (t = tail))) ? t : q"
queue 初始化時(shí)是這樣的:

整個(gè) queue 中 head = tail = dummyNode, 這時(shí)我們開始 offer 元素
1) 添加元素 a
1. 由于 head = tail = dummyNode, 所以 p.next = null
2. 直接操作步驟4 (p.casNext(null, newNode)), 若操作成功, 接著往下走, 不成功(并發(fā)時(shí) 其他的cas操作成功), 再loop 重試至成功
3. 判斷 p != t, 這時(shí)沒出現(xiàn) tail指向的不是 last node,所以不成立, 直接return
添加元素a后:

- 添加元素 b
- 此時(shí)還是 head = tail = dummyNode, p節(jié)點(diǎn)是 dummyNode, q.item = a, q.item != null 且 q != null, 直接執(zhí)行步驟7 p = q (p != t && (t != (t = tail)) 下面說)
- 再次 判斷 q == null, 所以 有執(zhí)行步驟4 p.casNext(), 這時(shí)因?yàn)閳?zhí)行過 p = q, 所以 p != t 成立, 對(duì)tail進(jìn)行cas操作
- 最后直接 return
添加 b 之后:

- 添加元素c
- 這里操作步驟和添加 a 一樣, 所以不說了
添加c后:
- 這里操作步驟和添加 a 一樣, 所以不說了

解決上面的疑惑(看這里時(shí)最好將下面的 poll也看一遍):
1. "p = q", 這是在poll方法中調(diào)用 updateHead 方法所致的
2. "p = (t != (t = tail))", 這段代碼的意思是 若 tail 節(jié)點(diǎn)在另外的節(jié)點(diǎn)中有變化 tail != t, 則將 tail 賦值給 p.雖然只有這短短一行代碼, 但是包含非常多的意思:
i!= 這個(gè)操作符號(hào)不是原子的, 它可以被中斷;
ii) 執(zhí)行時(shí) 先獲取t的值, 再 t = tail, 賦值好了之后再與原來的t比較
iii) 在多線程環(huán)境中 tail 很可能在上面添加元素的過程中被改變, 所以會(huì)出現(xiàn) t != tail, 若tail被修改, 則用新的tail, 不然直接跳到head節(jié)點(diǎn)
3. 多了一個(gè) p != t , 因?yàn)?tail變更, 節(jié)省了 (p = q) 后 loop 中的無畏操作, tail 更新說明 q節(jié)點(diǎn)肯定也是無效的
OK 至此 整個(gè)offer是分析好了, 接下來 poll
7. 出隊(duì)列操作 poll()
因?yàn)檫@個(gè)操作涉及 head 引用, 所以我們?cè)賮砘仡櫼幌耯ead的不變性和可變性:
不變性(invariants)
1. 所有的有效節(jié)點(diǎn)通過 succ() 方法都可達(dá)
2. head != null
3. (tmp = head).next != tmp || tmp != head (其實(shí)就是 head.next != head)
可變性(Non-invariants)
1. head.item 可能是 null, 也可能不是 null
2. 允許 tail 滯后于 head, 也就是調(diào)用 succ() 方法, 從 head 不可達(dá)tail
head主要特點(diǎn) tail 可能之后 head, 且head.item 可能是 null
不廢話了, 直接上代碼
public E poll(){
restartFromHead:
for(;;){ // 0. 為啥這里面是兩個(gè) for 循環(huán)? 不防, 你去掉個(gè)試試, 其實(shí)主要是為了在 "continue restartFromHead" 后進(jìn)行第二個(gè) for loop 中的初始化
for(Node<E> h = head, p = h, q;;){ // 1.進(jìn)行變量的初始化 p = h = head,
E item = p.item;
if(item != null && p.casItem(item, null)){ // 2. 若 node.item != null, 則進(jìn)行cas操作, cas成功則返回值
// Successful CAS is the linearization point
// for item to be removed from this queue
if(p != h){ // hop two nodes at a time // 3. 若此時(shí)的 p != h, 則更新 head(那啥時(shí) p != h, 額, 這個(gè)絕對(duì)坑啊 -> 執(zhí)行第8步后)
updateHead(h, ((q = p.next) != null)? q : p); // 4. 進(jìn)行 cas 更新 head ; "(q = p.next) != null" 怕出現(xiàn)p此時(shí)是尾節(jié)點(diǎn)了; 在 ConcurrentLinkedQueue 中正真的尾節(jié)點(diǎn)只有1個(gè)(必須滿足node.next = null)
}
return item;
}
else if((q = p.next) == null){ // 5. queue是空的, p是尾節(jié)點(diǎn)
updateHead(h, p); // 6. 這一步除了更新head 外, 還是helpDelete刪除隊(duì)列操作, 刪除 p 之前的節(jié)點(diǎn)(和 ConcurrentSkipListMap.Node 中的 helpDelete 有異曲同工之妙)
return null;
}
else if(p == q){ // 7. p == q -> 說明 p節(jié)點(diǎn)已經(jīng)是刪除了的head節(jié)點(diǎn), 為啥呢?(見updateHead方法)
continue restartFromHead;
}else
p = q; // 8. 將 q -> p, 進(jìn)行下個(gè)節(jié)點(diǎn)的 poll 操作(初始化一個(gè) dummy 節(jié)點(diǎn), 在單線程情況下, 這個(gè) if 判斷是第一個(gè)執(zhí)行的)
}
}
}
理解了offer之后我想 poll 應(yīng)該比較簡(jiǎn)單了.
我們?cè)賮砘仡櫼幌聞倓偺砑恿?a, b, c, 之后隊(duì)列的狀態(tài):

- poll 第一個(gè)元素 a
1. 此時(shí) head指向 dummy, tail 指向 item = b 的節(jié)點(diǎn), 所以在步驟2中 item == null, 而 (q = p.next) != null, 所以直接跳到步驟8,
2. 這時(shí) p指向a, 且滿足 item != null, 所以執(zhí)行步驟2, 又因?yàn)閳?zhí)行了步驟8, 所以 p != h, 進(jìn)行 head 節(jié)點(diǎn)的更新 (head 指向這時(shí)p.next節(jié)點(diǎn))
poll item = a 后:

- poll 第二個(gè)元素 b
1. 此時(shí) head = tail = b 節(jié)點(diǎn), 所以 item != null, 直接執(zhí)行 步驟2, 而 p == h , 所以不更新head
poll 節(jié)點(diǎn) b 后:

- poll 第三個(gè)元素 c
poll 節(jié)點(diǎn) c 和 poll 節(jié)點(diǎn)啊一樣的, 所以不說了, 直接看結(jié)果圖

一目了然, tail 滯后于 head
- ok 這時(shí)我們?cè)龠M(jìn)行 offer() 節(jié)點(diǎn) d, 則就會(huì)出現(xiàn) offer 中的步驟 6 (p == q), 所以這時(shí)p直接跳到 head節(jié)點(diǎn), 來進(jìn)行更新, 步驟省略....
結(jié)果如圖 :

至此整個(gè) poll 分析結(jié)束
8. 總結(jié)
ConcurrentLinkedQueue 的整個(gè)設(shè)計(jì)十分精妙, 它使用 CAS 處理對(duì)數(shù)據(jù)的操作, 同時(shí)允許隊(duì)列處于不一致的狀態(tài); 這種特性分離了一般 poll/offer時(shí)需要兩個(gè)原子的操作, 對(duì)了尤其是節(jié)點(diǎn)的刪除 (updateHead) 和后繼節(jié)點(diǎn)的訪問 succ(), 而對(duì) ConcurrentLinkedQueue的掌握有助于我們了解 SynchronousQueue, AQS, FutureTask 中的 Queue
參考資料:
Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
vickyqi ConcurrentLinkedQueue
大飛 ConcurrentLinkedQueue