ThreadLocal類原理簡析——線程封閉的常規(guī)實(shí)現(xiàn)

一、簡介

并發(fā)編程中,當(dāng)訪問共享數(shù)據(jù)時(shí),通常需要使用同步技術(shù)。但如果數(shù)據(jù)不發(fā)布(逸出)到線程以外,僅僅在單線程中被訪問和操作,就無需同步。實(shí)現(xiàn)這一目的的技術(shù)被稱作線程封閉(Thread Confinement)。ThreadLocal類正是實(shí)現(xiàn)線程封閉性的一種規(guī)范途徑。好比現(xiàn)在人手一張公交卡,乘坐公交地鐵時(shí)從口袋里掏出卡刷下就能上下車。公家卡都是保存在每個(gè)人自己身上的,而不是統(tǒng)一存在一個(gè)寄存柜里。否則每次坐公交都要先去寄存柜拿卡,用完后還要換回去,效率之慢可想而知。

從應(yīng)用上講,之前實(shí)習(xí)的時(shí)候給一個(gè)Web項(xiàng)目實(shí)現(xiàn)了一個(gè)“日志追蹤”小功能,即在前臺(tái)向后臺(tái)發(fā)送的每一個(gè)Session都會(huì)帶上一個(gè)UUID,該Session向后臺(tái)發(fā)起的所有操作(讀寫數(shù)據(jù)庫、轉(zhuǎn)發(fā)等)都會(huì)攜帶上這樣的一個(gè)UUID號(hào),并且會(huì)輸出到日志文件中。其實(shí)現(xiàn)原理用的是slf4j的MDC接口,底層使用的就是泛型為Map<String, String>的ThreadLocal類。

下面代碼給出一個(gè)使用ThreadLocal類的示例。

