JDK8系列之LongAdder解析

前言

最近公司的項(xiàng)目在陸續(xù)升級(jí)jdk8,正好想起之前有人問(wèn)我的jdk8中新增的LongAdder類和AtomicLong的區(qū)別,就忍不住想探究一番。

源碼解析

首先看一下類的定義

public class LongAdder extends Striped64 implements Serializable

然后來(lái)看一下類的屬性和方法

LongAdder.png

這里讓我困惑的一個(gè)問(wèn)題是LongAdder中沒有類似于AtomicLonggetAndIncrement()或者incrementAndGet()這樣的原子操作,所以只能通過(guò)increment()方法和longValue()方法的組合來(lái)實(shí)現(xiàn)更新和獲取的動(dòng)作,然而這樣不能保證這個(gè)組合操作的原子性,猜想也許LongAdder就是不具備這樣的機(jī)制吧。那么就主要看一下increment()longValue()方法。

longValue方法
    public long longValue() {
        return sum();
    }

首先看一下獲取值的longValue()方法,這里調(diào)用了sum()

    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

這里出現(xiàn)了一個(gè)類Cell,這個(gè)類是Striped64類的靜態(tài)內(nèi)部類,因此當(dāng)Striped64對(duì)象初始化時(shí)并不會(huì)連帶將Cell類初始化,其源碼如下

    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

這里有一個(gè)@sun.misc.Contended標(biāo)識(shí)非常奇怪,查了資料發(fā)現(xiàn)是用于解決偽共享(false sharing)問(wèn)題的,這里就不深入講解了。這個(gè)類只有一個(gè)cas(long cmp, long val)方法,做的事情也就是簡(jiǎn)單的CAS操作,接著來(lái)看一下cellsbase的定義,這兩個(gè)屬性都是Striped64中的

    transient volatile Cell[] cells;

    transient volatile long base;

可以看到全部都是volatile類型的,因此都具有可見性,并且可以推測(cè)會(huì)遇到并發(fā)的操作。
回到sum()方法就很好理解了,這個(gè)方法返回的是basecells數(shù)組中所有元素的和,這里的base像是一個(gè)初始值的作用,我們繼續(xù)往下分析。

increment方法
public void increment() {
        add(1L);
    }

接著分析increment()方法,可以看到該方法就是對(duì)add(long x)的封裝,那么具體來(lái)分析一下這個(gè)方法

public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

這里重點(diǎn)是casBase(b = base, b + x),來(lái)看一下它做了什么

    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

由于此處的x為1,那么該方法就是對(duì)Striped64BASE的值進(jìn)行累加并返回是否成功,順帶一提這里的BASEbase值所對(duì)應(yīng)的內(nèi)存偏移量,所以casBase(b = base, b + x)就是對(duì)base進(jìn)行CAS操作,執(zhí)行成功的話操作就結(jié)束了,那么什么時(shí)候會(huì)不成功呢,當(dāng)然就是并發(fā)量大的時(shí)候,結(jié)合之前分析的longValue()方法,這里就可以得出一個(gè)結(jié)論——當(dāng)并發(fā)不大的時(shí)候只對(duì)base進(jìn)行更新,獲取值得時(shí)候當(dāng)然也只從base獲取即可,這個(gè)時(shí)候其實(shí)和AtomicLong的原理幾乎一模一樣,看來(lái)區(qū)別就在于后面的分支,繼續(xù)往下看。
首先置標(biāo)志位uncontended為true,從字面意思也可以看出來(lái)uncontended意思是沒有競(jìng)爭(zhēng)的。當(dāng)casBase不成功則將進(jìn)入add(long x)的if分支中,分支由四部分組成中間用||連接,必須全部滿足才會(huì)結(jié)束方法執(zhí)行,否則進(jìn)入longAccumulate(x, null, uncontended)方法,這四部分分別為

  1. as == null,
  2. (m = as.length - 1) < 0
  3. (a = as[getProbe() & m]) == null
  4. !(uncontended = a.cas(v = a.value, v + x))

這里的四個(gè)條件其實(shí)并不是并列的,而是遞進(jìn)式的,1和2判斷cells數(shù)組是否為空,3取cells數(shù)組中的任意一個(gè)元素a判斷是否為空,4是對(duì)a進(jìn)行cas操作并將執(zhí)行結(jié)果賦值標(biāo)志位uncontended。從這里可以給出第二個(gè)結(jié)論,當(dāng)競(jìng)爭(zhēng)激烈到一定程度無(wú)法對(duì)base進(jìn)行累加操作時(shí),會(huì)對(duì)cells數(shù)組中某個(gè)元素進(jìn)行更新。

最后來(lái)看一下當(dāng)上述條件無(wú)法全部滿足時(shí)調(diào)用的longAccumulate(x, null, uncontended)方法

    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();  //返回當(dāng)前線程的threadLocalRandomProbe值
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // cells數(shù)組中對(duì)應(yīng)位置沒有數(shù)據(jù)則插入新對(duì)象
                        Cell r = new Cell(x);   
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))         //對(duì)該位置的cell元素進(jìn)行累加
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale   //判斷數(shù)組大小是否大于核數(shù)
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {         //對(duì)cells數(shù)組進(jìn)行擴(kuò)容,直接擴(kuò)容為2倍
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {  //cellsBusy這里是做為一個(gè)自旋鎖來(lái)使用的
                boolean init = false;
                try {                           // 初始化cells數(shù)組大小為2
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))      //對(duì)base進(jìn)行CAS操作
                break;                          // Fall back on using base
        }
    }

這個(gè)方法比較長(zhǎng),大致對(duì)幾個(gè)關(guān)鍵點(diǎn)做了注釋,該方法主要是用一個(gè)死循環(huán)對(duì)cells數(shù)組中的元素進(jìn)行操作,當(dāng)要更新的位置的元素為空時(shí)插入新的cell元素,否則在該位置進(jìn)行CAS的累加操作,如果CAS操作失敗并且數(shù)組大小沒有超過(guò)核數(shù)就擴(kuò)容cells數(shù)組。

總結(jié)

LongAdder類與AtomicLong類的區(qū)別在于高并發(fā)時(shí)前者將對(duì)單一變量的CAS操作分散為對(duì)數(shù)組cells中多個(gè)元素的CAS操作,取值時(shí)進(jìn)行求和;而在并發(fā)較低時(shí)僅對(duì)base變量進(jìn)行CAS操作,與AtomicLong類原理相同。不得不說(shuō)這種分布式的設(shè)計(jì)還是很巧妙的。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容