Java 多線程之 FutureTask 源碼剖析

系統(tǒng)通過(guò)多線程優(yōu)化性能,實(shí)際上就是將串行操作轉(zhuǎn)換為并行操作,也就是說(shuō)將同步操作轉(zhuǎn)換為異步操作。在眾多并發(fā)類中,FutureTask 類可以接收線程返回的結(jié)果,并且可以取消或者中斷線程。

先看下 FutureTask 類的類圖結(jié)構(gòu):


由類圖可以知道,F(xiàn)utureTask 類是 Runnable 的實(shí)現(xiàn)類,所以可以通過(guò)線程池 submit() 或者直接 new Thread() 啟動(dòng)線程。

舉個(gè)栗子:

public class FutureTaskDemo {
    public static void main(String[] args) {
        MyCallableDemo demo = new MyCallableDemo();
//        // 1.直接通過(guò)new Thread()啟動(dòng)線程
//        FutureTask task = new FutureTask(demo);
//        new Thread(task).start();
//        try {
//            System.out.println(task.get());
//        } catch (Exception e) {
//            e.printStackTrace();
//        }

        // 2.通過(guò)線程池啟動(dòng)線程
        ExecutorService service = Executors.newSingleThreadExecutor();

        Future<String> result = service.submit(demo);
        try {
            System.out.println(result.get());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            service.shutdown();
        }
    }
}

class MyCallableDemo implements Callable {
    @Override
    public String call() throws Exception {
        return "MuggleLee";
    }
}

輸出結(jié)果:

MuggleLee

通過(guò)例子可以看出,使用FutureTask類可以接收線程完成后返回的結(jié)果。如果使用場(chǎng)景是需要接收線程執(zhí)行的結(jié)果(無(wú)論是成功執(zhí)行的結(jié)果還是異常返回的信息),實(shí)現(xiàn)Callable接口結(jié)合FutureTask實(shí)現(xiàn)類接收返回?cái)?shù)據(jù)是比較常見(jiàn)的一種做法。更為常見(jiàn)的做法是通過(guò)使用線程池submit()方法接收返回的結(jié)果。

線程池的實(shí)現(xiàn)類 ThreadPoolExecutor 提供了3個(gè) submit() 方法支持獲取線程返回的結(jié)果。

Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Callable<T> task);

可以發(fā)現(xiàn),返回值類型都是 Future 接口。那繼續(xù)看下 Future 接口有哪些抽象方法。

boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

通過(guò)方法名很明顯可以知道各個(gè)抽象方法的作用。

實(shí)際上,submit() 方法將線程的執(zhí)行結(jié)果封裝成 FutureTask 對(duì)象返回。

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    // 將 Callable 接口對(duì)象封裝成 FutureTask 對(duì)象
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

接下來(lái),看下創(chuàng)建 FutureTask 對(duì)象的源碼。

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;
    }

Executor類部分源碼:

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

FutureTask 類的第一個(gè)構(gòu)造方法將參數(shù) Callable 對(duì)象賦值給 FutureTask 對(duì)象的 callable 屬性,并設(shè)置 state 變量為 NEW(稍后再解釋 callable 和 state 兩個(gè)變量的作用);有意思的是第二個(gè)構(gòu)造方法,將第一個(gè)參數(shù) Runnable 對(duì)象傳給 Executor 類的 callable() 方法,再調(diào)用已經(jīng)實(shí)現(xiàn)了 Callable 接口的 RunnableAdapter 適配器類,執(zhí)行 Runnable 對(duì)象的 run() 方法。(設(shè)計(jì)模式中的適配器模式,不熟悉的可以參考我另外一篇拙作:設(shè)計(jì)模式之適配器模式

emmmm...上面幾個(gè)方法的確比較繞,畫(huà)了一張流程圖輔助理解吧。


接下來(lái),重點(diǎn)剖析 FutureTask 類的源碼:

FutureTask 類聲明了7種狀態(tài):

    /**
     * Possible state transitions:(FutureTask狀態(tài)的轉(zhuǎn)換)
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    // 任務(wù)當(dāng)前 FutureTask 對(duì)象的狀態(tài)
    private volatile int state;
    // 新建任務(wù)(初始狀態(tài))
    private static final int NEW = 0;
    // 任務(wù)運(yùn)行中
    private static final int COMPLETING = 1;
    // 任務(wù)正常完成
    private static final int NORMAL = 2;
    // 任務(wù)異常
    private static final int EXCEPTIONAL = 3;
    // 任務(wù)被取消
    private static final int CANCELLED = 4;
    // 任務(wù)中斷中
    private static final int INTERRUPTING = 5;
    // 任務(wù)被中斷
    private static final int INTERRUPTED = 6;

    // 提交的任務(wù)
    private Callable<V> callable;
    // 任務(wù)運(yùn)行的結(jié)果
    private Object outcome;
    // 執(zhí)行任務(wù)的線程
    private volatile Thread runner;
    // 等待結(jié)果的隊(duì)列(單鏈表)
    private volatile WaitNode waiters;

除了記錄 FutureTask 對(duì)象狀態(tài)之外,還聲明了 state、runner、waiters的內(nèi)存偏移量 和等待節(jié)點(diǎn) WaitNode:

    private static final sun.misc.Unsafe UNSAFE;
    // FutureTask 對(duì)象狀態(tài)在內(nèi)存中的偏移量
    private static final long stateOffset;
    // 執(zhí)行任務(wù)對(duì)象在內(nèi)存中的偏移量
    private static final long runnerOffset;
    // 等待鏈表在內(nèi)存中的偏移量
    private static final long waitersOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            // 通過(guò)反射獲取各對(duì)象在內(nèi)存中的偏移量
            stateOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    /**
     * 單鏈表。存儲(chǔ)等待線程
     */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;

        WaitNode() {
            thread = Thread.currentThread();
        }
    }