package com.qyz.thread;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class ThreadLocalDemo {
    // ThreadLocal一般權(quán)限聲明為private static
    private static final ThreadLocal<Person> aPerson = new ThreadLocal<Person>() {

        // 默認(rèn)返回null,在此override初始化方法
        @Override
        protected Person initialValue() {
            return new Person("A Baby", 0);
        }
    }; 
    
    Random rand = new Random();
    
    // 輔助演示的javabean
    static class Person {
        String name;
        
        int age;
        
        Person(String name, int age)
        {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {

            return this.name + "-" + this.age;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
        
    }
    
    // 輔助演示的Runnable實(shí)現(xiàn)類
    class MyTask implements Runnable {

        private boolean needSet;
        
        MyTask (boolean needSet) {
            this.needSet = needSet;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " Start......");
            // 休眠3s
            try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {}
            
            if (needSet)
                // 向ThreadLocal設(shè)置一個(gè)值
                aPerson.set(new Person(Thread.currentThread().getName(), rand.nextInt(60)));
                
            System.out.println("Thread Name: " + Thread.currentThread().getName()
                    + "This thread's ThreadLocal: " + aPerson.get().toString());
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        ThreadLocalDemo foo = new ThreadLocalDemo();
        Thread t1 = new Thread(foo.new MyTask(true), "Henry");
        Thread t2 = new Thread(foo.new MyTask(true), "Cevin");
        // 該線程不設(shè)置ThreadLocal,直接使用初始值
        Thread t3 = new Thread(foo.new MyTask(false), "Jessica");
        t1.start();
        t2.start();
        t3.start();
    }
} /* Output:
Jessica Start......
Henry Start......
Cevin Start......
This thread's ThreadLocal: A Baby-0
This thread's ThreadLocal: Cevin-24
This thread's ThreadLocal: Henry-12
*///:~

根據(jù)API文檔建議,demo類中聲明了一個(gè)ThreadLocal<Person>類型的靜態(tài)成員,雖然只有一個(gè)全局對象,但t1,t2,t3線程分別操作了一個(gè)單獨(dú)的Person類實(shí)例,在線程內(nèi)部對該實(shí)例的操作都是線程隔離的,不會(huì)作用到其他的線程中。就好像新建線程時(shí),給每個(gè)線程都封裝了一份數(shù)據(jù)的副本。這也是ThreadLocal對象的目的:用于防止對可變的單實(shí)例變量或全局變量進(jìn)行共享。但請記住,無論多少個(gè)線程,ThreadLocal<Person> aPerson對象只有一份。

二、實(shí)現(xiàn)原理簡析

以上代碼展示了ThreadLocal類最基本的使用方法:

  1. 初始化一個(gè)ThreadLocal類靜態(tài)成員,根據(jù)需求override其初始化方法initialValue,設(shè)置初始值。
  2. 使用set方法設(shè)置當(dāng)前線程的ThreadLocal新值。
  3. 使用get獲得當(dāng)前線程的該ThreadLocal的值。
    除此之外,ThreadLocal類還有remove方法用做清除變量功能,以及一個(gè)來自Java1.8的withInitial靜態(tài)方法。
    ThreadLocal所有的API如下表:
ThreadLocal的API

既然是每個(gè)線程都具備一個(gè)數(shù)據(jù)副本,那么使用Thread對象作為key,數(shù)據(jù)為value,將二者作為鍵值對存入一個(gè)Map集合中,不就可以實(shí)現(xiàn)線程封閉的功能了嗎?我確實(shí)看過有人這么實(shí)現(xiàn)過,功能上確實(shí)和ThreadLocal如出一轍。但如果深入思考一下,你就會(huì)發(fā)現(xiàn)此時(shí)Map一直持有一個(gè)Thread實(shí)例的引用,Thread對象的生命周期就受到Map的影響,如果Map不及時(shí)清理運(yùn)行完成的Thread對象,就會(huì)造成內(nèi)存泄露。

當(dāng)然,硬要使用上述方式實(shí)現(xiàn)一個(gè)類似ThreadLocal的類也不是不可能,但是既然JDK已經(jīng)提供了這樣一個(gè)質(zhì)優(yōu)價(jià)廉的ThreadLocal類,何不欣然接受,深入學(xué)習(xí)之呢?

2.1 ThreadLocal的get、set和remove

/**
 * Returns the value in the current thread's copy of this
 * thread-local variable.  If the variable has no value for the
 * current thread, it is first initialized to the value returned
 * by an invocation of the {@link #initialValue} method.
 *
 * @return the current thread's value of this thread-local
 */
public T get() {
    Thread t = Thread.currentThread();
    // 獲取當(dāng)前線程的ThreadLocalMap對象
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        // 傳入ThreadLocal對象,獲得一個(gè)Entry節(jié)點(diǎn)
        ThreadLocalMap.Entry e = map.getEntry(this);
        // 非空判斷
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // 進(jìn)行初始化并返回初始值
    return setInitialValue();
}

相比于之前所述使用Thread對象作為key值獲取線程隔離數(shù)據(jù)的方式,ThreadLocal反其道而行,以其自身為key查找存儲(chǔ)在ThreadLocalMap數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)。

    // Thread源碼180~182行
    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;

每個(gè)Thread對象都聲明了一個(gè)ThreadLocalMap對象引用,這個(gè)類似于Map數(shù)據(jù)結(jié)構(gòu)的對象是專門為ThreadLocal類定制的。


/**
 * Variant of set() to establish initialValue. Used instead
 * of set() in case user has overridden the set() method.
 *
 * @return the initial value
 */
private T setInitialValue() {
    // 獲取初始值
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 以當(dāng)前ThreadLocal對象為key,初始值為value存入ThreadLocalMap中
        map.set(this, value);
    else
        // 若當(dāng)前線程中的ThreadLocalMap為空,則初始化。
        createMap(t, value);
    return value;
}

/**
 * Create the map associated with a ThreadLocal. Overridden in
 * InheritableThreadLocal.
 *
 * @param t the current thread
 * @param firstValue value for the initial entry of the map
 */
void createMap(Thread t, T firstValue) {
    // 初始化ThreadLocalMap成員
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

ThreadLocal的get方法首先獲取當(dāng)前線程中的ThreadLocalMap成員對象,判空為真時(shí),則調(diào)用setInitialValue初始化ThreadLocalMap,這種懶漢初始化的方式提高了內(nèi)存效率。

以上源碼中的注釋提及了InheritableThreadLocal,說有些方法在InheritableThreadLocal中被重寫了,看看就知道,只是把t.threadLocals改成了t.InheritableThreadLocals。這是個(gè)可實(shí)現(xiàn)“線程繼承”的ThreadLocal類,后面還會(huì)介紹。

/**
 * Sets the current thread's copy of this thread-local variable
 * to the specified value.  Most subclasses will have no need to
 * override this method, relying solely on the {@link #initialValue}
 * method to set the values of thread-locals.
 *
 * @param value the value to be stored in the current thread's copy of
 *        this thread-local.
 */
public void set(T value) {
    // 獲得當(dāng)前線程的ThreadLocalMap
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    // 調(diào)用ThreadLocalMap的方法進(jìn)行set。
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

set方法與setInitialValue類似,參考以上代碼。

/**
 * Removes the current thread's value for this thread-local
 * variable.  If this thread-local variable is subsequently
 * {@linkplain #get read} by the current thread, its value will be
 * reinitialized by invoking its {@link #initialValue} method,
 * unless its value is {@linkplain #set set} by the current thread
 * in the interim.  This may result in multiple invocations of the
 * {@code initialValue} method in the current thread.
 *
 * @since 1.5
 */
 public void remove() {
     // 刪除方法,調(diào)用ThreadLocalMap的接口
     ThreadLocalMap m = getMap(Thread.currentThread());
     if (m != null)
         m.remove(this);
 }

remove方法也少的可憐,一個(gè)判空后就調(diào)用ThreadLocalMap的remove方法了。

ThreadLocal的三個(gè)最常用的接口寥寥幾行也就寫完了,但不難看出其內(nèi)部實(shí)現(xiàn)的真正關(guān)鍵在于ThreadLocalMap類。

三、ThreadLocalMap—線性探測法hash表的最佳實(shí)踐。

圖3.1 ThreadLocal的數(shù)據(jù)存儲(chǔ)結(jié)構(gòu)

3.1 ThreadLocalMap簡析

ThreadLocalMap是ThreadLocal的靜態(tài)內(nèi)部類,其內(nèi)部維護(hù)著一個(gè)Entry數(shù)組做散列表來存儲(chǔ)數(shù)據(jù)(后面會(huì)根據(jù)源碼詳細(xì)講解)。此Entry非Map.Entry,而是一個(gè)WeakReference的子類。WeakReference是Java中弱引用的實(shí)現(xiàn)類,一個(gè)對象如果僅僅被弱引用指向,則在最近一次gc時(shí)就會(huì)被回收掉。(有關(guān)“強(qiáng),軟,弱,虛”引用的更多知識(shí)可以查閱《深入理解Java虛擬機(jī)》第65頁)

以目前我對ThreadLocal的理解來看,之所以使用弱引用,主要是為了以下兩點(diǎn):

  1. 無強(qiáng)引用指向ThreadLocal實(shí)例時(shí),則該實(shí)例自覺被垃圾回收器回收。此時(shí)也說明任何線程都不會(huì)再使用到該ThreadLocal所存儲(chǔ)的數(shù)據(jù),配合第二點(diǎn)實(shí)現(xiàn)ThreadLocalMap自身的垃圾回收機(jī)制。
  2. ThreadLocalMap的expungeStaleEntry方法會(huì)清除key為null的Entry數(shù)據(jù),而導(dǎo)致Key為null的原因要不就是ThreadLocalMap的remove方法調(diào)用了WeakReference的clear方法,斷開了弱引用(可以想象成斷開上圖中的虛線),要不就是因?yàn)榈谝稽c(diǎn)所述的無強(qiáng)引用指向并發(fā)生GC。

請注意:根據(jù)API文檔中對ThreadLocal類建議的用法,一般將其聲明稱類成員。因此,只要包含ThreadLocal的外部類被被JVM卸載,該ThreadLocal對象將一直被一個(gè)強(qiáng)引用指向,因此也不會(huì)被GC。ThreadLocalMap的清理機(jī)制主要是清理線程隔離的數(shù)據(jù),并“騰出”散列表的空位。

由此可見,使用WeakReference是為了配合實(shí)現(xiàn)ThreadLocalMap自己的垃圾清理機(jī)制,防止內(nèi)存泄露。

3.2 源碼走讀

3.2.1 散列表節(jié)點(diǎn)——Entry

/**
 * The entries in this hash map extend WeakReference, using
 * its main ref field as the key (which is always a
 * ThreadLocal object).  Note that null keys (i.e. entry.get()
 * == null) mean that the key is no longer referenced, so the
 * entry can be expunged from table.  Such entries are referred to
 * as "stale entries" in the code that follows.
 * 
 * 散列表中的節(jié)點(diǎn)類。繼承自WeakReference,并擴(kuò)展了一個(gè)Object對象用來
 * 保存線程隔離的數(shù)據(jù)。若一個(gè)entry的指向ThreadLocal實(shí)例的引用為空,
 * 說明變成了stale entry,該entry需要被清理。
 */
static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

Entry類是內(nèi)部散列表的節(jié)點(diǎn),繼承自WeakReference類,并擴(kuò)展了一個(gè)Object成員保存線程隔離的數(shù)據(jù),而弱引用則指向ThreadLocal對象。

3.2.2 線性探測hash表

/**
 * The initial capacity -- MUST be a power of two.
 * 
 * 初始化容量
 */
private static final int INITIAL_CAPACITY = 16;

/**
 * The table, resized as necessary.
 * table.length MUST always be a power of two.
 * 
 * 散列表,表的長度必須為2的冪
 */
private Entry[] table;

/**
 * The number of entries in the table.
 * 
 * 散列表中的Entry個(gè)數(shù)
 */
private int size = 0;

/**
 * The next size value at which to resize.
 */
private int threshold; // Default to 0

/**
 * Set the resize threshold to maintain at worst a 2/3 load factor.
 * 
 * 擴(kuò)容閾值為散列表長的三分之二
 */
private void setThreshold(int len) {
    threshold = len * 2 / 3;
}

/**
 * Increment i modulo len.
 * 環(huán)形后移
 */
private static int nextIndex(int i, int len) {
    return ((i + 1 < len) ? i + 1 : 0);
}

/**
 * Decrement i modulo len.
 * 
 * 環(huán)形前移
 */
private static int prevIndex(int i, int len) {
    return ((i - 1 >= 0) ? i - 1 : len - 1);
}

源碼中提及散列表的長度必須是2的冪。這樣設(shè)置是為了在散列ThreadLocal對象時(shí)能夠盡量均勻。這里需要回顧下ThreadLocal的一段有關(guān)獨(dú)有hashcode計(jì)算的源碼。

public class ThreadLocal<T> {
    /**
     * ThreadLocals rely on per-thread linear-probe hash maps attached
     * to each thread (Thread.threadLocals and
     * inheritableThreadLocals).  The ThreadLocal objects act as keys,
     * searched via threadLocalHashCode.  This is a custom hash code
     * (useful only within ThreadLocalMaps) that eliminates collisions
     * in the common case where consecutively constructed ThreadLocals
     * are used by the same threads, while remaining well-behaved in
     * less common cases.
     * 
     * 以ThreadLocal為key,將線程隔離的數(shù)據(jù)存儲(chǔ)在每個(gè)線程私有的線性探測散列表中。
     * 這個(gè)threadLocalHashCode僅僅用于Thread.ThreadLocalMaps中
     */
    private final int threadLocalHashCode = nextHashCode();

    /**
     * The next hash code to be given out. Updated atomically. Starts at
     * zero.
     * 
     * 從0開始,保存下一個(gè)ThreadLocal對象的hashcode
     */
    private static AtomicInteger nextHashCode =
        new AtomicInteger();

    /**
     * The difference between successively generated hash codes - turns
     * implicit sequential thread-local IDs into near-optimally spread
     * multiplicative hash values for power-of-two-sized tables.
     * 
     * ThreadLocal對象hashcode計(jì)算的增量,使用該值能夠在表長為2的冪的散列表中
     * 均勻分布節(jié)點(diǎn)。
     */
    private static final int HASH_INCREMENT = 0x61c88647;

    /**
     * Returns the next hash code.
     * 
     * 計(jì)算下一個(gè)對象的hashcode
     */
    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

//===============ThreadLocalMap構(gòu)造方法============================
/**
 * Construct a new map initially containing (firstKey, firstValue).
 * ThreadLocalMaps are constructed lazily, so we only create
 * one when we have at least one entry to put in it.
 * 
 * ThreadLocalMap的構(gòu)造是懶漢式,直到有一個(gè)ThreadLocal被使用,才會(huì)初始化。
 */
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    // 初始化各個(gè)成員
    table = new Entry[INITIAL_CAPACITY];
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    setThreshold(INITIAL_CAPACITY);
}

以上代碼說明了ThreadLocal對象在ThreadLocalMap中的散列方式。新建的ThreadLocal對象都以前一個(gè)新建ThreadLocal對象的threadLocalHashCode + 0x61c88647作為為自己的hashcode,初始值為0。實(shí)踐證明,用0x61c88647的累加值與2的整數(shù)冪取模,可以得到一個(gè)范圍內(nèi)近似均勻分布的序列。由于使用了線性探測法,散列表在邏輯上是一個(gè)環(huán)形結(jié)構(gòu)(從nextIndex和prevIndex方法也可以看出)。均勻分布的好處在于當(dāng)發(fā)生hash沖突是,很快就能探測到下一個(gè)臨近的可用位置。而對2的n次冪取模,則可以用& (2^n - 1)代替% (2^n)。因此,才有了散列表的容量必須是2的整數(shù)冪的限制。

有關(guān)下文中用到的一些名詞,在這里做些約定。


圖3.2 環(huán)形散列表結(jié)構(gòu)圖
  1. slot:散列表中的一個(gè)位置(節(jié)點(diǎn))。
  2. full entry和full slot:表示full slot是散列表中一個(gè)索引位置。該位置上的Entry稱為full entry,其所含的弱引用指向不為null。
  3. stale entry和stale slot:stale entry是散列表中stale slot位置的Entry對象,該Entry所含弱引用指向?yàn)閚ull。下文中的清理stale entry或者清理stale slot都是意為斷開該Entry對象的所有引用,輔助GC,并騰空stale slot位置。
  4. null slot:散列表的null slot位置為null,可用于設(shè)置新的Entry。
  5. run:散列表中任意兩個(gè)null slot之間的一段,不包括兩端。

3.2.3 ThreadLocalMap的get和set

/**
 * Get the entry associated with key.  This method
 * itself handles only the fast path: a direct hit of existing
 * key. It otherwise relays to getEntryAfterMiss.  This is
 * designed to maximize performance for direct hits, in part
 * by making this method readily inlinable.
 *  
 * 先用hashcode對表長取模計(jì)算索引,此為直接命中。
 * 如果正好命中了,則返回,否則調(diào)用getEntryAfterMiss.
 *
 * @param  key the thread local object
 * @return the entry associated with key, or null if no such
 */
private Entry getEntry(ThreadLocal<?> key) {
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key)
        return e;
    else
        return getEntryAfterMiss(key, i, e);
}

