Java并發(fā)

一、線程狀態(tài)轉(zhuǎn)換

<div align="center"> <img src="../pics//ace830df-9919-48ca-91b5-60b193f593d2.png" width=""/> </div>

新建(New)

創(chuàng)建后尚未啟動(dòng)。

可運(yùn)行(Runnable)

可能正在運(yùn)行,也可能正在等待 CPU 時(shí)間片。

包含了操作系統(tǒng)線程狀態(tài)中的 Running 和 Ready。

阻塞(Blocking)

等待獲取一個(gè)排它鎖,如果其線程釋放了鎖就會(huì)結(jié)束此狀態(tài)。

無限期等待(Waiting)

等待其它線程顯式地喚醒,否則不會(huì)被分配 CPU 時(shí)間片。

進(jìn)入方法 退出方法
沒有設(shè)置 Timeout 參數(shù)的 Object.wait() 方法 Object.notify() / Object.notifyAll()
沒有設(shè)置 Timeout 參數(shù)的 Thread.join() 方法 被調(diào)用的線程執(zhí)行完畢
LockSupport.park() 方法 -

限期等待(Timed Waiting)

無需等待其它線程顯式地喚醒,在一定時(shí)間之后會(huì)被系統(tǒng)自動(dòng)喚醒。

調(diào)用 Thread.sleep() 方法使線程進(jìn)入限期等待狀態(tài)時(shí),常常用“使一個(gè)線程睡眠”進(jìn)行描述。

調(diào)用 Object.wait() 方法使線程進(jìn)入限期等待或者無限期等待時(shí),常常用“掛起一個(gè)線程”進(jìn)行描述。

睡眠和掛起是用來描述行為,而阻塞和等待用來描述狀態(tài)。

阻塞和等待的區(qū)別在于,阻塞是被動(dòng)的,它是在等待獲取一個(gè)排它鎖。而等待是主動(dòng)的,通過調(diào)用 Thread.sleep() 和 Object.wait() 等方法進(jìn)入。

進(jìn)入方法 退出方法
Thread.sleep() 方法 時(shí)間結(jié)束
設(shè)置了 Timeout 參數(shù)的 Object.wait() 方法 時(shí)間結(jié)束 / Object.notify() / Object.notifyAll()
設(shè)置了 Timeout 參數(shù)的 Thread.join() 方法 時(shí)間結(jié)束 / 被調(diào)用的線程執(zhí)行完畢
LockSupport.parkNanos() 方法 -
LockSupport.parkUntil() 方法 -

死亡(Terminated)

可以是線程結(jié)束任務(wù)之后自己結(jié)束,或者產(chǎn)生了異常而結(jié)束。

二、使用線程

有三種使用線程的方法:

  • 實(shí)現(xiàn) Runnable 接口;
  • 實(shí)現(xiàn) Callable 接口;
  • 繼承 Thread 類。

實(shí)現(xiàn) Runnable 和 Callable 接口的類只能當(dāng)做一個(gè)可以在線程中運(yùn)行的任務(wù),不是真正意義上的線程,因此最后還需要通過 Thread 來調(diào)用??梢哉f任務(wù)是通過線程驅(qū)動(dòng)從而執(zhí)行的。

實(shí)現(xiàn) Runnable 接口

需要實(shí)現(xiàn) run() 方法。

通過 Thread 調(diào)用 start() 方法來啟動(dòng)線程。

public class MyRunnable implements Runnable {
    public void run() {
        // ...
    }
}
public static void main(String[] args) {
    MyRunnable instance = new MyRunnable();
    Thread thread = new Thread(instance);
    thread.start();
}

實(shí)現(xiàn) Callable 接口

與 Runnable 相比,Callable 可以有返回值,返回值通過 FutureTask 進(jìn)行封裝。

public class MyCallable implements Callable<Integer> {
    public Integer call() {
        return 123;
    }
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
    MyCallable mc = new MyCallable();
    FutureTask<Integer> ft = new FutureTask<>(mc);
    Thread thread = new Thread(ft);
    thread.start();
    System.out.println(ft.get());
}

繼承 Thread 類

同樣也是需要實(shí)現(xiàn) run() 方法,因?yàn)?Thread 類也實(shí)現(xiàn)了 Runable 接口。

public class MyThread extends Thread {
    public void run() {
        // ...
    }
}
public static void main(String[] args) {
    MyThread mt = new MyThread();
    mt.start();
}

實(shí)現(xiàn)接口 VS 繼承 Thread

實(shí)現(xiàn)接口會(huì)更好一些,因?yàn)椋?/p>

  • Java 不支持多重繼承,因此繼承了 Thread 類就無法繼承其它類,但是可以實(shí)現(xiàn)多個(gè)接口;
  • 類可能只要求可執(zhí)行就行,繼承整個(gè) Thread 類開銷過大。

三、基礎(chǔ)線程機(jī)制

Executor

Executor 管理多個(gè)異步任務(wù)的執(zhí)行,而無需程序員顯式地管理線程的生命周期。這里的異步是指多個(gè)任務(wù)的執(zhí)行互不干擾,不需要進(jìn)行同步操作。

主要有三種 Executor:

  • CachedThreadPool:一個(gè)任務(wù)創(chuàng)建一個(gè)線程;
  • FixedThreadPool:所有任務(wù)只能使用固定大小的線程;
  • SingleThreadExecutor:相當(dāng)于大小為 1 的 FixedThreadPool。
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 5; i++) {
        executorService.execute(new MyRunnable());
    }
    executorService.shutdown();
}

Daemon

守護(hù)線程是程序運(yùn)行時(shí)在后臺(tái)提供服務(wù)的線程,不屬于程序中不可或缺的部分。

當(dāng)所有非守護(hù)線程結(jié)束時(shí),程序也就終止,同時(shí)會(huì)殺死所有守護(hù)線程。

main() 屬于非守護(hù)線程。

使用 setDaemon() 方法將一個(gè)線程設(shè)置為守護(hù)線程。

public static void main(String[] args) {
    Thread thread = new Thread(new MyRunnable());
    thread.setDaemon(true);
}

sleep()

Thread.sleep(millisec) 方法會(huì)休眠當(dāng)前正在執(zhí)行的線程,millisec 單位為毫秒。

sleep() 可能會(huì)拋出 InterruptedException,因?yàn)楫惓2荒芸缇€程傳播回 main() 中,因此必須在本地進(jìn)行處理。線程中拋出的其它異常也同樣需要在本地進(jìn)行處理。