當(dāng)程序執(zhí)行到線程池的execute(Runnable runnable)方法的時(shí)候,由于 execute() 方法接收的參數(shù)是 FutureTask 對(duì)象,所以肯定是執(zhí)行 FutureTask 類的 run() 方法。

FutureTask.run()方法剖析

    public void run() {
        // 如果當(dāng)前 FutureTask 對(duì)象的狀態(tài)不是 NEW 或者執(zhí)行 CAS 操作賦值給 runnerOffset 失敗直接跳出 run 方法
        if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // 設(shè)置 runner 為 null ,利于 GC
            runner = null;
            int s = state;
            // 如果有其它線程在中斷任務(wù),會(huì)調(diào)用 handlePossibleCancellationInterrupt 方法處理
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    // 執(zhí)行任務(wù)正常結(jié)束后調(diào)用此方法
    protected void set(V v) {
        // 執(zhí)行 CAS 方法設(shè)置 FutureTask 對(duì)象的狀態(tài)由 NEW -> COMPLETING -> NORMAL
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 將任務(wù)執(zhí)行的結(jié)果賦值給 outcome 變量
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            // 喚醒等待線程
            finishCompletion();
        }
    }
    // 執(zhí)行任務(wù)異常會(huì)調(diào)用此方法
    protected void setException(Throwable t) {
        // 執(zhí)行 CAS 方法設(shè)置 FutureTask 對(duì)象的狀態(tài)由 NEW -> COMPLETING -> EXCEPTIONAL
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 將任務(wù)執(zhí)行的結(jié)果賦值給 outcome 變量
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
            // 喚醒等待線程
            finishCompletion();
        }
    }
    /**
     * 喚醒等待線程
     */
    private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null; ) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // 自旋遍歷單鏈表
                for (; ; ) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        // 喚醒線程
                        LockSupport.unpark(t);
                    }
                    // 獲取下一個(gè)節(jié)點(diǎn),直到節(jié)點(diǎn)為null
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null;
                    q = next;
                }
                break;
            }
        }
        // 這是一個(gè)空方法,可以讓開(kāi)發(fā)中擴(kuò)展使用
        done();
        callable = null;
    }
    private void handlePossibleCancellationInterrupt(int s) {
        // 雙重判斷,這里困擾我很久。為什么需要兩次判斷狀態(tài)是否為 INTERRUPTING 呢?
        // 考慮的場(chǎng)景是:需要確保其它線程執(zhí)行 cencel(true) 是在執(zhí)行 run() 或者 runAndReset()的過(guò)程中
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield();// 通過(guò)自旋,優(yōu)先讓其它線程執(zhí)行,等待 cancel(true) 執(zhí)行完成
    }

核心代碼都加上了注釋,結(jié)合源碼,畫(huà)了張流程圖加深理解吧!

