前言
最近公司的項(xiàng)目在陸續(xù)升級(jí)jdk8,正好想起之前有人問(wèn)我的jdk8中新增的LongAdder類和AtomicLong的區(qū)別,就忍不住想探究一番。
源碼解析
首先看一下類的定義
public class LongAdder extends Striped64 implements Serializable
然后來(lái)看一下類的屬性和方法

這里讓我困惑的一個(gè)問(wèn)題是
LongAdder中沒有類似于AtomicLong中getAndIncrement()或者incrementAndGet()這樣的原子操作,所以只能通過(guò)increment()方法和longValue()方法的組合來(lái)實(shí)現(xiàn)更新和獲取的動(dòng)作,然而這樣不能保證這個(gè)組合操作的原子性,猜想也許LongAdder就是不具備這樣的機(jī)制吧。那么就主要看一下increment()和longValue()方法。
longValue方法
public long longValue() {
return sum();
}
首先看一下獲取值的longValue()方法,這里調(diào)用了sum()
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
這里出現(xiàn)了一個(gè)類Cell,這個(gè)類是Striped64類的靜態(tài)內(nèi)部類,因此當(dāng)Striped64對(duì)象初始化時(shí)并不會(huì)連帶將Cell類初始化,其源碼如下
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
這里有一個(gè)@sun.misc.Contended標(biāo)識(shí)非常奇怪,查了資料發(fā)現(xiàn)是用于解決偽共享(false sharing)問(wèn)題的,這里就不深入講解了。這個(gè)類只有一個(gè)cas(long cmp, long val)方法,做的事情也就是簡(jiǎn)單的CAS操作,接著來(lái)看一下cells和base的定義,這兩個(gè)屬性都是Striped64中的
transient volatile Cell[] cells;
transient volatile long base;
可以看到全部都是volatile類型的,因此都具有可見性,并且可以推測(cè)會(huì)遇到并發(fā)的操作。
回到sum()方法就很好理解了,這個(gè)方法返回的是base和cells數(shù)組中所有元素的和,這里的base像是一個(gè)初始值的作用,我們繼續(xù)往下分析。
increment方法
public void increment() {
add(1L);
}
接著分析increment()方法,可以看到該方法就是對(duì)add(long x)的封裝,那么具體來(lái)分析一下這個(gè)方法
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
這里重點(diǎn)是casBase(b = base, b + x),來(lái)看一下它做了什么
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
由于此處的x為1,那么該方法就是對(duì)Striped64的BASE的值進(jìn)行累加并返回是否成功,順帶一提這里的BASE是base值所對(duì)應(yīng)的內(nèi)存偏移量,所以casBase(b = base, b + x)就是對(duì)base進(jìn)行CAS操作,執(zhí)行成功的話操作就結(jié)束了,那么什么時(shí)候會(huì)不成功呢,當(dāng)然就是并發(fā)量大的時(shí)候,結(jié)合之前分析的longValue()方法,這里就可以得出一個(gè)結(jié)論——當(dāng)并發(fā)不大的時(shí)候只對(duì)base進(jìn)行更新,獲取值得時(shí)候當(dāng)然也只從base獲取即可,這個(gè)時(shí)候其實(shí)和AtomicLong的原理幾乎一模一樣,看來(lái)區(qū)別就在于后面的分支,繼續(xù)往下看。
首先置標(biāo)志位uncontended為true,從字面意思也可以看出來(lái)uncontended意思是沒有競(jìng)爭(zhēng)的。當(dāng)casBase不成功則將進(jìn)入add(long x)的if分支中,分支由四部分組成中間用||連接,必須全部滿足才會(huì)結(jié)束方法執(zhí)行,否則進(jìn)入longAccumulate(x, null, uncontended)方法,這四部分分別為
- as == null,
- (m = as.length - 1) < 0
- (a = as[getProbe() & m]) == null
- !(uncontended = a.cas(v = a.value, v + x))
這里的四個(gè)條件其實(shí)并不是并列的,而是遞進(jìn)式的,1和2判斷cells數(shù)組是否為空,3取cells數(shù)組中的任意一個(gè)元素a判斷是否為空,4是對(duì)a進(jìn)行cas操作并將執(zhí)行結(jié)果賦值標(biāo)志位uncontended。從這里可以給出第二個(gè)結(jié)論,當(dāng)競(jìng)爭(zhēng)激烈到一定程度無(wú)法對(duì)base進(jìn)行累加操作時(shí),會(huì)對(duì)cells數(shù)組中某個(gè)元素進(jìn)行更新。
最后來(lái)看一下當(dāng)上述條件無(wú)法全部滿足時(shí)調(diào)用的longAccumulate(x, null, uncontended)方法
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe(); //返回當(dāng)前線程的threadLocalRandomProbe值
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // cells數(shù)組中對(duì)應(yīng)位置沒有數(shù)據(jù)則插入新對(duì)象
Cell r = new Cell(x);
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) //對(duì)該位置的cell元素進(jìn)行累加
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale //判斷數(shù)組大小是否大于核數(shù)
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) { //對(duì)cells數(shù)組進(jìn)行擴(kuò)容,直接擴(kuò)容為2倍
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { //cellsBusy這里是做為一個(gè)自旋鎖來(lái)使用的
boolean init = false;
try { // 初始化cells數(shù)組大小為2
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) //對(duì)base進(jìn)行CAS操作
break; // Fall back on using base
}
}
這個(gè)方法比較長(zhǎng),大致對(duì)幾個(gè)關(guān)鍵點(diǎn)做了注釋,該方法主要是用一個(gè)死循環(huán)對(duì)cells數(shù)組中的元素進(jìn)行操作,當(dāng)要更新的位置的元素為空時(shí)插入新的cell元素,否則在該位置進(jìn)行CAS的累加操作,如果CAS操作失敗并且數(shù)組大小沒有超過(guò)核數(shù)就擴(kuò)容cells數(shù)組。
總結(jié)
LongAdder類與AtomicLong類的區(qū)別在于高并發(fā)時(shí)前者將對(duì)單一變量的CAS操作分散為對(duì)數(shù)組cells中多個(gè)元素的CAS操作,取值時(shí)進(jìn)行求和;而在并發(fā)較低時(shí)僅對(duì)base變量進(jìn)行CAS操作,與AtomicLong類原理相同。不得不說(shuō)這種分布式的設(shè)計(jì)還是很巧妙的。