public void run() {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

yield()

對(duì)靜態(tài)方法 Thread.yield() 的調(diào)用聲明了當(dāng)前線程已經(jīng)完成了生命周期中最重要的部分,可以切換給其它線程來執(zhí)行。該方法只是對(duì)線程調(diào)度器的一個(gè)建議,而且也只是建議具有相同優(yōu)先級(jí)的其它線程可以運(yùn)行。

public void run() {
    Thread.yield();
}

四、中斷

一個(gè)線程執(zhí)行完畢之后會(huì)自動(dòng)結(jié)束,如果在運(yùn)行過程中發(fā)生異常也會(huì)提前結(jié)束。

InterruptedException

通過調(diào)用一個(gè)線程的 interrupt() 來中斷該線程,如果該線程處于阻塞、限期等待或者無限期等待狀態(tài),那么就會(huì)拋出 InterruptedException,從而提前結(jié)束該線程。但是不能中斷 I/O 阻塞和 synchronized 鎖阻塞。

對(duì)于以下代碼,在 main() 中啟動(dòng)一個(gè)線程之后再中斷它,由于線程中調(diào)用了 Thread.sleep() 方法,因此會(huì)拋出一個(gè) InterruptedException,從而提前結(jié)束線程,不執(zhí)行之后的語句。

public class InterruptExample {

    private static class MyThread1 extends Thread {
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
                System.out.println("Thread run");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public static void main(String[] args) throws InterruptedException {
    Thread thread1 = new MyThread1();
    thread1.start();
    thread1.interrupt();
    System.out.println("Main run");
}
Main run
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at InterruptExample.lambda$main$0(InterruptExample.java:5)
    at InterruptExample$$Lambda$1/713338599.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

interrupted()

如果一個(gè)線程的 run() 方法執(zhí)行一個(gè)無限循環(huán),并且沒有執(zhí)行 sleep() 等會(huì)拋出 InterruptedException 的操作,那么調(diào)用線程的 interrupt() 方法就無法使線程提前結(jié)束。

但是調(diào)用 interrupt() 方法會(huì)設(shè)置線程的中斷標(biāo)記,此時(shí)調(diào)用 interrupted() 方法會(huì)返回 true。因此可以在循環(huán)體中使用 interrupted() 方法來判斷線程是否處于中斷狀態(tài),從而提前結(jié)束線程。

public class InterruptExample {

    private static class MyThread2 extends Thread {
        @Override
        public void run() {
            while (!interrupted()) {
                // ..
            }
            System.out.println("Thread end");
        }
    }
}
public static void main(String[] args) throws InterruptedException {
    Thread thread2 = new MyThread2();
    thread2.start();
    thread2.interrupt();
}
Thread end

Executor 的中斷操作

調(diào)用 Executor 的 shutdown() 方法會(huì)等待線程都執(zhí)行完畢之后再關(guān)閉,但是如果調(diào)用的是 shutdownNow() 方法,則相當(dāng)于調(diào)用每個(gè)線程的 interrupt() 方法。

以下使用 Lambda 創(chuàng)建線程,相當(dāng)于創(chuàng)建了一個(gè)匿名內(nèi)部線程。

public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
        try {
            Thread.sleep(2000);
            System.out.println("Thread run");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    executorService.shutdownNow();
    System.out.println("Main run");
}
Main run
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9)
    at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

如果只想中斷 Executor 中的一個(gè)線程,可以通過使用 submit() 方法來提交一個(gè)線程,它會(huì)返回一個(gè) Future<?> 對(duì)象,通過調(diào)用該對(duì)象的 cancel(true) 方法就可以中斷線程。

Future<?> future = executorService.submit(() -> {
    // ..
});
future.cancel(true);

五、互斥同步

Java 提供了兩種鎖機(jī)制來控制多個(gè)線程對(duì)共享資源的互斥訪問,第一個(gè)是 JVM 實(shí)現(xiàn)的 synchronized,而另一個(gè)是 JDK 實(shí)現(xiàn)的 ReentrantLock。

synchronized

1. 同步一個(gè)代碼塊

public void func() {
    synchronized (this) {
        // ...
    }
}

它只作用于同一個(gè)對(duì)象,如果調(diào)用兩個(gè)對(duì)象上的同步代碼塊,就不會(huì)進(jìn)行同步。

對(duì)于以下代碼,使用 ExecutorService 執(zhí)行了兩個(gè)線程,由于調(diào)用的是同一個(gè)對(duì)象的同步代碼塊,因此這兩個(gè)線程會(huì)進(jìn)行同步,當(dāng)一個(gè)線程進(jìn)入同步語句塊時(shí),另一個(gè)線程就必須等待。

public class SynchronizedExample {

    public void func1() {
        synchronized (this) {
            for (int i = 0; i < 10; i++) {
                System.out.print(i + " ");
            }
        }
    }
}
public static void main(String[] args) {
    SynchronizedExample e1 = new SynchronizedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> e1.func1());
    executorService.execute(() -> e1.func1());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

對(duì)于以下代碼,兩個(gè)線程調(diào)用了不同對(duì)象的同步代碼塊,因此這兩個(gè)線程就不需要同步。從輸出結(jié)果可以看出,兩個(gè)線程交叉執(zhí)行。

public static void main(String[] args) {
    SynchronizedExample e1 = new SynchronizedExample();
    SynchronizedExample e2 = new SynchronizedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> e1.func1());
    executorService.execute(() -> e2.func1());
}
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9

2. 同步一個(gè)方法

public synchronized void func () {
    // ...
}

它和同步代碼塊一樣,作用于同一個(gè)對(duì)象。

3. 同步一個(gè)類

public void func() {
    synchronized (SynchronizedExample.class) {
        // ...
    }
}

作用于整個(gè)類,也就是說兩個(gè)線程調(diào)用同一個(gè)類的不同對(duì)象上的這種同步語句,也會(huì)進(jìn)行同步。

public class SynchronizedExample {

    public void func2() {
        synchronized (SynchronizedExample.class) {
            for (int i = 0; i < 10; i++) {
                System.out.print(i + " ");
            }
        }
    }
}
public static void main(String[] args) {
    SynchronizedExample e1 = new SynchronizedExample();
    SynchronizedExample e2 = new SynchronizedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> e1.func2());
    executorService.execute(() -> e2.func2());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

4. 同步一個(gè)靜態(tài)方法

public synchronized static void fun() {
    // ...
}

作用于整個(gè)類。

ReentrantLock

ReentrantLock 是 java.util.concurrent(J.U.C)包中的鎖。

public class LockExample {

    private Lock lock = new ReentrantLock();

    public void func() {
        lock.lock();
        try {
            for (int i = 0; i < 10; i++) {
                System.out.print(i + " ");
            }
        } finally {
            lock.unlock(); // 確保釋放鎖,從而避免發(fā)生死鎖。
        }
    }
}
public static void main(String[] args) {
    LockExample lockExample = new LockExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> lockExample.func());
    executorService.execute(() -> lockExample.func());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

比較

1. 鎖的實(shí)現(xiàn)

synchronized 是 JVM 實(shí)現(xiàn)的,而 ReentrantLock 是 JDK 實(shí)現(xiàn)的。

2. 性能

新版本 Java 對(duì) synchronized 進(jìn)行了很多優(yōu)化,例如自旋鎖等,synchronized 與 ReentrantLock 大致相同。

3. 等待可中斷

當(dāng)持有鎖的線程長(zhǎng)期不釋放鎖的時(shí)候,正在等待的線程可以選擇放棄等待,改為處理其他事情。

ReentrantLock 可中斷,而 synchronized 不行。

4. 公平鎖

公平鎖是指多個(gè)線程在等待同一個(gè)鎖時(shí),必須按照申請(qǐng)鎖的時(shí)間順序來依次獲得鎖。

synchronized 中的鎖是非公平的,ReentrantLock 默認(rèn)情況下也是非公平的,但是也可以是公平的。

5. 鎖綁定多個(gè)條件

一個(gè) ReentrantLock 可以同時(shí)綁定多個(gè) Condition 對(duì)象。

使用選擇

除非需要使用 ReentrantLock 的高級(jí)功能,否則優(yōu)先使用 synchronized。這是因?yàn)?synchronized 是 JVM 實(shí)現(xiàn)的一種鎖機(jī)制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。并且使用 synchronized 不用擔(dān)心沒有釋放鎖而導(dǎo)致死鎖問題,因?yàn)?JVM 會(huì)確保鎖的釋放。

六、線程之間的協(xié)作

當(dāng)多個(gè)線程可以一起工作去解決某個(gè)問題時(shí),如果某些部分必須在其它部分之前完成,那么就需要對(duì)線程進(jìn)行協(xié)調(diào)。

join()

在線程中調(diào)用另一個(gè)線程的 join() 方法,會(huì)將當(dāng)前線程掛起,而不是忙等待,直到目標(biāo)線程結(jié)束。

對(duì)于以下代碼,雖然 b 線程先啟動(dòng),但是因?yàn)樵?b 線程中調(diào)用了 a 線程的 join() 方法,b 線程會(huì)等待 a 線程結(jié)束才繼續(xù)執(zhí)行,因此最后能夠保證 a 線程的輸出先于 b 線程的輸出。

public class JoinExample {