/**
 * Version of getEntry method for use when key is not found in
 * its direct hash slot.
 * 
 * 直接命中失敗后調(diào)用此方法
 * 
 * @param  key the thread local object
 * @param  i the table index for key's hash code
 * @param  e the entry at table[i]
 * @return the entry associated with key, or null if no such
 */
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;
    // 使用線性探測法嘗試命中
    while (e != null) {
        ThreadLocal<?> k = e.get();
        // 命中,則返回
        if (k == key)
            return e;
        // 找到個(gè)stale entry,進(jìn)行一次連續(xù)段清理
        if (k == null)
            expungeStaleEntry(i);
        else
            // 向后移動(dòng)
            i = nextIndex(i, len);
        e = tab[i];
    }
    // 走到這一步,說明不存在該ThreadLocal。
    return null;
}

以上代碼使用線性探測法嘗試在散列表中命中Entry。在getEntryAfterMiss方法中,當(dāng)判斷當(dāng)前位置上的Entry的弱引用指向?yàn)榭諘r(shí),則調(diào)用expungeStaleEntry方法進(jìn)行垃圾清理。expungeStaleEntry是ThreadLocalMap的核心算法。

    private void set(ThreadLocal<?> key, Object value) {
        // We don't use a fast path as with get() because it is at
        // least as common to use set() to create new entries as
        // it is to replace existing ones, in which case, a fast
        // path would fail more often than not.
        // 以上注釋大致含義:
        /*
         * 以上注釋含義:set方法不像get方法這樣使用直接命中方式設(shè)置Entry,
         * 因?yàn)槭状螄L試命中在線性探測散列表中經(jīng)常失敗。
         * set嘗試使用一種“替換已經(jīng)存在”策略來設(shè)置Entry,而不是直接new一個(gè)。
         */
        Entry[] tab = table;
        int len = tab.length;
        // 計(jì)算散列表索引
        int i = key.threadLocalHashCode & (len-1);
        // 線性探測查找,不為null slot 則一直向下找。
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            ThreadLocal<?> k = e.get();
            if (k == key) {
                // 命中,將當(dāng)前Entry的value進(jìn)行設(shè)置
                e.value = value;
                return;
            }
            if (k == null) {
                // 如果當(dāng)前位置Entry的弱引用指向?yàn)榭?,則替換該Entry
                replaceStaleEntry(key, value, i);
                return;
            }
        }
        // 找到個(gè)null slot,則以輸入?yún)?shù)創(chuàng)建一個(gè)新的Entry,并放置進(jìn)散列表中
        tab[i] = new Entry(key, value);
        int sz = ++size;
        // 嘗試一次啟發(fā)式清理,如果沒有清理任何Entry并且個(gè)數(shù)超出閾值,
        // 則rehash.
        if (!cleanSomeSlots(i, sz) && sz >= threshold)
            rehash();
    }
    /**
     * Replace a stale entry encountered during a set operation
     * with an entry for the specified key.  The value passed in
     * the value parameter is stored in the entry, whether or not
     * an entry already exists for the specified key.
     *
     * As a side effect, this method expunges all stale entries in the
     * "run" containing the stale entry.  (A run is a sequence of entries
     * between two null slots.)
     *
     * @param  key the key
     * @param  value the value to be associated with key
     * @param  staleSlot index of the first stale entry encountered while
     *         searching for key.
     */
    // 替換策略,進(jìn)入這個(gè)方法,staleSlot位置上肯定會(huì)變成full slot
    private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                   int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;
        Entry e;

        // Back up to check for prior stale entry in current run.
        // We clean out whole runs at a time to avoid continual
        // incremental rehashing due to garbage collector freeing
        // up refs in bunches (i.e., whenever the collector runs).
                    
        // 往前遍歷,找到到該slot所在run的首個(gè)stale slot位置,設(shè)置為清理起點(diǎn)
        int slotToExpunge = staleSlot; // 清理起點(diǎn)
        for (int i = prevIndex(staleSlot, len);
             (e = tab[i]) != null;
             i = prevIndex(i, len))
            if (e.get() == null)
                slotToExpunge = i;

        // Find either the key or trailing null slot of run, whichever
        // occurs first
        // 向后遍歷
        for (int i = nextIndex(staleSlot, len);
             (e = tab[i]) != null;
             i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();

            // If we find key, then we need to swap it
            // with the stale entry to maintain hash table order.
            // The newly stale slot, or any other stale slot
            // encountered above it, can then be sent to expungeStaleEntry
            // to remove or rehash all of the other entries in run.
            /*
             * 進(jìn)入到了replaceStaleEntry方法,說明命中了一個(gè)stale slot,
             * 則命中的Entry弱引用為空,首次命中的索引為入?yún)taleSlot。
             * 因此,如果在當(dāng)前段run中命中了key,則將對應(yīng)的Entry替換到
             * 散列表的staleSlot位置。其實(shí)就是將stale Entry往run的后邊
             * 移動(dòng),方便清理。之后進(jìn)行一次連續(xù)段清理和一次啟發(fā)式清理。
             */
            if (k == key) {
                e.value = value;

                tab[i] = tab[staleSlot];
                tab[staleSlot] = e;

                // Start expunge at preceding stale entry if it exists
                if (slotToExpunge == staleSlot)
                    slotToExpunge = i;
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                return;
            }

            // If we didn't find stale entry on backward scan, the
            // first stale entry seen while scanning for key is the
            // first still present in the run.
            /*
             * stale entry表示弱引用為空的Entry,如果該位置為stale Entry,
             * 且之前的前向遍歷沒有找到一個(gè)stale Entry,則將當(dāng)前位置設(shè)置為
             * 清理起始位置
             */
            if (k == null && slotToExpunge == staleSlot)
                slotToExpunge = i;
        }

        // If key not found, put new entry in stale slot
                    // 沒用找到可以實(shí)施替換策略的slot,只能新建一個(gè)。
        tab[staleSlot].value = null;
        tab[staleSlot] = new Entry(key, value);

        // If there are any other stale entries in run, expunge them
        /*
         * 執(zhí)行到此,如果slotToExpunge==staleSlot,說明沒有stale entry在run中,
         * 否則,從slotToExpunge執(zhí)行一次連續(xù)段清理和啟發(fā)式清理。
         */
        if (slotToExpunge != staleSlot)
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    }

