跳表-ConcurrentSkipListMap走馬觀花

跳表(跳躍表)是一種數(shù)據(jù)結(jié)構(gòu),改進(jìn)自鏈表,用于存儲(chǔ)有序的數(shù)據(jù),跳躍表通過空間換時(shí)間的方法來提高數(shù)據(jù)檢索的速度。早些在學(xué)校的數(shù)據(jù)結(jié)構(gòu)課程中并沒有接觸過跳表。第一次接觸是在了解Redis的有序集合的底層實(shí)現(xiàn)的時(shí)候,畢竟Redis的五種數(shù)據(jù)結(jié)構(gòu)是面試中的常見題,即使沒有實(shí)際使用過,也需要提前去了解一下。

但是最近在工作中不一樣了,由于是從事游戲開發(fā),經(jīng)常會(huì)有排行榜的需求需要實(shí)現(xiàn),而這大多數(shù)通過跳表來實(shí)現(xiàn),所以有必要來深入了解一下跳表并簡略的閱讀一下ConcurrentSkipListMap的源碼。前面也提到了Redis中的有序集合底層就是跳表,所以其也是開發(fā)排行榜功能時(shí)可以考慮使用的。有時(shí)候呢,我們需要排序的東西不需要持久化或者為了更高的效率,是直接在內(nèi)存中進(jìn)行排序,畢竟訪問redis是有網(wǎng)絡(luò)io開銷的,那么使用到的數(shù)據(jù)結(jié)構(gòu)就是jdk中的ConcurrentSkipListMap(支持同步)或者TreeMap(不支持同步,性能更好).

不過這次源碼重點(diǎn)在于跳躍表,不在于并發(fā)的控制。

跳躍表

通過上面的介紹,我們了解到跳躍表的應(yīng)用場景大概是這樣的:

  1. 有序
  2. 頻繁插入刪除
  3. 頻繁的查找

對于有序來說,數(shù)組和普通鏈表都可以通過排序算法來實(shí)現(xiàn),在排序復(fù)雜度上不相上下。鏈表在插入和刪除上性能較好,數(shù)組在查找上性能較好,所以都不能滿足我們的要求。跳躍表則是在插入刪除和查找的性能上做了折中,復(fù)雜度為log(n)。

跳表結(jié)構(gòu)如下所示:

跳躍表

為了更好的支持插入和刪除,所以采用鏈表的形式,可以看到圖片中最下面一行是一個(gè)有序的鏈表。但如果只是一個(gè)單一的鏈表,查找時(shí)復(fù)雜度為O(n),性能太差。如何優(yōu)化呢?

在有序數(shù)組中,我們查找時(shí)用的是二分查找,一次查找可以排除一半元素的遍歷。在數(shù)組中之所以可以用二分查找,是因?yàn)槲覀兡軌蚩焖俚囊設(shè)(1)的復(fù)雜度定位到中間的位置,但是鏈表只能是O(N)。所以跳躍表采取空間換時(shí)間的方式,既然你找不到中間點(diǎn),或者三分之一點(diǎn)等中間位置,那么我可以通過多增加一個(gè)節(jié)點(diǎn)來指向中間位置,這樣你也能夠快速的定位到中間的位置,然后一定程度的減少你遍歷元素的個(gè)數(shù),提高效率。圖中有多層,相鄰的兩層,采用的都是這樣的思想。

這個(gè)圖一目了然,很容易就可以讓大家了解跳表的思想。至于我們應(yīng)該添加多少層額外的鏈表,給什么位置的節(jié)點(diǎn)添加索引才能更好的優(yōu)化檢索和插入的效率,就是我希望通過閱讀源碼找尋的.

節(jié)點(diǎn)

通過上圖,可以發(fā)現(xiàn)有兩種節(jié)點(diǎn)類型,第一種是最下層的,與普通鏈表相似,第二種是除了最后一層以外的其他索引層節(jié)點(diǎn),有兩個(gè)指針。
但是在jdk源碼中還存在一種節(jié)點(diǎn),是索引層的頭節(jié)點(diǎn),還維護(hù)了其層數(shù)信息,下面先給出源碼注釋中的跳表樣例。

* Head nodes          Index nodes
* +-+    right        +-+                      +-+
* |2|---------------->| |--------------------->| |->null
* +-+                 +-+                      +-+
*  | down              |                        |
*  v                   v                        v
* +-+            +-+  +-+       +-+            +-+       +-+
* |1|----------->| |->| |------>| |----------->| |------>| |->null
* +-+            +-+  +-+       +-+            +-+       +-+
*  v              |    |         |              |         |
* Nodes  next     v    v         v              v         v
* +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+
* | |->|A|->|B|->|C|->|D|->|E|->|F|->|G|->|H|->|I|->|J|->|K|->null
* +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+
*