    private class A extends Thread {
        @Override
        public void run() {
            System.out.println("A");
        }
    }

    private class B extends Thread {

        private A a;

        B(A a) {
            this.a = a;
        }

        @Override
        public void run() {
            try {
                a.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B");
        }
    }

    public void test() {
        A a = new A();
        B b = new B(a);
        b.start();
        a.start();
    }
}
public static void main(String[] args) {
    JoinExample example = new JoinExample();
    example.test();
}
A
B

wait() notify() notifyAll()

調(diào)用 wait() 使得線程等待某個(gè)條件滿足,線程在等待時(shí)會(huì)被掛起,當(dāng)其他線程的運(yùn)行使得這個(gè)條件滿足時(shí),其它線程會(huì)調(diào)用 notify() 或者 notifyAll() 來喚醒掛起的線程。

它們都屬于 Object 的一部分,而不屬于 Thread。

只能用在同步方法或者同步控制塊中使用,否則會(huì)在運(yùn)行時(shí)拋出 IllegalMonitorStateExeception。

使用 wait() 掛起期間,線程會(huì)釋放鎖。這是因?yàn)?,如果沒有釋放鎖,那么其它線程就無法進(jìn)入對(duì)象的同步方法或者同步控制塊中,那么就無法執(zhí)行 notify() 或者 notifyAll() 來喚醒掛起的線程,造成死鎖。

public class WaitNotifyExample {
    public synchronized void before() {
        System.out.println("before");
        notifyAll();
    }

    public synchronized void after() {
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after");
    }
}
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    WaitNotifyExample example = new WaitNotifyExample();
    executorService.execute(() -> example.after());
    executorService.execute(() -> example.before());
}
before
after

wait() 和 sleep() 的區(qū)別

  • wait() 是 Object 的方法,而 sleep() 是 Thread 的靜態(tài)方法;
  • wait() 會(huì)釋放鎖,sleep() 不會(huì)。

await() signal() signalAll()

java.util.concurrent 類庫中提供了 Condition 類來實(shí)現(xiàn)線程之間的協(xié)調(diào),可以在 Condition 上調(diào)用 await() 方法使線程等待,其它線程調(diào)用 signal() 或 signalAll() 方法喚醒等待的線程。相比于 wait() 這種等待方式,await() 可以指定等待的條件,因此更加靈活。

使用 Lock 來獲取一個(gè) Condition 對(duì)象。