replaceStaleEntry實(shí)現(xiàn)了一種策略:

  1. 將full entry向前移動(dòng),將stale slot向后移動(dòng),最小化連續(xù)段清理方法expungeStaleEntry的清理范圍
  2. 以入?yún)taleSlot位置為中心,先向前,再向后遍歷找到expungeStaleEntry方法的執(zhí)行起點(diǎn)位置slotToExpunge。從該位置(包括該位置)往后到遇到一個(gè)null slot之前這段,一定有一堆stale slot等待清理。

3.2.4 清理方法

3.2.4.1 連續(xù)段清理——expungeStaleEntry

/**
 * Expunge a stale entry by rehashing any possibly colliding entries
 * lying between staleSlot and the next null slot.  This also expunges
 * any other stale entries encountered before the trailing null.  See
 * Knuth, Section 6.4
 *
 * @param staleSlot index of slot known to have null key
 * @return the index of the next null slot after staleSlot
 * (all between staleSlot and this slot will have been checked
 * for expunging).
 * 
 * 在staleSlot和下一個(gè)null slot之間的slot都會(huì)被檢查一遍是否需要清理,
 * 即連續(xù)段清理。該方法在清理的同時(shí)還會(huì)重新“整理”散列表中的Entry,即rehash。
 */
private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    // expunge entry at staleSlot
    // 手動(dòng)將value置null,輔助GC
    tab[staleSlot].value = null;
    // 將散列表的staleSlot位置置空,輔助GC
    tab[staleSlot] = null;
    size--;

    // Rehash until we encounter null
    Entry e;
    int i;
    // 從staleSlot向后遍歷,直到遇到個(gè)null slot
    for (i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        // 找到一個(gè)stale slot,置null,輔助GC
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            // 根據(jù)hashcode計(jì)算k在不沖突情況下散列表的索引
            int h = k.threadLocalHashCode & (len - 1);
            /*
             * h!=i說明h位置上發(fā)生了hash沖突,導(dǎo)致k的索引值從h向后移動(dòng)到了i。
             * 此時(shí),查找h位置往后第一個(gè)null slot,并將i位置的Entry移動(dòng)到該位置。
             * 這是一個(gè)rehash操作。
             */
            if (h != i) {
                tab[i] = null;

                // Unlike Knuth 6.4 Algorithm R, we must scan until
                // null because multiple entries could have been stale.
                // 查找h位置往后第一個(gè)null slot
                while (tab[h] != null)
                    h = nextIndex(h, len);
                // 移動(dòng)
                tab[h] = e;
            }
        }
    }
    // 返回遇到的第一個(gè)null slot索引
    return i;
}