FutureTask.get()方法剖析

    /**
     * 返回執(zhí)行結(jié)果
     */
    public V get() throws InterruptedException, ExecutionException {
        // 獲取當(dāng)前 FutureTask 對(duì)象的狀態(tài)
        int s = state;
        // 如果任務(wù)的狀態(tài)是新建(NEW)或者運(yùn)行中()就執(zhí)行 awaitDone 方法等待獲取
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        // 返回執(zhí)行結(jié)果
        return report(s);
    }

    /**
     * 與上面 get() 相似,加上 timeout 設(shè)置過(guò)期時(shí)間,超時(shí)拋出異常
     *
     * @param timeout 過(guò)期時(shí)間
     * @param unit    時(shí)間單位
     * @return 返回執(zhí)行結(jié)果
     */
    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V) x;
        if (s >= CANCELLED)
            throw new CancellationException();// 被取消或者中斷,直接拋出異常
        throw new ExecutionException((Throwable) x);// 運(yùn)行過(guò)程中發(fā)生異常
    }

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        // 如果設(shè)置超時(shí),計(jì)算截止時(shí)間
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        // 代表當(dāng)前等待結(jié)果線程的等待節(jié)點(diǎn)
        WaitNode q = null;
        // 記錄是否把當(dāng)前線程加入到了隊(duì)列
        boolean queued = false;
        for (; ; ) {
            if (Thread.interrupted()) {
                removeWaiter(q);// 如果被中斷了,刪除當(dāng)前線程節(jié)點(diǎn),并拋出異常
                throw new InterruptedException();
            }
            int s = state;
            // 如果 state 狀態(tài)大于 COMPLETING 則說(shuō)明任務(wù)已經(jīng)執(zhí)行,直接返回狀態(tài)值
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            } else if (s == COMPLETING) // 如果 state 狀態(tài)等于 COMPLETING,說(shuō)明正在設(shè)置結(jié)果,則放棄時(shí)間片輪詢等待
                Thread.yield();
            else if (q == null) // 任務(wù)狀態(tài)為 NEW ,構(gòu)造等待節(jié)點(diǎn)
                q = new WaitNode();
            else if (!queued) // 狀態(tài)為 NEW ,并且節(jié)點(diǎn)不為 null ,并且該節(jié)點(diǎn)沒(méi)有加入到 waiter 隊(duì)列中
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            else if (timed) { // 如果設(shè)定超時(shí),進(jìn)行超時(shí)判斷
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
            // 阻塞到超時(shí)時(shí)間
                LockSupport.parkNanos(this, nanos);
            } else // 如果沒(méi)有設(shè)置超時(shí),會(huì)一直阻塞,直到被中斷或者被喚醒
                LockSupport.park(this);
        }
    }    
    // 通過(guò)自旋鏈表刪除指定節(jié)點(diǎn)
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            // 設(shè)置節(jié)點(diǎn)為空,通過(guò)自旋找出空節(jié)點(diǎn)并刪除
            node.thread = null;
            // 自旋保證刪除成功
            retry:
            for (; ; ) {
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)// 如果當(dāng)前節(jié)點(diǎn)不是空,不需要?jiǎng)h除
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null)
                            continue retry;
                    } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                            q, s))
                        continue retry;
                }
                break;
            }
        }
    }

結(jié)合源碼,畫(huà)出了以下流程圖加深理解:


FutureTask.cancel()方法剖析

    /**
     * 任務(wù)取消
     * @param mayInterruptIfRunning true:代表中斷線程 false:代表取消線程
     * @return
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        // 如果任務(wù)不在 NEW 狀態(tài)或者執(zhí)行 UNSAFE 操作失敗直接返回 false
        if (!(state == NEW &&
                UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally {
                    // 設(shè)置最終的狀態(tài)為中斷狀態(tài)
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // 釋放等待線程
            finishCompletion();
        }
        return true;
    }

cancel 方法參數(shù)為 mayInterruptIfRunning,當(dāng)參數(shù)值為 false 的時(shí)候,直接將 FutureTask 對(duì)象的狀態(tài)設(shè)置為 CANCELLED,并釋放等待線程;當(dāng)參數(shù)值為 ture 的時(shí)候,調(diào)用 interrupt() 方法中斷線程,并設(shè)置最終狀態(tài)為中斷狀態(tài)。

runAndReset()方法剖析

值得一提的是, FutureTask 類有個(gè)方法 runAndReset() 可以讓線程重復(fù)執(zhí)行。譬如使用 ScheduledThreadPoolExecutor 線程池,這個(gè)類可以使線程周期性的重復(fù)執(zhí)行,具體的可以自行查看相關(guān)源碼。

    /**
     * 與 run() 方法類似,但該方法可以執(zhí)行多次。
     * 不同點(diǎn):1.不設(shè)置返回值 2.不設(shè)置 state 值(執(zhí)行完任務(wù)后,F(xiàn)utureTask 對(duì)象狀態(tài)還是 NEW )
     */
    protected boolean runAndReset() {
        if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    // 不設(shè)置返回值
                    c.call();
                    ran = true;
                } catch (Throwable ex) {    
                    setException(ex);
                }
            }
        } finally {
            runner = null;
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        // 任務(wù)執(zhí)行成功,且狀態(tài)重置為NEW
        return ran && s == NEW;
    }

isCancelled()、isDone()方法源碼:

    // 根據(jù)狀態(tài)碼判斷任務(wù)是否被取消
    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    // 根據(jù)狀態(tài)碼判斷任務(wù)是否執(zhí)行完成
    public boolean isDone() {
        return state != NEW;
    }

總結(jié)

使用 FutureTask 類結(jié)合使用線程池,可以通過(guò)多線程異步計(jì)算任務(wù),最后所有子線程執(zhí)行完成之后再繼續(xù)執(zhí)行主線程。

參考資料:

《Java多線程編程實(shí)戰(zhàn)指南-核心篇》

http://www.tianshouzhi.com/api/tutorials/mutithread/317

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容