public class AwaitSignalExample {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void before() {
        lock.lock();
        try {
            System.out.println("before");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void after() {
        lock.lock();
        try {
            condition.await();
            System.out.println("after");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    AwaitSignalExample example = new AwaitSignalExample();
    executorService.execute(() -> example.after());
    executorService.execute(() -> example.before());
}
before
after

七、J.U.C - AQS

java.util.concurrent(J.U.C)大大提高了并發(fā)性能,AQS 被認(rèn)為是 J.U.C 的核心。

CountdownLatch

用來控制一個(gè)線程等待多個(gè)線程。

維護(hù)了一個(gè)計(jì)數(shù)器 cnt,每次調(diào)用 countDown() 方法會(huì)讓計(jì)數(shù)器的值減 1,減到 0 的時(shí)候,那些因?yàn)檎{(diào)用 await() 方法而在等待的線程就會(huì)被喚醒。

<div align="center"> <img src="../pics//CountdownLatch.png" width=""/> </div>

public class CountdownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        final int totalThread = 10;
        CountDownLatch countDownLatch = new CountDownLatch(totalThread);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalThread; i++) {
            executorService.execute(() -> {
                System.out.print("run..");
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("end");
        executorService.shutdown();
    }
}
run..run..run..run..run..run..run..run..run..run..end

CyclicBarrier

用來控制多個(gè)線程互相等待,只有當(dāng)多個(gè)線程都到達(dá)時(shí),這些線程才會(huì)繼續(xù)執(zhí)行。

和 CountdownLatch 相似,都是通過維護(hù)計(jì)數(shù)器來實(shí)現(xiàn)的。但是它的計(jì)數(shù)器是遞增的,每次執(zhí)行 await() 方法之后,計(jì)數(shù)器會(huì)加 1,直到計(jì)數(shù)器的值和設(shè)置的值相等,等待的所有線程才會(huì)繼續(xù)執(zhí)行。和 CountdownLatch 的另一個(gè)區(qū)別是,CyclicBarrier 的計(jì)數(shù)器可以循環(huán)使用,所以它才叫做循環(huán)屏障。

下圖應(yīng)該從下往上看才正確。

<div align="center"> <img src="../pics//CyclicBarrier.png" width=""/> </div>

public class CyclicBarrierExample {
    public static void main(String[] args) throws InterruptedException {
        final int totalThread = 10;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalThread; i++) {
            executorService.execute(() -> {
                System.out.print("before..");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.print("after..");
            });
        }
        executorService.shutdown();
    }
}
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..

Semaphore

Semaphore 就是操作系統(tǒng)中的信號(hào)量,可以控制對(duì)互斥資源的訪問線程數(shù)。

<div align="center"> <img src="../pics//Semaphore.png" width=""/> </div>

以下代碼模擬了對(duì)某個(gè)服務(wù)的并發(fā)請(qǐng)求,每次只能有 3 個(gè)客戶端同時(shí)訪問,請(qǐng)求總數(shù)為 10。

public class SemaphoreExample {
    public static void main(String[] args) {
        final int clientCount = 3;
        final int totalRequestCount = 10;
        Semaphore semaphore = new Semaphore(clientCount);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalRequestCount; i++) {
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    System.out.print(semaphore.availablePermits() + " ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            });
        }
        executorService.shutdown();
    }
}
2 1 2 2 2 2 2 1 2 2

八、J.U.C - 其它組件

FutureTask

在介紹 Callable 時(shí)我們知道它可以有返回值,返回值通過 Future<V> 進(jìn)行封裝。FutureTask 實(shí)現(xiàn)了 RunnableFuture 接口,該接口繼承自 Runnable 和 Future<V> 接口,這使得 FutureTask 既可以當(dāng)做一個(gè)任務(wù)執(zhí)行,也可以有返回值。

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>

FutureTask 可用于異步獲取執(zhí)行結(jié)果或取消執(zhí)行任務(wù)的場(chǎng)景。當(dāng)一個(gè)計(jì)算任務(wù)需要執(zhí)行很長(zhǎng)時(shí)間,那么就可以用 FutureTask 來封裝這個(gè)任務(wù),主線程在完成自己的任務(wù)之后再去獲取結(jié)果。

public class FutureTaskExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int result = 0;
                for (int i = 0; i < 100; i++) {
                    Thread.sleep(10);
                    result += i;
                }
                return result;
            }
        });

        Thread computeThread = new Thread(futureTask);
        computeThread.start();

        Thread otherThread = new Thread(() -> {
            System.out.println("other task is running...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        otherThread.start();
        System.out.println(futureTask.get());
    }
}
other task is running...
4950

BlockingQueue

java.util.concurrent.BlockingQueue 接口有以下阻塞隊(duì)列的實(shí)現(xiàn):

  • FIFO 隊(duì)列 :LinkedBlockingQueue、ArrayBlockingQueue(固定長(zhǎng)度)
  • 優(yōu)先級(jí)隊(duì)列 :PriorityBlockingQueue

提供了阻塞的 take() 和 put() 方法:如果隊(duì)列為空 take() 將阻塞,直到隊(duì)列中有內(nèi)容;如果隊(duì)列為滿 put() 將阻塞,直到隊(duì)列有空閑位置。

使用 BlockingQueue 實(shí)現(xiàn)生產(chǎn)者消費(fèi)者問題

public class ProducerConsumer {

    private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

    private static class Producer extends Thread {
        @Override
        public void run() {
            try {
                queue.put("product");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.print("produce..");
        }
    }

    private static class Consumer extends Thread {

        @Override
        public void run() {
            try {
                String product = queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.print("consume..");
        }
    }
}
public static void main(String[] args) {
    for (int i = 0; i < 2; i++) {
        Producer producer = new Producer();
        producer.start();
    }
    for (int i = 0; i < 5; i++) {
        Consumer consumer = new Consumer();
        consumer.start();
    }
    for (int i = 0; i < 3; i++) {
        Producer producer = new Producer();
        producer.start();
    }
}
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..

ForkJoin

主要用于并行計(jì)算中,和 MapReduce 原理類似,都是把大的計(jì)算任務(wù)拆分成多個(gè)小任務(wù)并行計(jì)算。

public class ForkJoinExample extends RecursiveTask<Integer> {
    private final int threshold = 5;
    private int first;
    private int last;

    public ForkJoinExample(int first, int last) {
        this.first = first;
        this.last = last;
    }

    @Override
    protected Integer compute() {
        int result = 0;
        if (last - first <= threshold) {
            // 任務(wù)足夠小則直接計(jì)算
            for (int i = first; i <= last; i++) {
                result += i;
            }
        } else {
            // 拆分成小任務(wù)
            int middle = first + (last - first) / 2;
            ForkJoinExample leftTask = new ForkJoinExample(first, middle);
            ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
            leftTask.fork();
            rightTask.fork();
            result = leftTask.join() + rightTask.join();
        }
        return result;
    }
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ForkJoinExample example = new ForkJoinExample(1, 10000);
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    Future result = forkJoinPool.submit(example);
    System.out.println(result.get());
}

ForkJoin 使用 ForkJoinPool 來啟動(dòng),它是一個(gè)特殊的線程池,線程數(shù)量取決于 CPU 核數(shù)。

public class ForkJoinPool extends AbstractExecutorService

ForkJoinPool 實(shí)現(xiàn)了工作竊取算法來提高 CPU 的利用率。每個(gè)線程都維護(hù)了一個(gè)雙端隊(duì)列,用來存儲(chǔ)需要執(zhí)行的任務(wù)。工作竊取算法允許空閑的線程從其它線程的雙端隊(duì)列中竊取一個(gè)任務(wù)來執(zhí)行。竊取的任務(wù)必須是最晚的任務(wù),避免和隊(duì)列所屬線程發(fā)生競(jìng)爭(zhēng)。例如下圖中,Thread2 從 Thread1 的隊(duì)列中拿出最晚的 Task1 任務(wù),Thread1 會(huì)拿出 Task2 來執(zhí)行,這樣就避免發(fā)生競(jìng)爭(zhēng)。但是如果隊(duì)列中只有一個(gè)任務(wù)時(shí)還是會(huì)發(fā)生競(jìng)爭(zhēng)。

<div align="center"> <img src="../pics//15b45dc6-27aa-4519-9194-f4acfa2b077f.jpg" width=""/> </div>

九、線程不安全示例

如果多個(gè)線程對(duì)同一個(gè)共享數(shù)據(jù)進(jìn)行訪問而不采取同步操作的話,那么操作的結(jié)果是不一致的。

以下代碼演示了 1000 個(gè)線程同時(shí)對(duì) cnt 執(zhí)行自增操作,操作結(jié)束之后它的值為 997 而不是 1000。

public class ThreadUnsafeExample {

    private int cnt = 0;

    public void add() {
        cnt++;
    }

    public int get() {
        return cnt;
    }
}
public static void main(String[] args) throws InterruptedException {
    final int threadSize = 1000;
    ThreadUnsafeExample example = new ThreadUnsafeExample();
    final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < threadSize; i++) {
        executorService.execute(() -> {
            example.add();
            countDownLatch.countDown();
        });
    }
    countDownLatch.await();
    executorService.shutdown();
    System.out.println(example.get());
}
997

十、Java 內(nèi)存模型

Java 內(nèi)存模型試圖屏蔽各種硬件和操作系統(tǒng)的內(nèi)存訪問差異,以實(shí)現(xiàn)讓 Java 程序在各種平臺(tái)下都能達(dá)到一致的內(nèi)存訪問效果。

主內(nèi)存與工作內(nèi)存

處理器上的寄存器的讀寫的速度比內(nèi)存快幾個(gè)數(shù)量級(jí),為了解決這種速度矛盾,在它們之間加入了高速緩存。

加入高速緩存帶來了一個(gè)新的問題:緩存一致性。如果多個(gè)緩存共享同一塊主內(nèi)存區(qū)域,那么多個(gè)緩存的數(shù)據(jù)可能會(huì)不一致,需要一些協(xié)議來解決這個(gè)問題。

<div align="center"> <img src="../pics//68778c1b-15ab-4826-99c0-3b4fd38cb9e9.png" width=""/> </div>

所有的變量都存儲(chǔ)在主內(nèi)存中,每個(gè)線程還有自己的工作內(nèi)存,工作內(nèi)存存儲(chǔ)在高速緩存或者寄存器中,保存了該線程使用的變量的主內(nèi)存副本拷貝。

線程只能直接操作工作內(nèi)存中的變量,不同線程之間的變量值傳遞需要通過主內(nèi)存來完成。

<div align="center"> <img src="../pics//47358f87-bc4c-496f-9a90-8d696de94cee.png" width=""/> </div>

內(nèi)存間交互操作

Java 內(nèi)存模型定義了 8 個(gè)操作來完成主內(nèi)存和工作內(nèi)存的交互操作。

<div align="center"> <img src="../pics//536c6dfd-305a-4b95-b12c-28ca5e8aa043.png" width=""/> </div>

  • read:把一個(gè)變量的值從主內(nèi)存?zhèn)鬏數(shù)焦ぷ鲀?nèi)存中
  • load:在 read 之后執(zhí)行,把 read 得到的值放入工作內(nèi)存的變量副本中
  • use:把工作內(nèi)存中一個(gè)變量的值傳遞給執(zhí)行引擎
  • assign:把一個(gè)從執(zhí)行引擎接收到的值賦給工作內(nèi)存的變量
  • store:把工作內(nèi)存的一個(gè)變量的值傳送到主內(nèi)存中
  • write:在 store 之后執(zhí)行,把 store 得到的值放入主內(nèi)存的變量中
  • lock:作用于主內(nèi)存的變量
  • unlock

內(nèi)存模型三大特性

1. 原子性

Java 內(nèi)存模型保證了 read、load、use、assign、store、write、lock 和 unlock 操作具有原子性,例如對(duì)一個(gè) int 類型的變量執(zhí)行 assign 賦值操作,這個(gè)操作就是原子性的。但是 Java 內(nèi)存模型允許虛擬機(jī)將沒有被 volatile 修飾的 64 位數(shù)據(jù)(long,double)的讀寫操作劃分為兩次 32 位的操作來進(jìn)行,即 load、store、read 和 write 操作可以不具備原子性。

有一個(gè)錯(cuò)誤認(rèn)識(shí)就是,int 等原子性的變量在多線程環(huán)境中不會(huì)出現(xiàn)線程安全問題。前面的線程不安全示例代碼中,cnt 變量屬于 int 類型變量,1000 個(gè)線程對(duì)它進(jìn)行自增操作之后,得到的值為 997 而不是 1000。

為了方便討論,將內(nèi)存間的交互操作簡(jiǎn)化為 3 個(gè):load、assign、store。

下圖演示了兩個(gè)線程同時(shí)對(duì) cnt 變量進(jìn)行操作,load、assign、store 這一系列操作整體上看不具備原子性,那么在 T1 修改 cnt 并且還沒有將修改后的值寫入主內(nèi)存,T2 依然可以讀入該變量的值??梢钥闯觯@兩個(gè)線程雖然執(zhí)行了兩次自增運(yùn)算,但是主內(nèi)存中 cnt 的值最后為 1 而不是 2。因此對(duì) int 類型讀寫操作滿足原子性只是說明 load、assign、store 這些單個(gè)操作具備原子性。