expungeStaleEntry方法邊清理,邊進(jìn)行散列表的整理,盡量讓full entries連在一起。源碼注釋中提及了Donald E. Knuth所著的《計(jì)算機(jī)程序設(shè)計(jì)藝術(shù)》6.4節(jié)散列的R算法,該算法是關(guān)于如何在一個(gè)線性探測hash表中刪除一個(gè)元素的算法。我天賦太差,一時(shí)領(lǐng)悟不了,現(xiàn)截圖如下,想廣大網(wǎng)友請教。另外還可參考 ThreadLocal源碼解讀一文。

圖3.3 計(jì)算機(jī)程序設(shè)計(jì)藝術(shù)6.4散列-R算法

3.2.4.2 全表清理expungeStaleEntries

/**
 * Expunge all stale entries in the table.
 * 
 * 全表清理方法
 */
private void expungeStaleEntries() {
    Entry[] tab = table;
    int len = tab.length;
    for (int j = 0; j < len; j++) {
        Entry e = tab[j];
        // 全表遍歷,找到一個(gè)stale slot,則做一次連續(xù)段清理
        if (e != null && e.get() == null)
            expungeStaleEntry(j);
    }
}

3.2.4.3 啟發(fā)式清理——cleanSomeSlots

/**
 * Heuristically scan some cells looking for stale entries.
 * This is invoked when either a new element is added, or
 * another stale one has been expunged. It performs a
 * logarithmic number of scans, as a balance between no
 * scanning (fast but retains garbage) and a number of scans
 * proportional to number of elements, that would find all
 * garbage but would cause some insertions to take O(n) time.
 * 
 * 我的理解:
 * 啟發(fā)式清理stale entris,當(dāng)插入一個(gè)新entry或者調(diào)用
 * expungeStaleEntry方法清理entries時(shí)會(huì)調(diào)用此方法。
 * 本方法使用了對數(shù)掃描,以權(quán)衡"不掃描,留存垃圾"和
 * "全量掃描,工作量線性增長"這兩種方式。后一種方式雖然可以找到
 * 所有的垃圾,但是會(huì)當(dāng)時(shí)插入動(dòng)作花費(fèi)O(n)的時(shí)間復(fù)雜度。
 * 
 * @param i a position known NOT to hold a stale entry. The
 * scan starts at the element after i.
 * 
 * 散列表在入?yún)位置上肯定不是stale slot,所以從i后面掃描
 * 
 * @param n scan control: {@code log2(n)} cells are scanned,
 * unless a stale entry is found, in which case
 * {@code log2(table.length)-1} additional cells are scanned.
 * When called from insertions, this parameter is the number
 * of elements, but when from replaceStaleEntry, it is the
 * table length. (Note: all this could be changed to be either
 * more or less aggressive by weighting n instead of just
 * using straight log n. But this version is simple, fast, and
 * seems to work well.)
 *
 * 入?yún)用來計(jì)算本次掃描slot的最小數(shù)量——log2(n),插入(set)動(dòng)作傳入size,
 * replaceStaleEntry中的調(diào)用傳入散列表長度。
 * 
 * 每找到一個(gè)stale entries清理后,還會(huì)進(jìn)行額外的log2(表長度)-1數(shù)量的slot掃描。
 * 
 * Note:掃描次數(shù)根據(jù)權(quán)重n可以增加或者減少,但該版本的比較簡單,快速,使用起來還不錯(cuò)。
 *
 * @return true if any stale entries have been removed.
 * 有slot被清理了就返回true
 */
