跳表(跳躍表)是一種數(shù)據(jù)結(jié)構(gòu),改進(jìn)自鏈表,用于存儲(chǔ)有序的數(shù)據(jù),跳躍表通過空間換時(shí)間的方法來提高數(shù)據(jù)檢索的速度。早些在學(xué)校的數(shù)據(jù)結(jié)構(gòu)課程中并沒有接觸過跳表。第一次接觸是在了解Redis的有序集合的底層實(shí)現(xiàn)的時(shí)候,畢竟Redis的五種數(shù)據(jù)結(jié)構(gòu)是面試中的常見題,即使沒有實(shí)際使用過,也需要提前去了解一下。
但是最近在工作中不一樣了,由于是從事游戲開發(fā),經(jīng)常會(huì)有排行榜的需求需要實(shí)現(xiàn),而這大多數(shù)通過跳表來實(shí)現(xiàn),所以有必要來深入了解一下跳表并簡略的閱讀一下ConcurrentSkipListMap的源碼。前面也提到了Redis中的有序集合底層就是跳表,所以其也是開發(fā)排行榜功能時(shí)可以考慮使用的。有時(shí)候呢,我們需要排序的東西不需要持久化或者為了更高的效率,是直接在內(nèi)存中進(jìn)行排序,畢竟訪問redis是有網(wǎng)絡(luò)io開銷的,那么使用到的數(shù)據(jù)結(jié)構(gòu)就是jdk中的ConcurrentSkipListMap(支持同步)或者TreeMap(不支持同步,性能更好).
不過這次源碼重點(diǎn)在于跳躍表,不在于并發(fā)的控制。
跳躍表
通過上面的介紹,我們了解到跳躍表的應(yīng)用場景大概是這樣的:
- 有序
- 頻繁插入刪除
- 頻繁的查找
對于有序來說,數(shù)組和普通鏈表都可以通過排序算法來實(shí)現(xiàn),在排序復(fù)雜度上不相上下。鏈表在插入和刪除上性能較好,數(shù)組在查找上性能較好,所以都不能滿足我們的要求。跳躍表則是在插入刪除和查找的性能上做了折中,復(fù)雜度為log(n)。
跳表結(jié)構(gòu)如下所示:

為了更好的支持插入和刪除,所以采用鏈表的形式,可以看到圖片中最下面一行是一個(gè)有序的鏈表。但如果只是一個(gè)單一的鏈表,查找時(shí)復(fù)雜度為O(n),性能太差。如何優(yōu)化呢?
在有序數(shù)組中,我們查找時(shí)用的是二分查找,一次查找可以排除一半元素的遍歷。在數(shù)組中之所以可以用二分查找,是因?yàn)槲覀兡軌蚩焖俚囊設(shè)(1)的復(fù)雜度定位到中間的位置,但是鏈表只能是O(N)。所以跳躍表采取空間換時(shí)間的方式,既然你找不到中間點(diǎn),或者三分之一點(diǎn)等中間位置,那么我可以通過多增加一個(gè)節(jié)點(diǎn)來指向中間位置,這樣你也能夠快速的定位到中間的位置,然后一定程度的減少你遍歷元素的個(gè)數(shù),提高效率。圖中有多層,相鄰的兩層,采用的都是這樣的思想。
這個(gè)圖一目了然,很容易就可以讓大家了解跳表的思想。至于我們應(yīng)該添加多少層額外的鏈表,給什么位置的節(jié)點(diǎn)添加索引才能更好的優(yōu)化檢索和插入的效率,就是我希望通過閱讀源碼找尋的.
節(jié)點(diǎn)
通過上圖,可以發(fā)現(xiàn)有兩種節(jié)點(diǎn)類型,第一種是最下層的,與普通鏈表相似,第二種是除了最后一層以外的其他索引層節(jié)點(diǎn),有兩個(gè)指針。
但是在jdk源碼中還存在一種節(jié)點(diǎn),是索引層的頭節(jié)點(diǎn),還維護(hù)了其層數(shù)信息,下面先給出源碼注釋中的跳表樣例。
* Head nodes Index nodes
* +-+ right +-+ +-+
* |2|---------------->| |--------------------->| |->null
* +-+ +-+ +-+
* | down | |
* v v v
* +-+ +-+ +-+ +-+ +-+ +-+
* |1|----------->| |->| |------>| |----------->| |------>| |->null
* +-+ +-+ +-+ +-+ +-+ +-+
* v | | | | |
* Nodes next v v v v v
* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+
* | |->|A|->|B|->|C|->|D|->|E|->|F|->|G|->|H|->|I|->|J|->|K|->null
* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+
*
接著我們來分別看下它們在源碼中是如何定義的。
// 記住了,只關(guān)注跳表,不去關(guān)注并發(fā)?。?!水平有限,不誤導(dǎo)大家
// Node是最底層的鏈表的節(jié)點(diǎn),包括鍵值對和指向下一個(gè)節(jié)點(diǎn)的指針
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;
// 至于為什么需要兩個(gè)構(gòu)造函數(shù),后面源碼會(huì)有解釋
Node(K key, Object value, Node<K,V> next) {
this.key = key;
this.value = value;
this.next = next;
}
Node(Node<K,V> next) {
this.key = null;
this.value = this;
this.next = next;
}
// ...配套method
}
// 索引節(jié)點(diǎn)結(jié)構(gòu)
// 存儲(chǔ)了兩個(gè)指針,分別指向右邊和下邊的節(jié)點(diǎn)
// 索引節(jié)點(diǎn)的value為鏈表節(jié)點(diǎn)
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
this.node = node;
this.down = down;
this.right = right;
}
// ...配套method
}
// 索引層的頭節(jié)點(diǎn)結(jié)構(gòu)
// 在索引節(jié)點(diǎn)的基礎(chǔ)上添加了表示層數(shù)的level變量
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
源碼閱讀小技巧
JDK中集合包下的代碼,一個(gè)集合差不多有幾千行,想要從上往下逐行看完是不現(xiàn)實(shí)的,需要能快速的定位到我們想看的方法。如果是HashMap這種常用的數(shù)據(jù)結(jié)構(gòu),我們經(jīng)常使用,對它有哪些方法非常的了解,就可以通過搜索方法名字來定位代碼的位置。但是像今天這種ConcurrentSkipListMap比較不常用的類,在不了解它大多數(shù)方法的時(shí)候,可以通過idea提供的功能幫我們快速定位。打開下圖中的show members就可以讓類現(xiàn)實(shí)它所有的方法和內(nèi)部類,就可以靠猜名字快速定位到源碼的位置。
[圖片上傳失敗...(image-61db2d-1568213866130)]
構(gòu)造函數(shù)
了解完內(nèi)部鏈表節(jié)點(diǎn)的實(shí)現(xiàn)情況,現(xiàn)在按照老規(guī)矩,從構(gòu)造函數(shù)開始閱讀源碼。
// Compartor接口用來指定key的排序規(guī)則
public ConcurrentSkipListMap() {
this.comparator = null;
initialize();
}
public ConcurrentSkipListMap(Comparator<? super K> comparator) {
this.comparator = comparator;
initialize();
}
// 還有兩個(gè)傳入map和sortedMap的構(gòu)造函數(shù)
private void initialize() {
keySet = null;// 內(nèi)部類
entrySet = null;// 內(nèi)部類
values = null;// 內(nèi)部類
descendingMap = null;// 內(nèi)部類
// private static final Object BASE_HEADER = new Object();
// 從注釋給出的圖來看,這個(gè)head應(yīng)該是一直處于第一層的頭節(jié)點(diǎn)
head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
null, null, 1);
}
構(gòu)造函數(shù)好像沒啥,那么接著往下看吧
put()
// 與一般的map一樣,通過put插入鍵值對,,key,value不能為空
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
// doPut
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // 要被添加的Node
if (key == null)
throw new NullPointerException();
// key的比較方法
Comparator<? super K> cmp = comparator;
outer: for (;;) { // 因?yàn)閏as操作可能失敗,套了個(gè)無限循環(huán)
// findPrecessor返回小于key但最接近key的節(jié)點(diǎn),不存在則為鏈表的頭節(jié)點(diǎn),下面也會(huì)介紹
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
// b--->n--->f
if (n != b.next)
break;
if ((v = n.value) == null) { // n is deleted
// 簡單來說就是b.next = f,但是考慮了cas
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
// 如果key大于n.key
if ((c = cpr(cmp, key, n.key)) > 0) {
// 說明有并發(fā)插入,繼續(xù)向后遍歷一個(gè)節(jié)點(diǎn)
b = n;
n = f;
continue;
}
// 如果key相等
if (c == 0) {
// 沒有指定putIfAbsent的話,通過cas替換value并返回新的value
// 指定了putIfabsent則返回原有value值
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; //cas失敗,重試
}
// else c < 0;則位置正確,插入就行
}
// 創(chuàng)建新的Node節(jié)點(diǎn)
z = new Node<K,V>(key, value, n);
// 更新鏈表b->n->f ==> b->z->n->f
if (!b.casNext(n, z))
break;
break outer;
}
}
int rnd = ThreadLocalRandom.nextSecondarySeed();
// 0x80000001轉(zhuǎn)換為二進(jìn)制1000...0001
// 看起來似乎添加層數(shù)是有一定隨機(jī)性的
// rnd為0xxx...xxx0時(shí)可以進(jìn)入
if ((rnd & 0x80000001) == 0) {
int level = 1, max;
// 有多少個(gè)1,level遞增多少次
while (((rnd >>>= 1) & 1) != 0)
++level;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
if (level <= (max = h.level)) {
// 如果level比現(xiàn)在的層數(shù)小,則在新增加的節(jié)點(diǎn)z上
// 建立level個(gè)索引節(jié)點(diǎn),忘記了可以回上面看看索引節(jié)點(diǎn)和其他節(jié)點(diǎn)區(qū)別
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
else {
// 如果level大于層數(shù),則level設(shè)為層數(shù)+1
level = max + 1;
// 構(gòu)造索引節(jié)點(diǎn)數(shù)組
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
//為新建的節(jié)點(diǎn)z創(chuàng)造level個(gè)索引節(jié)點(diǎn)
//下標(biāo)從1開始
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
//用于判斷并發(fā)修改,不考慮
//正確情況下該分支的level>當(dāng)前層數(shù)
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
for (int j = oldLevel+1; j <= level; ++j)
//為每層生成一個(gè)headIndex
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
//更新最上層的headIndex指針
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
splice: for (int insertionLevel = level;;) {
int j = h.level;
// h為最新的頭節(jié)點(diǎn)
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
// r為前面的idxs[x]
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
// 沒并發(fā)的情況下,idxs數(shù)組里都是新增的key
// c應(yīng)該=0
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
// j開始時(shí)為新跳表層數(shù)
//insertionLevel為舊跳表,經(jīng)過后面的幾個(gè)j--才會(huì)進(jìn)入該分支
if (j == insertionLevel) {
//將t插入q,r之間
if (!q.link(r, t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
if (--insertionLevel == 0)
break splice;
}
if (--j >= insertionLevel && j < level)
t = t.down;
//向下一層
q = q.down;
r = q.right;
}
}
}
return null;
}
// findPredecessor
// 回想一下前面那個(gè)跳表的結(jié)構(gòu),該方法就是根據(jù)key,先從head往右找,然后往下找
//然后再往右再往下,知道找到比key小的節(jié)點(diǎn)
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException();
for (;;) {
for (Index<K,V> q = head, r = q.right, d;;) {
// 從head頭節(jié)點(diǎn)點(diǎn)向后遍歷
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
//說明被刪除了,更新q.next
if (n.value == null) {
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
// 失敗了就從下一層開始
if ((d = q.down) == null)
return q.node;
q = d;
r = d.right;
}
}
}
put函數(shù)介紹完了,最后一段更新跳表的操作還是有些亂和難以理解...
get()
public V get(Object key) {
return doGet(key);
}
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//findPredecessor找到離key最近的,小于key的node
// 沒有并發(fā)的情況下,要么是n,要么沒有要找的key
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
//key與n.key相同
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
if (c < 0)
break outer;
//并發(fā)導(dǎo)致key>n.key,說明findPredecessor找的位置不對
//往后位移一個(gè)節(jié)點(diǎn),重新開始
b = n;
n = f;
}
}
return null;
}
get方法還是比較簡單的,findPrecessor方法找到的就是小于key但是最接近key的節(jié)點(diǎn),所以key如果存在,必然是findPrecessor找到的節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),而代碼中為了考慮并發(fā)帶來的修改,還要做很多其他的判斷。
remove()
接下來看下刪除的代碼。
public V remove(Object key) {
return doRemove(key, null);
}
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
// findPrecessor找到比key小但是最近的node
//不考慮并發(fā)的話,如果存在key那就是n=b.next
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
//考慮一些并發(fā)修改的問題
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
// 如果key<n,說明key不存在,退出最外層循環(huán)
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
//c>0說明findPre方法找到的節(jié)點(diǎn)過期了,重新找
if (c > 0) {
b = n;
n = f;
continue;
}
// c==0
if (value != null && !value.equals(v))
break outer;
//通過cas將節(jié)點(diǎn)n的value設(shè)為null,就是key對應(yīng)的節(jié)點(diǎn)
//findPrecessor方法會(huì)在讀到value為null的值時(shí)進(jìn)行刪除
if (!n.casValue(v, null))
break;
//n添加刪除標(biāo)志并且更新b.next指針
//appendMarker創(chuàng)建一個(gè)key為null,value為自己的節(jié)點(diǎn)
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key); // retry via findNode
else {
//刪除成功并且更新b.next后進(jìn)入該分支
//findPredecessor會(huì)在value==null時(shí),更新next指針,實(shí)現(xiàn)刪除
findPredecessor(key, cmp); // clean index
if (head.right == null)
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
findPredecessor()
到這里插入、讀取和刪除都介紹完了,這三個(gè)方法都調(diào)用了findPredecessor,可以說這個(gè)方法是跳表的核心了。所以這里拿出來再看一下該方法。
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException();
for (;;) {
//q為第一層的head節(jié)點(diǎn)
//r=q.right說明先向右遍歷
for (Index<K,V> q = head, r = q.right, d;;) {
//向右遍歷的時(shí)候,右邊的節(jié)點(diǎn)不為空
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
//在remove方法中,n.casValue(v, null)將節(jié)點(diǎn)的值置空
//置空后,在這里進(jìn)行索引節(jié)點(diǎn)的刪除
if (n.value == null) {
//unlink操作為將原本的q->r->r.next
//轉(zhuǎn)換為q->r.next
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
//如果r的節(jié)點(diǎn)沒被刪除,就執(zhí)行到這里
//如果key大于當(dāng)前的k,則向后移一個(gè)節(jié)點(diǎn)
//head索引節(jié)點(diǎn)也向后移一個(gè)節(jié)點(diǎn)
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
//如果索引節(jié)點(diǎn)的右邊沒有節(jié)點(diǎn)了,則向下移動(dòng)
//向下移動(dòng)后如果還是索引層,則繼續(xù)向后
//如果是節(jié)點(diǎn),直接返回了
if ((d = q.down) == null)
return q.node;
q = d;
r = d.right;
}
}
}
總結(jié)
本文介紹了跳表的特點(diǎn)和使用場景,并對ConcurrentSkipListMap的源碼進(jìn)行了簡略的分析,但由于并發(fā)的存在,使得在理解跳表上增加了一些不必要的難度,同時(shí)其生成索引層的代碼也比較晦澀難懂,可以通過維基百科等更簡單的途徑來了解其原理和源碼。