<div align="center"> <img src="../pics//ef8eab00-1d5e-4d99-a7c2-d6d68ea7fe92.png" width=""/> </div>

AtomicInteger 能保證多個(gè)線程修改的原子性。

<div align="center"> <img src="../pics//952afa9a-458b-44ce-bba9-463e60162945.png" width=""/> </div>

使用 AtomicInteger 重寫之前線程不安全的代碼之后得到以下線程安全實(shí)現(xiàn):

public class AtomicExample {
    private AtomicInteger cnt = new AtomicInteger();

    public void add() {
        cnt.incrementAndGet();
    }

    public int get() {
        return cnt.get();
    }
}
public static void main(String[] args) throws InterruptedException {
    final int threadSize = 1000;
    AtomicExample example = new AtomicExample(); // 只修改這條語句
    final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < threadSize; i++) {
        executorService.execute(() -> {
            example.add();
            countDownLatch.countDown();
        });
    }
    countDownLatch.await();
    executorService.shutdown();
    System.out.println(example.get());
}
1000

除了使用原子類之外,也可以使用 synchronized 互斥鎖來保證操作的原子性。它對(duì)應(yīng)的內(nèi)存間交互操作為:lock 和 unlock,在虛擬機(jī)實(shí)現(xiàn)上對(duì)應(yīng)的字節(jié)碼指令為 monitorenter 和 monitorexit。

public class AtomicSynchronizedExample {
    private int cnt = 0;

    public synchronized void add() {
        cnt++;
    }

    public synchronized int get() {
        return cnt;
    }
}
public static void main(String[] args) throws InterruptedException {
    final int threadSize = 1000;
    AtomicSynchronizedExample example = new AtomicSynchronizedExample();
    final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < threadSize; i++) {
        executorService.execute(() -> {
            example.add();
            countDownLatch.countDown();
        });
    }
    countDownLatch.await();
    executorService.shutdown();
    System.out.println(example.get());
}
1000

2. 可見性

可見性指當(dāng)一個(gè)線程修改了共享變量的值,其它線程能夠立即得知這個(gè)修改。Java 內(nèi)存模型是通過在變量修改后將新值同步回主內(nèi)存,在變量讀取前從主內(nèi)存刷新變量值來實(shí)現(xiàn)可見性的。

主要有有三種實(shí)現(xiàn)可見性的方式:

  • volatile
  • synchronized,對(duì)一個(gè)變量執(zhí)行 unlock 操作之前,必須把變量值同步回主內(nèi)存。
  • final,被 final 關(guān)鍵字修飾的字段在構(gòu)造器中一旦初始化完成,并且沒有發(fā)生 this 逃逸(其它線程通過 this 引用訪問到初始化了一半的對(duì)象),那么其它線程就能看見 final 字段的值。

對(duì)前面的線程不安全示例中的 cnt 變量使用 volatile 修飾,不能解決線程不安全問題,因?yàn)?volatile 并不能保證操作的原子性。

3. 有序性

有序性是指:在本線程內(nèi)觀察,所有操作都是有序的。在一個(gè)線程觀察另一個(gè)線程,所有操作都是無序的,無序是因?yàn)榘l(fā)生了指令重排序。

在 Java 內(nèi)存模型中,允許編譯器和處理器對(duì)指令進(jìn)行重排序,重排序過程不會(huì)影響到單線程程序的執(zhí)行,卻會(huì)影響到多線程并發(fā)執(zhí)行的正確性。

volatile 關(guān)鍵字通過添加內(nèi)存屏障的方式來禁止指令重排,即重排序時(shí)不能把后面的指令放到內(nèi)存屏障之前。

也可以通過 synchronized 來保證有序性,它保證每個(gè)時(shí)刻只有一個(gè)線程執(zhí)行同步代碼,相當(dāng)于是讓線程順序執(zhí)行同步代碼。

先行發(fā)生原則

上面提到了可以用 volatile 和 synchronized 來保證有序性。除此之外,JVM 還規(guī)定了先行發(fā)生原則,讓一個(gè)操作無需控制就能先于另一個(gè)操作完成。

主要有以下這些原則:

1. 單一線程原則

Single Thread rule

在一個(gè)線程內(nèi),在程序前面的操作先行發(fā)生于后面的操作。

<div align="center"> <img src="../pics//single-thread-rule.png" width=""/> </div>

2. 管程鎖定規(guī)則

Monitor Lock Rule

一個(gè) unlock 操作先行發(fā)生于后面對(duì)同一個(gè)鎖的 lock 操作。

<div align="center"> <img src="../pics//monitor-lock-rule.png" width=""/> </div>

3. volatile 變量規(guī)則

Volatile Variable Rule

對(duì)一個(gè) volatile 變量的寫操作先行發(fā)生于后面對(duì)這個(gè)變量的讀操作。

<div align="center"> <img src="../pics//volatile-variable-rule.png" width=""/> </div>

4. 線程啟動(dòng)規(guī)則

Thread Start Rule

Thread 對(duì)象的 start() 方法調(diào)用先行發(fā)生于此線程的每一個(gè)動(dòng)作。

<div align="center"> <img src="../pics//thread-start-rule.png" width=""/> </div>

5. 線程加入規(guī)則

Thread Join Rule

Thread 對(duì)象的結(jié)束先行發(fā)生于 join() 方法返回。

<div align="center"> <img src="../pics//thread-join-rule.png" width=""/> </div>

6. 線程中斷規(guī)則

Thread Interruption Rule

對(duì)線程 interrupt() 方法的調(diào)用先行發(fā)生于被中斷線程的代碼檢測(cè)到中斷事件的發(fā)生,可以通過 interrupted() 方法檢測(cè)到是否有中斷發(fā)生。

7. 對(duì)象終結(jié)規(guī)則

Finalizer Rule

一個(gè)對(duì)象的初始化完成(構(gòu)造函數(shù)執(zhí)行結(jié)束)先行發(fā)生于它的 finalize() 方法的開始。

8. 傳遞性

Transitivity

如果操作 A 先行發(fā)生于操作 B,操作 B 先行發(fā)生于操作 C,那么操作 A 先行發(fā)生于操作 C。

十一、線程安全

線程安全定義

一個(gè)類在可以被多個(gè)線程安全調(diào)用時(shí)就是線程安全的。

線程安全分類

線程安全不是一個(gè)非真即假的命題,可以將共享數(shù)據(jù)按照安全程度的強(qiáng)弱順序分成以下五類:不可變、絕對(duì)線程安全、相對(duì)線程安全、線程兼容和線程對(duì)立。

1. 不可變

不可變(Immutable)的對(duì)象一定是線程安全的,無論是對(duì)象的方法實(shí)現(xiàn)還是方法的調(diào)用者,都不需要再采取任何的線程安全保障措施,只要一個(gè)不可變的對(duì)象被正確地構(gòu)建出來,那其外部的可見狀態(tài)永遠(yuǎn)也不會(huì)改變,永遠(yuǎn)也不會(huì)看到它在多個(gè)線程之中處于不一致的狀態(tài)。

不可變的類型:

  • final 關(guān)鍵字修飾的基本數(shù)據(jù)類型;
  • String
  • 枚舉類型
  • Number 部分子類,如 Long 和 Double 等數(shù)值包裝類型,BigInteger 和 BigDecimal 等大數(shù)據(jù)類型。但同為 Number 的子類型的原子類 AtomicInteger 和 AtomicLong 則并非不可變的。

對(duì)于集合類型,可以使用 Collections.unmodifiableXXX() 方法來獲取一個(gè)不可變的集合。

public class ImmutableExample {
    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map);
        unmodifiableMap.put("a", 1);
    }
}
Exception in thread "main" java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
    at ImmutableExample.main(ImmutableExample.java:9)

Collections.unmodifiableXXX() 先對(duì)原始的集合進(jìn)行拷貝,需要對(duì)集合進(jìn)行修改的方法都直接拋出異常。