private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
        // 向后遍歷
        i = nextIndex(i, len);
        Entry e = tab[i];
        // 找到一個(gè)stale slot,執(zhí)行一次連續(xù)段清理
        if (e != null && e.get() == null) {
            /*
             * 重新復(fù)制n,影響循環(huán)條件,
             * 進(jìn)行額外的log2(表長度)-1數(shù)量的slot掃描
             */
            n = len;
            removed = true;
            // 執(zhí)行一次連續(xù)段清理
            i = expungeStaleEntry(i);
        }
    } while ( (n >>>= 1) != 0);
    return removed;
}

3.2.4.4. ThreadLocalMap的remove

/**
 * Remove the entry for key.
 * 
 * 刪除,該方法被ThreadLocal的remove接口調(diào)用
 */
private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        // 找到入?yún)hreadLocal對應(yīng)的Entry,將其變成stale slot
        if (e.get() == key) {
            // WeakReference實(shí)例調(diào)用clear會(huì)斷開指向?qū)ο蟮囊?            // 但其他指向該對象的引用仍然有效。
            e.clear();
            // 此時(shí)該Entry變成了stale entry,進(jìn)行一次連續(xù)段清理
            expungeStaleEntry(i);
            return;
        }
    }
}

有關(guān)WeakReference的clear方法,給出一段測試代碼。

    /*
     * 測試WeakReference
     */
    Person p = new Person("Qiu", 24);
    WeakReference<Person> test = new WeakReference<Person>(p);
    System.out.println("test.get() == p: " + (test.get() == p));
    test.clear();
        // 增加刪除下行輸出結(jié)果不變
    // System.gc();
    TimeUnit.SECONDS.sleep(2);
    System.out.println("test.get(): " + test.get());
    System.out.println("p: " + p);
    /* output:
     * test.get() == p: true
     * test.get(): null
     * p: Qiu-24
     */