接著我們來分別看下它們在源碼中是如何定義的。

// 記住了,只關(guān)注跳表,不去關(guān)注并發(fā)?。?!水平有限,不誤導(dǎo)大家
// Node是最底層的鏈表的節(jié)點(diǎn),包括鍵值對和指向下一個(gè)節(jié)點(diǎn)的指針
static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile Node<K,V> next;
// 至于為什么需要兩個(gè)構(gòu)造函數(shù),后面源碼會(huì)有解釋
    Node(K key, Object value, Node<K,V> next) {
        this.key = key;
        this.value = value;
        this.next = next;
    }
    Node(Node<K,V> next) {
        this.key = null;
        this.value = this;
        this.next = next;
    }
    // ...配套method
}

// 索引節(jié)點(diǎn)結(jié)構(gòu)
// 存儲(chǔ)了兩個(gè)指針,分別指向右邊和下邊的節(jié)點(diǎn)
// 索引節(jié)點(diǎn)的value為鏈表節(jié)點(diǎn)
static class Index<K,V> {
    final Node<K,V> node;
    final Index<K,V> down;
    volatile Index<K,V> right;

    Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
        this.node = node;
        this.down = down;
        this.right = right;
    }
    // ...配套method
}

// 索引層的頭節(jié)點(diǎn)結(jié)構(gòu)
// 在索引節(jié)點(diǎn)的基礎(chǔ)上添加了表示層數(shù)的level變量
static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}

源碼閱讀小技巧

JDK中集合包下的代碼,一個(gè)集合差不多有幾千行,想要從上往下逐行看完是不現(xiàn)實(shí)的,需要能快速的定位到我們想看的方法。如果是HashMap這種常用的數(shù)據(jù)結(jié)構(gòu),我們經(jīng)常使用,對它有哪些方法非常的了解,就可以通過搜索方法名字來定位代碼的位置。但是像今天這種ConcurrentSkipListMap比較不常用的類,在不了解它大多數(shù)方法的時(shí)候,可以通過idea提供的功能幫我們快速定位。打開下圖中的show members就可以讓類現(xiàn)實(shí)它所有的方法和內(nèi)部類,就可以靠猜名字快速定位到源碼的位置。

[圖片上傳失敗...(image-61db2d-1568213866130)]

構(gòu)造函數(shù)

了解完內(nèi)部鏈表節(jié)點(diǎn)的實(shí)現(xiàn)情況,現(xiàn)在按照老規(guī)矩,從構(gòu)造函數(shù)開始閱讀源碼。

// Compartor接口用來指定key的排序規(guī)則
public ConcurrentSkipListMap() {
    this.comparator = null;
    initialize();
}
public ConcurrentSkipListMap(Comparator<? super K> comparator) {
    this.comparator = comparator;
    initialize();
}
// 還有兩個(gè)傳入map和sortedMap的構(gòu)造函數(shù)
private void initialize() {
    keySet = null;// 內(nèi)部類
    entrySet = null;// 內(nèi)部類
    values = null;// 內(nèi)部類
    descendingMap = null;// 內(nèi)部類
// private static final Object BASE_HEADER = new Object();
// 從注釋給出的圖來看,這個(gè)head應(yīng)該是一直處于第一層的頭節(jié)點(diǎn)
    head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                              null, null, 1);
}

構(gòu)造函數(shù)好像沒啥,那么接著往下看吧

put()

// 與一般的map一樣,通過put插入鍵值對,,key,value不能為空
public V put(K key, V value) {
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}