public V put(K key, V value) {
    throw new UnsupportedOperationException();
}

多線程環(huán)境下,應(yīng)當(dāng)盡量使對(duì)象成為不可變,來滿足線程安全。

2. 絕對(duì)線程安全

不管運(yùn)行時(shí)環(huán)境如何,調(diào)用者都不需要任何額外的同步措施。

3. 相對(duì)線程安全

相對(duì)的線程安全需要保證對(duì)這個(gè)對(duì)象單獨(dú)的操作是線程安全的,在調(diào)用的時(shí)候不需要做額外的保障措施,但是對(duì)于一些特定順序的連續(xù)調(diào)用,就可能需要在調(diào)用端使用額外的同步手段來保證調(diào)用的正確性。

在 Java 語言中,大部分的線程安全類都屬于這種類型,例如 Vector、HashTable、Collections 的 synchronizedCollection() 方法包裝的集合等。

對(duì)于下面的代碼,如果刪除元素的線程刪除了一個(gè)元素,而獲取元素的線程試圖訪問一個(gè)已經(jīng)被刪除的元素,那么就會(huì)拋出 ArrayIndexOutOfBoundsException。

public class VectorUnsafeExample {
    private static Vector<Integer> vector = new Vector<>();

    public static void main(String[] args) {
        while (true) {
            for (int i = 0; i < 100; i++) {
                vector.add(i);
            }
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(() -> {
                for (int i = 0; i < vector.size(); i++) {
                    vector.remove(i);
                }
            });
            executorService.execute(() -> {
                for (int i = 0; i < vector.size(); i++) {
                    vector.get(i);
                }
            });
            executorService.shutdown();
        }
    }
}
Exception in thread "Thread-159738" java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 3
    at java.util.Vector.remove(Vector.java:831)
    at VectorUnsafeExample.lambda$main$0(VectorUnsafeExample.java:14)
    at VectorUnsafeExample$$Lambda$1/713338599.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

如果要保證上面的代碼能正確執(zhí)行下去,就需要對(duì)刪除元素和獲取元素的代碼進(jìn)行同步。

executorService.execute(() -> {
    synchronized (vector) {
        for (int i = 0; i < vector.size(); i++) {
            vector.remove(i);
        }
    }
});
executorService.execute(() -> {
    synchronized (vector) {
        for (int i = 0; i < vector.size(); i++) {
            vector.get(i);
        }
    }
});

4. 線程兼容

線程兼容是指對(duì)象本身并不是線程安全的,但是可以通過在調(diào)用端正確地使用同步手段來保證對(duì)象在并發(fā)環(huán)境中可以安全地使用,我們平常說一個(gè)類不是線程安全的,絕大多數(shù)時(shí)候指的是這一種情況。Java API 中大部分的類都是屬于線程兼容的,如與前面的 Vector 和 HashTable 相對(duì)應(yīng)的集合類 ArrayList 和 HashMap 等。

5. 線程對(duì)立

線程對(duì)立是指無論調(diào)用端是否采取了同步措施,都無法在多線程環(huán)境中并發(fā)使用的代碼。由于 Java 語言天生就具備多線程特性,線程對(duì)立這種排斥多線程的代碼是很少出現(xiàn)的,而且通常都是有害的,應(yīng)當(dāng)盡量避免。

線程安全的實(shí)現(xiàn)方法

1. 互斥同步

synchronized 和 ReentrantLock。

2. 非阻塞同步

互斥同步最主要的問題就是進(jìn)行線程阻塞和喚醒所帶來的性能問題,因此這種同步也稱為阻塞同步。

互斥同步屬于一種悲觀的并發(fā)策略,總是認(rèn)為只要不去做正確的同步措施,那就肯定會(huì)出現(xiàn)問題。論共享數(shù)據(jù)是否真的會(huì)出現(xiàn)競(jìng)爭(zhēng),它都要進(jìn)行加鎖(這里討論的是概念模型,實(shí)際上虛擬機(jī)會(huì)優(yōu)化掉很大一部分不必要的加鎖)、用戶態(tài)核心態(tài)轉(zhuǎn)換、維護(hù)鎖計(jì)數(shù)器和檢查是否有被阻塞的線程需要喚醒等操作。

隨著硬件指令集的發(fā)展,我們可以使用基于沖突檢測(cè)的樂觀并發(fā)策略:先進(jìn)行操作,如果沒有其它線程爭(zhēng)用共享數(shù)據(jù),那操作就成功了,否則采取補(bǔ)償措施(不斷地重試,直到成功為止)。這種樂觀的并發(fā)策略的許多實(shí)現(xiàn)都不需要把線程掛起,因此這種同步操作稱為非阻塞同步。

樂觀鎖需要操作和沖突檢測(cè)這兩個(gè)步驟具備原子性,這里就不能再使用互斥同步來保證了,只能靠硬件來完成。

硬件支持的原子性操作最典型的是:比較并交換(Compare-and-Swap,CAS)。CAS 指令需要有 3 個(gè)操作數(shù),分別是內(nèi)存地址 V、舊的預(yù)期值 A 和新值 B。當(dāng)執(zhí)行操作時(shí),只有當(dāng) V 的值等于 A,才將 V 的值更新為 B。

J.U.C 包里面的整數(shù)原子類 AtomicInteger,其中的 compareAndSet() 和 getAndIncrement() 等方法都使用了 Unsafe 類的 CAS 操作。

以下代碼使用了 AtomicInteger 執(zhí)行了自增的操作。

private AtomicInteger cnt = new AtomicInteger();

public void add() {
    cnt.incrementAndGet();
}

以下代碼是 incrementAndGet() 的源碼,它調(diào)用了 unsafe 的 getAndAddInt() 。

public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

以下代碼是 getAndAddInt() 源碼,var1 指示內(nèi)存地址,var2 指示舊值,var4 指示操作需要加的數(shù)值,這里為 1。通過 getIntVolatile(var1, var2) 得到舊的預(yù)期值,通過調(diào)用 compareAndSwapInt() 來進(jìn)行 CAS 比較,如果 var2==var5,那么就更新內(nèi)存地址為 var1 的變量為 var5+var4??梢钥吹?getAndAddInt() 在一個(gè)循環(huán)中進(jìn)行,發(fā)生沖突的做法是不斷的進(jìn)行重試。

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

ABA :如果一個(gè)變量初次讀取的時(shí)候是 A 值,它的值被改成了 B,后來又被改回為 A,那 CAS 操作就會(huì)誤認(rèn)為它從來沒有被改變過。

J.U.C 包提供了一個(gè)帶有標(biāo)記的原子引用類 AtomicStampedReference 來解決這個(gè)問題,它可以通過控制變量值的版本來保證 CAS 的正確性。大部分情況下 ABA 問題不會(huì)影響程序并發(fā)的正確性,如果需要解決 ABA 問題,改用傳統(tǒng)的互斥同步可能會(huì)比原子類更高效。

3. 無同步方案

要保證線程安全,并不是一定就要進(jìn)行同步,兩者沒有因果關(guān)系。同步只是保證共享數(shù)據(jù)爭(zhēng)用時(shí)的正確性的手段,如果一個(gè)方法本來就不涉及共享數(shù)據(jù),那它自然就無須任何同步措施去保證正確性,因此會(huì)有一些代碼天生就是線程安全的。

(一)可重入代碼(Reentrant Code)

這種代碼也叫做純代碼(Pure Code),可以在代碼執(zhí)行的任何時(shí)刻中斷它,轉(zhuǎn)而去執(zhí)行另外一段代碼(包括遞歸調(diào)用它本身),而在控制權(quán)返回后,原來的程序不會(huì)出現(xiàn)任何錯(cuò)誤。

可重入代碼有一些共同的特征,例如不依賴存儲(chǔ)在堆上的數(shù)據(jù)和公用的系統(tǒng)資源、用到的狀態(tài)量都由參數(shù)中傳入、不調(diào)用非可重入的方法等。