輸出結(jié)果說明,該WeakReference對象調(diào)用clear方法只是斷開了指向Person對象的引用(可以想象成圖3.1中虛線斷了。),因此get方法返回null,但是該P(yáng)erson一直被p所引用,因此無論是否顯式調(diào)用GC,都不會(huì)導(dǎo)致Person對象被回收。

3.2.5 擴(kuò)容——rehash與resize

/**
 * Re-pack and/or re-size the table. First scan the entire
 * table removing stale entries. If this doesn't sufficiently
 * shrink the size of the table, double the table size.
 * 
 * 散列表再散列。首先調(diào)用一次全表清理方法,此方法邊清理,邊重新散列各個(gè)Entry。
 * 
 * set操作會(huì)調(diào)用該方法。
 */
private void rehash() {
    expungeStaleEntries();

    // Use lower threshold for doubling to avoid hysteresis
    /*
     * 全表清理之后,如果size沒有減小很多(判斷閾值為(2/3-2/3/4)len=0.5len),
     * 則進(jìn)行一次擴(kuò)容。這樣的操作是為了避免延遲。
     */
    if (size >= threshold - threshold / 4)
        resize();
}

/**
 * Double the capacity of the table.
 * 
 * 擴(kuò)容方法,直接double當(dāng)前容量,并再散列
 */
private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;

    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        // 遍歷原表中所有的非null entry
        if (e != null) {
            ThreadLocal<?> k = e.get();
            // 找到一個(gè)stale entry,則手動(dòng)清理value
            if (k == null) {
                e.value = null; // Help the GC
            } else {
                // 走到這里,說明是一個(gè)full entry,
                // 使用線性探測法方式進(jìn)新表中
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null)
                    h = nextIndex(h, newLen);
                newTab[h] = e;
                count++;
            }
        }
    }
    // 更新變量。
    setThreshold(newLen);
    size = count;
    table = newTab;
}

四、ThreadLocal的內(nèi)存泄露

我對Java中內(nèi)存泄露的理解:你永遠(yuǎn)不會(huì)再使用這個(gè)對象了,但這個(gè)對象卻無法被GC。

ThreadLocal有了內(nèi)存泄露的可能嗎?或者具體的說,線程隔離的數(shù)據(jù)(對象)value會(huì)有內(nèi)存泄露的可能嗎?


ps:不敢說沒有,這樣太囂張了。

value由一條強(qiáng)引用鏈抵達(dá):threadLocals(ThreadLocal.ThreadLocalMap對象)->Entry數(shù)組的一個(gè)slot->entry.value。這要這條鏈連續(xù),那么value就不會(huì)被回收。
在實(shí)際開發(fā)中,一般使用線程池執(zhí)行并發(fā)任務(wù),不會(huì)顯示創(chuàng)建銷毀Thread對象。而線程池中會(huì)維護(hù)一定數(shù)量的可復(fù)用Thread對象(線程池相關(guān)參考Java線程池基礎(chǔ)知識(shí))。因此,只要這些對象不消亡,上述引用鏈中的第一節(jié)——threadLocals就不會(huì)被回收。

至于引用鏈中的第二節(jié)——Entry數(shù)組中的一個(gè)slot。這個(gè)slot中存放著一個(gè)Entry,只有將這個(gè)slot變成null slot,該位置的Entry才會(huì)被回收。ThreadLocalMap的核心清理方法expungeStaleEntry會(huì)在getEntry,set,remove方法中機(jī)會(huì)性調(diào)用,而每次調(diào)用只能說有很大概率清理掉stale entries。而只有當(dāng)slot變成stale slot時(shí),才能夠被expungeStaleEntry方法清理成null slot。因此,如3.1節(jié)開頭所述,需要顯式地調(diào)用ThreadLocal.remove方法,才能斷開弱引用,這時(shí)slot變成stale slot,并且隨后就會(huì)進(jìn)行一次以該slot為起點(diǎn)的expungeStaleEntry清理。