// doPut
private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;   // 要被添加的Node
    if (key == null)
        throw new NullPointerException();
    // key的比較方法
    Comparator<? super K> cmp = comparator;
    outer: for (;;) { // 因?yàn)閏as操作可能失敗,套了個(gè)無限循環(huán)
    // findPrecessor返回小于key但最接近key的節(jié)點(diǎn),不存在則為鏈表的頭節(jié)點(diǎn),下面也會(huì)介紹
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            if (n != null) {
                Object v; int c;
                Node<K,V> f = n.next;
                // b--->n--->f
                if (n != b.next)
                    break;
                if ((v = n.value) == null) {   // n is deleted
                // 簡單來說就是b.next = f,但是考慮了cas
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n) // b is deleted
                    break;
                // 如果key大于n.key
                if ((c = cpr(cmp, key, n.key)) > 0) {
                // 說明有并發(fā)插入,繼續(xù)向后遍歷一個(gè)節(jié)點(diǎn)
                    b = n;
                    n = f;
                    continue;
                }
                // 如果key相等
                if (c == 0) {
                    // 沒有指定putIfAbsent的話,通過cas替換value并返回新的value
                    // 指定了putIfabsent則返回原有value值
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;
                    }
                    break; //cas失敗,重試
                }
                // else c < 0;則位置正確,插入就行
            }
            // 創(chuàng)建新的Node節(jié)點(diǎn)
            z = new Node<K,V>(key, value, n);
            // 更新鏈表b->n->f ==> b->z->n->f
            if (!b.casNext(n, z))
                break;     
            break outer;
        }
    }

    int rnd = ThreadLocalRandom.nextSecondarySeed();
    // 0x80000001轉(zhuǎn)換為二進(jìn)制1000...0001
    // 看起來似乎添加層數(shù)是有一定隨機(jī)性的
    // rnd為0xxx...xxx0時(shí)可以進(jìn)入
    if ((rnd & 0x80000001) == 0) { 
        int level = 1, max;
        // 有多少個(gè)1,level遞增多少次
        while (((rnd >>>= 1) & 1) != 0)
            ++level;
        Index<K,V> idx = null;
        HeadIndex<K,V> h = head;
        if (level <= (max = h.level)) {
        // 如果level比現(xiàn)在的層數(shù)小,則在新增加的節(jié)點(diǎn)z上
        // 建立level個(gè)索引節(jié)點(diǎn),忘記了可以回上面看看索引節(jié)點(diǎn)和其他節(jié)點(diǎn)區(qū)別
            for (int i = 1; i <= level; ++i)
                idx = new Index<K,V>(z, idx, null);
        }
        else {
        // 如果level大于層數(shù),則level設(shè)為層數(shù)+1
            level = max + 1; 
            // 構(gòu)造索引節(jié)點(diǎn)數(shù)組
            @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                (Index<K,V>[])new Index<?,?>[level+1];
            //為新建的節(jié)點(diǎn)z創(chuàng)造level個(gè)索引節(jié)點(diǎn)
            //下標(biāo)從1開始
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);
            for (;;) {
                h = head;
                int oldLevel = h.level;
            //用于判斷并發(fā)修改,不考慮
            //正確情況下該分支的level>當(dāng)前層數(shù)
                if (level <= oldLevel) // lost race to add level
                    break;
                HeadIndex<K,V> newh = h;
                Node<K,V> oldbase = h.node;
                for (int j = oldLevel+1; j <= level; ++j)
                //為每層生成一個(gè)headIndex
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                //更新最上層的headIndex指針
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        
        splice: for (int insertionLevel = level;;) {
            int j = h.level;
            // h為最新的頭節(jié)點(diǎn)
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
                if (q == null || t == null)
                    break splice;
                // r為前面的idxs[x]
                if (r != null) {
                    Node<K,V> n = r.node;
        // compare before deletion check avoids needing recheck
                    int c = cpr(cmp, key, n.key);
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                // 沒并發(fā)的情況下,idxs數(shù)組里都是新增的key
                // c應(yīng)該=0
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
            // j開始時(shí)為新跳表層數(shù)
            //insertionLevel為舊跳表,經(jīng)過后面的幾個(gè)j--才會(huì)進(jìn)入該分支
                if (j == insertionLevel) {
                    //將t插入q,r之間
                    if (!q.link(r, t))
                        break; // restart
                    if (t.node.value == null) {
                        findNode(key);
                        break splice;
                    }
                    if (--insertionLevel == 0)
                        break splice;
                }

                if (--j >= insertionLevel && j < level)
                    t = t.down;
            //向下一層
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}

// findPredecessor
// 回想一下前面那個(gè)跳表的結(jié)構(gòu),該方法就是根據(jù)key,先從head往右找,然后往下找
//然后再往右再往下,知道找到比key小的節(jié)點(diǎn)
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    if (key == null)
        throw new NullPointerException();
    for (;;) {
        for (Index<K,V> q = head, r = q.right, d;;) {
        // 從head頭節(jié)點(diǎn)點(diǎn)向后遍歷
            if (r != null) {
                Node<K,V> n = r.node;
                K k = n.key;
                //說明被刪除了,更新q.next
                if (n.value == null) {
                    if (!q.unlink(r))
                        break;           // restart
                    r = q.right;         // reread r
                    continue;
                }
                if (cpr(cmp, key, k) > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }
            // 失敗了就從下一層開始
            if ((d = q.down) == null)
                return q.node;
            q = d;
            r = d.right;
        }
    }
}

put函數(shù)介紹完了,最后一段更新跳表的操作還是有些亂和難以理解...

get()

public V get(Object key) {
    return doGet(key);
}

private V doGet(Object key) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        //findPredecessor找到離key最近的,小于key的node
        // 沒有并發(fā)的情況下,要么是n,要么沒有要找的key
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            if (n != b.next)                // inconsistent read
                break;
            if ((v = n.value) == null) {    // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n)  // b is deleted
                break;
            //key與n.key相同
            if ((c = cpr(cmp, key, n.key)) == 0) {
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
            if (c < 0)
                break outer;
            //并發(fā)導(dǎo)致key>n.key,說明findPredecessor找的位置不對
            //往后位移一個(gè)節(jié)點(diǎn),重新開始
            b = n;
            n = f;
        }
    }
    return null;
}

get方法還是比較簡單的,findPrecessor方法找到的就是小于key但是最接近key的節(jié)點(diǎn),所以key如果存在,必然是findPrecessor找到的節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),而代碼中為了考慮并發(fā)帶來的修改,還要做很多其他的判斷。

remove()

接下來看下刪除的代碼。

public V remove(Object key) {
    return doRemove(key, null);
}

final V doRemove(Object key, Object value) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
    // findPrecessor找到比key小但是最近的node
    //不考慮并發(fā)的話,如果存在key那就是n=b.next
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            //考慮一些并發(fā)修改的問題
            if (n != b.next)                    // inconsistent read
                break;
            if ((v = n.value) == null) {        // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n)      // b is deleted
                break;
            // 如果key<n,說明key不存在,退出最外層循環(huán)
            if ((c = cpr(cmp, key, n.key)) < 0)
                break outer;
            //c>0說明findPre方法找到的節(jié)點(diǎn)過期了,重新找
            if (c > 0) {
                b = n;
                n = f;
                continue;
            }
            // c==0
            if (value != null && !value.equals(v))
                break outer;
            //通過cas將節(jié)點(diǎn)n的value設(shè)為null,就是key對應(yīng)的節(jié)點(diǎn)
            //findPrecessor方法會(huì)在讀到value為null的值時(shí)進(jìn)行刪除
            if (!n.casValue(v, null))
                break;
            //n添加刪除標(biāo)志并且更新b.next指針
            //appendMarker創(chuàng)建一個(gè)key為null,value為自己的節(jié)點(diǎn)
            if (!n.appendMarker(f) || !b.casNext(n, f))
                findNode(key);                  // retry via findNode
            else {
            //刪除成功并且更新b.next后進(jìn)入該分支
            //findPredecessor會(huì)在value==null時(shí),更新next指針,實(shí)現(xiàn)刪除
                findPredecessor(key, cmp);      // clean index
                if (head.right == null)
                    tryReduceLevel();
            }
            @SuppressWarnings("unchecked") V vv = (V)v;
            return vv;
        }
    }
    return null;
}