(二)棧封閉

多個(gè)線程訪問同一個(gè)方法的局部變量時(shí),不會(huì)出現(xiàn)線程安全問題,因?yàn)榫植孔兞看鎯?chǔ)在棧中,屬于線程私有的。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StackClosedExample {
    public void add100() {
        int cnt = 0;
        for (int i = 0; i < 100; i++) {
            cnt++;
        }
        System.out.println(cnt);
    }
}
public static void main(String[] args) {
    StackClosedExample example = new StackClosedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> example.add100());
    executorService.execute(() -> example.add100());
    executorService.shutdown();
}
100
100

(三)線程本地存儲(chǔ)(Thread Local Storage)

如果一段代碼中所需要的數(shù)據(jù)必須與其他代碼共享,那就看看這些共享數(shù)據(jù)的代碼是否能保證在同一個(gè)線程中執(zhí)行。如果能保證,我們就可以把共享數(shù)據(jù)的可見范圍限制在同一個(gè)線程之內(nèi),這樣,無須同步也能保證線程之間不出現(xiàn)數(shù)據(jù)爭(zhēng)用的問題。

符合這種特點(diǎn)的應(yīng)用并不少見,大部分使用消費(fèi)隊(duì)列的架構(gòu)模式(如“生產(chǎn)者-消費(fèi)者”模式)都會(huì)將產(chǎn)品的消費(fèi)過程盡量在一個(gè)線程中消費(fèi)完,其中最重要的一個(gè)應(yīng)用實(shí)例就是經(jīng)典 Web 交互模型中的“一個(gè)請(qǐng)求對(duì)應(yīng)一個(gè)服務(wù)器線程”(Thread-per-Request)的處理方式,這種處理方式的廣泛應(yīng)用使得很多 Web 服務(wù)端應(yīng)用都可以使用線程本地存儲(chǔ)來解決線程安全問題。

可以使用 java.lang.ThreadLocal 類來實(shí)現(xiàn)線程本地存儲(chǔ)功能。

對(duì)于以下代碼,thread1 中設(shè)置 threadLocal 為 1,而 thread2 設(shè)置 threadLocal 為 2。過了一段時(shí)間之后,thread1 讀取 threadLocal 依然是 1,不受 thread2 的影響。

public class ThreadLocalExample {
    public static void main(String[] args) {
        ThreadLocal threadLocal = new ThreadLocal();
        Thread thread1 = new Thread(() -> {
            threadLocal.set(1);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(threadLocal.get());
            threadLocal.remove();
        });
        Thread thread2 = new Thread(() -> {
            threadLocal.set(2);
            threadLocal.remove();
        });
        thread1.start();
        thread2.start();
    }
}
1

為了理解 ThreadLocal,先看以下代碼:

public class ThreadLocalExample1 {
    public static void main(String[] args) {
        ThreadLocal threadLocal1 = new ThreadLocal();
        ThreadLocal threadLocal2 = new ThreadLocal();
        Thread thread1 = new Thread(() -> {
            threadLocal1.set(1);
            threadLocal2.set(1);
        });
        Thread thread2 = new Thread(() -> {
            threadLocal1.set(2);
            threadLocal2.set(2);
        });
        thread1.start();
        thread2.start();
    }
}

它所對(duì)應(yīng)的底層結(jié)構(gòu)圖為:

<div align="center"> <img src="../pics//3646544a-cb57-451d-9e03-d3c4f5e4434a.png" width=""/> </div>

每個(gè) Thread 都有一個(gè) ThreadLocal.ThreadLocalMap 對(duì)象,Thread 類中就定義了 ThreadLocal.ThreadLocalMap 成員。

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

當(dāng)調(diào)用一個(gè) ThreadLocal 的 set(T value) 方法時(shí),先得到當(dāng)前線程的 ThreadLocalMap 對(duì)象,然后將 ThreadLocal->value 鍵值對(duì)插入到該 Map 中。

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

get() 方法類似。

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

ThreadLocal 從理論上講并不是用來解決多線程并發(fā)問題的,因?yàn)楦静淮嬖诙嗑€程競(jìng)爭(zhēng)。在一些場(chǎng)景 (尤其是使用線程池) 下,由于 ThreadLocal.ThreadLocalMap 的底層數(shù)據(jù)結(jié)構(gòu)導(dǎo)致 ThreadLocal 有內(nèi)存泄漏的情況,盡可能在每次使用 ThreadLocal 后手動(dòng)調(diào)用 remove(),以避免出現(xiàn) ThreadLocal 經(jīng)典的內(nèi)存泄漏甚至是造成自身業(yè)務(wù)混亂的風(fēng)險(xiǎn)。

十二、鎖優(yōu)化

這里的鎖優(yōu)化主要是指虛擬機(jī)對(duì) synchronized 的優(yōu)化。

自旋鎖

互斥同步的進(jìn)入阻塞狀態(tài)的開銷都很大,應(yīng)該盡量避免。在許多應(yīng)用中,共享數(shù)據(jù)的鎖定狀態(tài)只會(huì)持續(xù)很短的一段時(shí)間。自旋鎖的思想是讓一個(gè)線程在請(qǐng)求一個(gè)共享數(shù)據(jù)的鎖時(shí)執(zhí)行忙循環(huán)(自旋)一段時(shí)間,如果在這段時(shí)間內(nèi)能獲得鎖,就可以避免進(jìn)入阻塞狀態(tài)。

自選鎖雖然能避免進(jìn)入阻塞狀態(tài)從而減少開銷,但是它需要進(jìn)行忙循環(huán)操作占用 CPU 時(shí)間,它只適用于共享數(shù)據(jù)的鎖定狀態(tài)很短的場(chǎng)景。

在 JDK 1.6 中引入了自適應(yīng)的自旋鎖。自適應(yīng)意味著自旋的次數(shù)不再固定了,而是由前一次在同一個(gè)鎖上的自旋次數(shù)及鎖的擁有者的狀態(tài)來決定。

鎖消除

鎖消除是指對(duì)于被檢測(cè)出不可能存在競(jìng)爭(zhēng)的共享數(shù)據(jù)的鎖進(jìn)行消除。

鎖消除主要是通過逃逸分析來支持,如果堆上的共享數(shù)據(jù)不可能逃逸出去被其它線程訪問到,那么就可以把它們當(dāng)成私有數(shù)據(jù)對(duì)待,也就可以將它們的鎖進(jìn)行消除。

對(duì)于一些看起來沒有加鎖的代碼,其實(shí)隱式的加了很多鎖。例如下面的字符串拼接代碼就隱式加了鎖:

public static String concatString(String s1, String s2, String s3) {
    return s1 + s2 + s3;
}

String 是一個(gè)不可變的類,編譯器會(huì)對(duì) String 的拼接自動(dòng)優(yōu)化。在 JDK 1.5 之前,會(huì)轉(zhuǎn)化為 StringBuffer 對(duì)象的連續(xù) append() 操作:

public static String concatString(String s1, String s2, String s3) {
    StringBuffer sb = new StringBuffer();
    sb.append(s1);
    sb.append(s2);
    sb.append(s3);
    return sb.toString();
}

每個(gè) append() 方法中都有一個(gè)同步塊。虛擬機(jī)觀察變量 sb,很快就會(huì)發(fā)現(xiàn)它的動(dòng)態(tài)作用域被限制在 concatString() 方法內(nèi)部。也就是說,sb 的所有引用永遠(yuǎn)不會(huì)“逃逸”到 concatString() 方法之外,其他線程無法訪問到它,因此可以進(jìn)行消除。

鎖粗化

如果一系列的連續(xù)操作都對(duì)同一個(gè)對(duì)象反復(fù)加鎖和解鎖,頻繁的加鎖操作就會(huì)導(dǎo)致性能損耗。