在看看ThreadLocalMap類定義處的一段注釋。

  /**
     * ThreadLocalMap is a customized hash map suitable only for
     * maintaining thread local values. No operations are exported
     * outside of the ThreadLocal class. The class is package private to
     * allow declaration of fields in class Thread.  To help deal with
     * very large and long-lived usages, the hash table entries use
     * WeakReferences for keys. However, since reference queues are not
     * used, stale entries are guaranteed to be removed only when
     * the table starts running out of space.
     * 
     * ThreadLocalMap類是一個(gè)為ThreadLocal專門定制的實(shí)現(xiàn)。使用開放尋址法的散列表存儲(chǔ)
     * <ThreadLocal key, Object value>鍵值對。并自帶基于弱引用的高效地垃圾清理機(jī)制。
     * stale entries只有在散列表空間快用完時(shí),才會(huì)保證一定被清理。
     */
    static class ThreadLocalMap {

stale entries只有在散列表空間快用完時(shí),才會(huì)保證一定被清理。散列表空間快用完了會(huì)發(fā)生什么?發(fā)生rehash。ThreadLocalMap的rehash方法中進(jìn)來就會(huì)執(zhí)行一次全表清理,此時(shí)是遍歷全表找stale slot,因此才有了“保證”這一動(dòng)詞。注意不是調(diào)用resize,因?yàn)檎{(diào)用棧從頂向下是resize -> rehash -> set。

綜上,為了防止ThreadLocal內(nèi)存泄露,最好是在使用完畢后,調(diào)用remove進(jìn)行手動(dòng)清理。否則就靠最后一次set操作會(huì)不會(huì)觸發(fā)rehash。

五、可繼承的ThreadLocal——InheritableThreadLocal

在ThreadLocal的源碼中,可以看到有些注釋說:此方法在InheritableThreadLocal中被重寫。由此知道ThreadLocal有一個(gè)子類InheritableThreadLocal,故名思意,這個(gè)子類具備“繼承”能力。具體的說,InheritableThreadLocal所關(guān)聯(lián)的數(shù)據(jù),能夠從雙親線程中繼承到子線程中。而該“繼承”機(jī)制的實(shí)現(xiàn)原理可以從Thread類的init方法中一窺究竟。

//==================Thread的init方法代碼片段=============================================
// 雙親線程是否有InheritableThreadLocal
if (parent.inheritableThreadLocals != null)
    // 有的話給子線程拷貝一份。
    this.inheritableThreadLocals =
        ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

inheritableThreadLocals是Thread的一個(gè)類成員,也是ThreadLocalMap類型的。

創(chuàng)建一個(gè)線程A的線程B,就是線程A的雙親線程。A線程在創(chuàng)建過程中會(huì)調(diào)用init方法,中間會(huì)判斷B線程中的inheritableThreadLocals是否有內(nèi)容,如果有,則為線程A的inheritableThreadLocals拷貝一份,以實(shí)現(xiàn)"繼承"。

但是,在實(shí)際開發(fā)中,一般不會(huì)顯式創(chuàng)建Thread,而是使用線程池。因此InheritableThreadLocal就沒有用武之地了。但在很多場景下,即使在使用線程池的情況下,我們?nèi)匀恍枰环N“ThreadLocal線程傳遞的功能”以實(shí)現(xiàn)任務(wù)追蹤。這個(gè)時(shí)候該如何是好呢?

由于公司保密策略,這里就不提了,有興趣的朋友可以參考transmittable-thread-local項(xiàng)目,實(shí)現(xiàn)原理大同小異吧。

另外,InheritableThreadLocal曾經(jīng)在Logback1.1.5版本之前(不包括)的LogMDCAdapter類中有過實(shí)現(xiàn)(1.1.4中的LogMDCAdapter第56行),因此之前版本的MDC(Mapping Diagnosis Context)有“線程繼承”的功能,但是文檔中也特別說明了,對于線程池中的線程,需要差別對待。然而自1.1.5及之后的版本中(1.1.5中的LogMDCAdapter,51行和55行),Logback取消了MDC的“線程繼承”功能,原因是為了解決這個(gè)bug——LOGBACK-624 or LOGBACK-422。大致意思就是:“你們這個(gè)“線程繼承”功能既然在線程池中不穩(wěn)定,那不如不要,我這給你提供個(gè)好方法,也能實(shí)現(xiàn)"線程繼承"的需求?!庇谑?.1.5版本開始,Logback確實(shí)取消了該功能,并且到現(xiàn)在也沒有考慮重新實(shí)現(xiàn)。詳情還可以參考MDC Adapter with configurable InheritableThreadLocal以及Logback的commit aa7d584ec。之前,踩過坑,望廣大同行能夠順利跨過。

六、總結(jié)

ThreadLocal的源碼包括注釋也才722行,但其中卻實(shí)現(xiàn)了一套定制的線性探測hash表以及高效的垃圾清理機(jī)制。看ThreadLocal關(guān)鍵看ThreadLocalMap,有如下特點(diǎn):

  1. 以ThreadLocal為key,與之關(guān)聯(lián)的線程隔離數(shù)據(jù)為value,構(gòu)成Entry放置在散列表中。
  2. Entry類是ThreadLocalMap定制的,繼承自WeakReference,以配合ThreadLocalMap自身的垃圾清理方法expungeStaleEntry實(shí)現(xiàn)無效Entry清理。
  3. replaceStaleEntry和expungeStaleEntry都有rehash功能,盡量讓有效的entries連續(xù)分布,無效的entries連續(xù)分布。
  4. get、set和remove方法都能夠機(jī)會(huì)性(remove一定)調(diào)用expungeStaleEntry進(jìn)行清理。該方法邊清理,邊整理,但是并不保證能夠清理完所有的無效Entries,只有擴(kuò)容的時(shí)候殘能夠全表清理。
  5. 為防止內(nèi)存泄露,并發(fā)任務(wù)結(jié)束后請手動(dòng)調(diào)用ThreadLocal.remove。

此外,文章末尾也提及了有關(guān)InheritableThreadLocal的原理及應(yīng)用,雖然這個(gè)類有些雞肋,但I(xiàn)nheritable的功能聊勝于無。

ps: 將源碼連同注釋一起貼在文章中是因?yàn)樽约河⒄Z不是很好怕對類的工作原理的理解有些誤差每段源碼注釋中必要的地方我寫上了自己的理解如果有誤,希望同行指出(其實(shí)我就是為了湊字?jǐn)?shù)呵呵)。

七、參考文獻(xiàn)

  1. 《Java并發(fā)編程實(shí)戰(zhàn)》
  2. ThreadLocal源碼解讀
  3. 對ThreadLocal實(shí)現(xiàn)原理的一點(diǎn)思考
  4. 《深入理解Java虛擬機(jī)(第2版) : JVM高級(jí)特性與最佳實(shí)踐》
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容