findPredecessor()

到這里插入、讀取和刪除都介紹完了,這三個(gè)方法都調(diào)用了findPredecessor,可以說這個(gè)方法是跳表的核心了。所以這里拿出來再看一下該方法。

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    if (key == null)
        throw new NullPointerException(); 
    for (;;) {
        //q為第一層的head節(jié)點(diǎn)
        //r=q.right說明先向右遍歷
        for (Index<K,V> q = head, r = q.right, d;;) {
            //向右遍歷的時(shí)候,右邊的節(jié)點(diǎn)不為空
            if (r != null) {
                Node<K,V> n = r.node;
                K k = n.key;
                //在remove方法中,n.casValue(v, null)將節(jié)點(diǎn)的值置空
                //置空后,在這里進(jìn)行索引節(jié)點(diǎn)的刪除
                if (n.value == null) {
            //unlink操作為將原本的q->r->r.next
            //轉(zhuǎn)換為q->r.next
                    if (!q.unlink(r))
                        break;           // restart
                    r = q.right;         // reread r
                    continue;
                }
                //如果r的節(jié)點(diǎn)沒被刪除,就執(zhí)行到這里
                //如果key大于當(dāng)前的k,則向后移一個(gè)節(jié)點(diǎn)
                //head索引節(jié)點(diǎn)也向后移一個(gè)節(jié)點(diǎn)
                if (cpr(cmp, key, k) > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }
            //如果索引節(jié)點(diǎn)的右邊沒有節(jié)點(diǎn)了,則向下移動(dòng)
            //向下移動(dòng)后如果還是索引層,則繼續(xù)向后
            //如果是節(jié)點(diǎn),直接返回了
            if ((d = q.down) == null)
                return q.node;
            q = d;
            r = d.right;
        }
    }
}

總結(jié)

本文介紹了跳表的特點(diǎn)和使用場景,并對ConcurrentSkipListMap的源碼進(jìn)行了簡略的分析,但由于并發(fā)的存在,使得在理解跳表上增加了一些不必要的難度,同時(shí)其生成索引層的代碼也比較晦澀難懂,可以通過維基百科等更簡單的途徑來了解其原理和源碼。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容