上一節(jié)的示例代碼中連續(xù)的 append() 方法就屬于這類情況。如果虛擬機(jī)探測(cè)到由這樣的一串零碎的操作都對(duì)同一個(gè)對(duì)象加鎖,將會(huì)把加鎖的范圍擴(kuò)展(粗化)到整個(gè)操作序列的外部。對(duì)于上一節(jié)的示例代碼就是擴(kuò)展到第一個(gè) append() 操作之前直至最后一個(gè) append() 操作之后,這樣只需要加鎖一次就可以了。

輕量級(jí)鎖

JDK 1.6 引入了偏向鎖和輕量級(jí)鎖,從而讓鎖擁有了四個(gè)狀態(tài):無鎖狀態(tài)(unlocked)、偏向鎖狀態(tài)(biasble)、輕量級(jí)鎖狀態(tài)(lightweight locked)和重量級(jí)鎖狀態(tài)(inflated)。

以下是 HotSpot 虛擬機(jī)對(duì)象頭的內(nèi)存布局,這些數(shù)據(jù)被稱為 mark word。其中 tag bits 對(duì)應(yīng)了五個(gè)狀態(tài),這些狀態(tài)在右側(cè)的 state 表格中給出,應(yīng)該注意的是 state 表格不是存儲(chǔ)在對(duì)象頭中的。除了 marked for gc 狀態(tài),其它四個(gè)狀態(tài)已經(jīng)在前面介紹過了。

<div align="center"> <img src="../pics//bb6a49be-00f2-4f27-a0ce-4ed764bc605c.png" width="600"/> </div>

下圖左側(cè)是一個(gè)線程的虛擬機(jī)棧,其中有一部分稱為 Lock Record 的區(qū)域,這是在輕量級(jí)鎖運(yùn)行過程創(chuàng)建的,用于存放鎖對(duì)象的 Mark Word。而右側(cè)就是一個(gè)鎖對(duì)象,包含了 Mark Word 和其它信息。

<div align="center"> <img src="../pics//051e436c-0e46-4c59-8f67-52d89d656182.png" width="500"/> </div>

輕量級(jí)鎖是相對(duì)于傳統(tǒng)的重量級(jí)鎖而言,它使用 CAS 操作來避免重量級(jí)鎖使用互斥量的開銷。對(duì)于絕大部分的鎖,在整個(gè)同步周期內(nèi)都是不存在競(jìng)爭(zhēng)的,因此也就不需要都使用互斥量進(jìn)行同步,可以先采用 CAS 操作進(jìn)行同步,如果 CAS 失敗了再改用互斥量進(jìn)行同步。

當(dāng)嘗試獲取一個(gè)鎖對(duì)象時(shí),如果鎖對(duì)象標(biāo)記為 0 01,說明鎖對(duì)象的鎖未鎖定(unlocked)狀態(tài)。此時(shí)虛擬機(jī)在當(dāng)前線程棧中創(chuàng)建 Lock Record,然后使用 CAS 操作將對(duì)象的 Mark Word 更新為 Lock Record 指針。如果 CAS 操作成功了,那么線程就獲取了該對(duì)象上的鎖,并且對(duì)象的 Mark Word 的鎖標(biāo)記變?yōu)?00,表示該對(duì)象處于輕量級(jí)鎖狀態(tài)。

<div align="center"> <img src="../pics//baaa681f-7c52-4198-a5ae-303b9386cf47.png" width="500"/> </div>

如果 CAS 操作失敗了,虛擬機(jī)首先會(huì)檢查對(duì)象的 Mark Word 是否指向當(dāng)前線程的虛擬機(jī)棧,如果是的話說明當(dāng)前線程已經(jīng)擁有了這個(gè)鎖對(duì)象,那就可以直接進(jìn)入同步塊繼續(xù)執(zhí)行,否則說明這個(gè)鎖對(duì)象已經(jīng)被其他線程線程搶占了。如果有兩條以上的線程爭(zhēng)用同一個(gè)鎖,那輕量級(jí)鎖就不再有效,要膨脹為重量級(jí)鎖。

偏向鎖

偏向鎖的思想是偏向于讓第一個(gè)獲取鎖對(duì)象的線程,這個(gè)線程在之后獲取該鎖就不再需要進(jìn)行同步操作,甚至連 CAS 操作也不再需要。

當(dāng)鎖對(duì)象第一次被線程獲得的時(shí)候,進(jìn)入偏向狀態(tài),標(biāo)記為 1 01。同時(shí)使用 CAS 操作將線程 ID 記錄到 Mark Word 中,如果 CAS 操作成功,這個(gè)線程以后每次進(jìn)入這個(gè)鎖相關(guān)的同步塊就不需要再進(jìn)行任何同步操作。

當(dāng)有另外一個(gè)線程去嘗試獲取這個(gè)鎖對(duì)象時(shí),偏向狀態(tài)就宣告結(jié)束,此時(shí)撤銷偏向(Revoke Bias)后恢復(fù)到未鎖定狀態(tài)或者輕量級(jí)鎖狀態(tài)。

<div align="center"> <img src="../pics//390c913b-5f31-444f-bbdb-2b88b688e7ce.jpg" width="600"/> </div>

十三、多線程開發(fā)良好的實(shí)踐

  • 給線程起個(gè)有意義的名字,這樣可以方便找 Bug。

  • 縮小同步范圍,例如對(duì)于 synchronized,應(yīng)該盡量使用同步塊而不是同步方法。

  • 多用同步類少用 wait() 和 notify()。首先,CountDownLatch, CyclicBarrier, Semaphore 和 Exchanger 這些同步類簡(jiǎn)化了編碼操作,而用 wait() 和 notify() 很難實(shí)現(xiàn)對(duì)復(fù)雜的控制流;其次,這些同步類是由最好的企業(yè)編寫和維護(hù),在后續(xù)的 JDK 中還會(huì)不斷優(yōu)化和完善,使用這些更高等級(jí)的同步工具你的程序可以不費(fèi)吹灰之力獲得優(yōu)化。

  • 多用并發(fā)集合少用同步集合,例如應(yīng)該使用 ConcurrentHashMap 而不是 Hashtable。

  • 使用本地變量和不可變類來保證線程安全。

  • 使用線程池而不是直接創(chuàng)建 Thread 對(duì)象,這是因?yàn)閯?chuàng)建線程代價(jià)很高,線程池可以有效地利用有限的線程來啟動(dòng)任務(wù)。

  • 使用 BlockingQueue 實(shí)現(xiàn)生產(chǎn)者消費(fèi)者問題。

參考資料

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1.解決信號(hào)量丟失和假喚醒 public class MyWaitNotify3{ MonitorObject m...
    Q羅閱讀 1,017評(píng)論 0 1
  • 原文鏈接 譯者:靖靖 并發(fā) 進(jìn)程和線程 在并發(fā)編程當(dāng)中,有兩個(gè)基本的執(zhí)行單元:進(jìn)程和線程。在java中,我們大部分...
    4b4f3ceb6f71閱讀 853評(píng)論 4 16
  • layout: posttitle: 《Java并發(fā)編程的藝術(shù)》筆記categories: Javaexcerpt...
    xiaogmail閱讀 6,025評(píng)論 1 19
  • 2018年5月1日 星期二 桑吉巴爾島 大雨 曾經(jīng)以為一生很漫長(zhǎng),長(zhǎng)得看不到盡頭。驀然回首,卻突然發(fā)現(xiàn),時(shí)光已一點(diǎn)...
    都市虎妞閱讀 418評(píng)論 6 9
  • 有個(gè)MM很抑悶,說男友是直男,老說她眼睛丑。我認(rèn)真看了一下,雖然是26歲的女孩,眼部問題確實(shí)不樂觀,有眼袋,還有...
    易運(yùn)瓊閱讀 331評(píng)論 0 1

友情鏈接更多精彩內(nèi)容