戳我的筆記鏈接地址
本文是對《Java并發(fā)編程》專欄的讀后小結,跟大家分享。
目錄
1、bug的源頭-三個屬性
2、Java內(nèi)存模型
3、死鎖的解決方案
死鎖發(fā)生的條件
死鎖的預防
4、等待-通知機制
wait的使用范式
wait和sleep的區(qū)別
5、線程的生命周期
通用的線程生命周期(五態(tài)模型)
Java中線程的生命周期
狀態(tài)轉換
6、創(chuàng)建合理的線程數(shù)量
CPU密集型應用
I/O密集型應用
7、Lock與synchronized的不同
用兩個條件變量實現(xiàn)阻塞隊列
異步轉同步
8、用Semaphore實現(xiàn)一個限流器
9、讀寫鎖 ReadWriteLock
讀寫鎖升級問題
9、StampedLock 比讀寫鎖更快的鎖
10、CountDownLatch 和 CyclicBarrier 讓多線程步調(diào)一致
11、Java并發(fā)容器
注意事項
12、原子類
ABA問題
原子類組成概覽
13、Java中的線程池
// todo 線程的運行過程
ThreadPoolExecutor 線程池參數(shù)
拒絕策略
使用線程池的注意事項
14、Future 獲取異步執(zhí)行結果
如何獲取異步任務執(zhí)行結果
FutureTask工具類
14、CompletableFuture 異步編程
15、CompletionService 批量執(zhí)行異步任務
16、Fork/Join 并行計算框架
模擬MapReduce統(tǒng)計單詞數(shù)量
1、bug的源頭-三個屬性
可見性、有序性、原子性。
我們的 CPU、內(nèi)存、I/O 設備都在不斷迭代,不斷朝著更快的方向努力。但是,在這個快速發(fā)展的過程中,有一個核心矛盾一直存在,就是這三者的速度差異。
cpu寄存器緩存導致的可見性問題。
線程切換帶來原子性問題。
編譯優(yōu)化帶來有序性問題。
其中,在 Java 領域一個經(jīng)典的案例就是利用雙重檢查創(chuàng)建單例對象:
public class Singleton {
static Singleton instance;
static Singleton getInstance(){
if (instance == null) {
synchronized(Singleton.class) {
if (instance == null)
instance = new Singleton(); //在這一步如果編譯優(yōu)化
}
}
return instance;
}
}
第八行,如果發(fā)生編譯優(yōu)化:我們以為的 new 操作應該是:
分配一塊內(nèi)存 M;
在內(nèi)存 M 上初始化 Singleton 對象;
然后 M 的地址賦值給 instance 變量。
但是實際上優(yōu)化后的執(zhí)行路徑卻是這樣的:
分配一塊內(nèi)存 M;
將 M 的地址賦值給 instance 變量;
最后在內(nèi)存 M 上初始化 Singleton 對象。
如果在第2步發(fā)生線程a到線程b的切換,線程b直接返回instance,這個時候調(diào)用沒有初始化過的instance對象,會產(chǎn)生空指針異常。
解決辦法:對instance對象加volatile語義申明。
2、Java內(nèi)存模型
Java 內(nèi)存模型規(guī)范了 JVM 如何提供按需禁用緩存和編譯優(yōu)化的方法。具體來說,這些方法包括 volatile、synchronized 和 final 三個關鍵字,以及七項 Happens-Before 規(guī)則。
3、死鎖的解決方案
使用細粒度鎖可以提高并行度,是性能優(yōu)化的一個重要手段。但是有時細粒度的鎖容易導致死鎖。死鎖的一個比較專業(yè)的定義是:一組互相競爭資源的線程因互相等待,導致“永久”阻塞的現(xiàn)象。
死鎖發(fā)生的條件
互斥,共享資源 X 和 Y 只能被一個線程占用;
占有且等待,線程 T1 已經(jīng)取得共享資源 X,在等待共享資源 Y 的時候,不釋放共享資源 X;
不可搶占,其他線程不能強行搶占線程 T1 占有的資源;
循環(huán)等待,線程 T1 等待線程 T2 占有的資源,線程 T2 等待線程 T1 占有的資源,就是循環(huán)等待。
死鎖的預防
反過來分析,也就是說只要我們破壞其中一個,就可以成功避免死鎖的發(fā)生。互斥就是鎖的目的,所以無法預防。
破壞占有且等待,可以一次性申請所有資源。要么全部獲取成功,要么全部獲取失敗。
破壞不可搶占,核心是要能夠主動釋放它占有的資源,這一點 synchronized 是做不到的。java.util.concurrent 這個包下面提供的 Lock 是可以輕松解決這個問題的。提供tryLock(long, TimeUnit) 方法,在一段時間后放棄獲取鎖。
破壞循環(huán)等待條件,破壞這個條件,需要對資源進行排序,然后按序申請資源。
4、等待-通知機制
用 synchronized 實現(xiàn)等待 - 通知機制在 Java 語言里,等待 - 通知機制可以有多種實現(xiàn)方式,比如 Java 語言內(nèi)置的 synchronized 配合 wait()、notify()、notifyAll() 這三個方法就能輕松實現(xiàn)。
如何用 synchronized 實現(xiàn)互斥鎖,你應該已經(jīng)很熟悉了。在下面這個圖里,左邊有一個等待隊列,同一時刻,只允許一個線程進入 synchronized 保護的臨界區(qū)(這個臨界區(qū)可以看作大夫的診室),當有一個線程進入臨界區(qū)后,其他線程就只能進入圖中左邊的等待隊列里等待(相當于患者分診等待)。這個等待隊列和互斥鎖是一對一的關系,每個互斥鎖都有自己獨立的等待隊列。
wait()工作原理圖
notify()工作原理圖
上面我們一直強調(diào) wait()、notify()、notifyAll() 方法操作的等待隊列是互斥鎖的等待隊列,所以如果 synchronized 鎖定的是 this,那么對應的一定是 this.wait()、this.notify()、this.notifyAll();如果 synchronized 鎖定的是 target,那么對應的一定是 target.wait()、target.notify()、target.notifyAll() 。
而且 wait()、notify()、notifyAll() 這三個方法能夠被調(diào)用的前提是已經(jīng)獲取了相應的互斥鎖,所以我們會發(fā)現(xiàn) wait()、notify()、notifyAll() 都是在 synchronized{}內(nèi)部被調(diào)用的。如果在 synchronized{}外部調(diào)用,或者鎖定的 this,而用 target.wait() 調(diào)用的話,JVM 會拋出一個運行時異常:java.lang.IllegalMonitorStateException。
// wait的使用范式
while(條件不滿足) {
wait();
}
除非經(jīng)過深思熟慮,否則盡量使用 notifyAll(),只用notify()可能會導致有線程永遠得不到執(zhí)行。
wait和sleep的區(qū)別
wait與sleep區(qū)別在于: 1. wait會釋放所有鎖而sleep不會釋放鎖資源. 2. wait只能在同步方法和同步塊中使用,而sleep任何地方都可以. 3. wait無需捕捉異常,而sleep需要. 兩者相同點:都會讓渡CPU執(zhí)行時間,等待再次調(diào)度! wait()方法與sleep()方法的不同之處在于,wait()方法會釋放對象的“鎖標志”。當調(diào)用某一對象的wait()方法后,會使當前線程暫停執(zhí)行,并將當前線程放入對象等待池中,直到調(diào)用了notify()方法后,將從對象等待池中移出任意一個線程并放入鎖標志等待池中,只有鎖標志等待池中的線程可以獲取鎖標志,它們隨時準備爭奪鎖的擁有權。當調(diào)用了某個對象的notifyAll()方法,會將對象等待池中的所有線程都移動到該對象的鎖標志等待池。 sleep()方法需要指定等待的時間,它可以讓當前正在執(zhí)行的線程在指定的時間內(nèi)暫停執(zhí)行,進入阻塞狀態(tài),該方法既可以讓其他同優(yōu)先級或者高優(yōu)先級的線程得到執(zhí)行的機會,也可以讓低優(yōu)先級的線程得到執(zhí)行機會。但是sleep()方法不會釋放“鎖標志”,也就是說如果有synchronized同步塊,其他線程仍然不能訪問共享數(shù)據(jù)。
5、線程的生命周期
通用的線程生命周期(五態(tài)模型)
初始狀態(tài),指的是線程已經(jīng)被創(chuàng)建,但是還不允許分配 CPU 執(zhí)行。這個狀態(tài)屬于編程語言特有的,在操作系統(tǒng)層面,真正的線程還沒有創(chuàng)建。
可運行狀態(tài),指的是線程可以分配 CPU 執(zhí)行。在這種狀態(tài)下,真正的操作系統(tǒng)線程已經(jīng)被成功創(chuàng)建了,所以可以分配 CPU 執(zhí)行。
當有空閑的 CPU 時,操作系統(tǒng)會將其分配給一個處于可運行狀態(tài)的線程,被分配到 CPU 的線程的狀態(tài)就轉換成了運行狀態(tài)。
運行狀態(tài)的線程如果調(diào)用一個阻塞的 API(例如以阻塞方式讀文件)或者等待某個事件(例如條件變量),那么線程的狀態(tài)就會轉換到休眠狀態(tài),同時釋放 CPU 使用權,休眠狀態(tài)的線程永遠沒有機會獲得 CPU 使用權。當?shù)却氖录霈F(xiàn)了,線程就會從休眠狀態(tài)轉換到可運行狀態(tài)。
線程執(zhí)行完或者出現(xiàn)異常就會進入終止狀態(tài),終止狀態(tài)的線程不會切換到其他任何狀態(tài),進入終止狀態(tài)也就意味著線程的生命周期結束了。
Java中線程的生命周期
這五種狀態(tài)在不同編程語言里會有簡化合并。
Java 語言里則把可運行狀態(tài)和運行狀態(tài)合并了(變成了運行狀態(tài)),這兩個狀態(tài)在操作系統(tǒng)調(diào)度層面有用,而 JVM 層面不關心這兩個狀態(tài),因為 JVM 把線程調(diào)度交給操作系統(tǒng)處理了。除了簡化合并,這五種狀態(tài)也有可能被細化,比如,Java 語言里就細化了休眠狀態(tài)(Blocked、Waiting、Timed_Waiting).
Java 語言中線程共有六種狀態(tài),分別是:
NEW(初始化狀態(tài))
RUNNABLE(可運行 / 運行狀態(tài))
BLOCKED(阻塞狀態(tài))
WAITING(無時限等待)
TIMED_WAITING(有時限等待)
TERMINATED(終止狀態(tài))
狀態(tài)轉換
RUNNABLE 與 BLOCKED 的狀態(tài)轉換
只有一種場景會觸發(fā)這種轉換,就是線程等待 synchronized 的隱式鎖。RUNNABLE 與 WAITING 的狀態(tài)轉換
第一種場景,獲得 synchronized 隱式鎖的線程,調(diào)用無參數(shù)的 Object.wait() 方法。
第二種場景,調(diào)用無參數(shù)的 Thread.join() 方法。其中的 join() 是一種線程同步方法,例如有一個線程對象 thread A,當調(diào)用 A.join() 的時候,執(zhí)行這條語句的線程會等待 thread A 執(zhí)行完,而等待中的這個線程,其狀態(tài)會從 RUNNABLE 轉換到 WAITING。當線程 thread A 執(zhí)行完,原來等待它的線程又會從 WAITING 狀態(tài)轉換到 RUNNABLE。
第三種場景,調(diào)用 LockSupport.park() 方法。其中的 LockSupport 對象,也許你有點陌生,其實 Java 并發(fā)包中的鎖,都是基于它實現(xiàn)的。調(diào)用 LockSupport.park() 方法,當前線程會阻塞,線程的狀態(tài)會從 RUNNABLE 轉換到 WAITING。調(diào)用 LockSupport.unpark(Thread thread) 可喚醒目標線程,目標線程的狀態(tài)又會從 WAITING 狀態(tài)轉換到 RUNNABLE。RUNNABLE 與 TIMED_WAITING 的狀態(tài)轉換
有五種場景會觸發(fā)這種轉換:
調(diào)用帶超時參數(shù)的 Thread.sleep(long millis) 方法;
獲得 synchronized 隱式鎖的線程,調(diào)用帶超時參數(shù)的 Object.wait(long timeout) 方法;
調(diào)用帶超時參數(shù)的 Thread.join(long millis) 方法;
調(diào)用帶超時參數(shù)的 LockSupport.parkNanos(Object blocker, long deadline) 方法;
調(diào)用帶超時參數(shù)的 LockSupport.parkUntil(long deadline) 方法。
這里你會發(fā)現(xiàn) TIMED_WAITING 和 WAITING 狀態(tài)的區(qū)別,僅僅是觸發(fā)條件多了超時參數(shù)。從 NEW 到 RUNNABLE 狀態(tài)
從 NEW 狀態(tài)轉換到 RUNNABLE 狀態(tài)很簡單,只要調(diào)用線程對象的 start() 方法就可以了從 RUNNABLE 到 TERMINATED 狀態(tài)
線程執(zhí)行完 run() 方法后,會自動轉換到 TERMINATED 狀態(tài),當然如果執(zhí)行 run() 方法的時候異常拋出,也會導致線程終止。有時候我們需要強制中斷 run() 方法的執(zhí)行,例如 run() 方法訪問一個很慢的網(wǎng)絡,我們等不下去了,想終止怎么辦呢?Java 的 Thread 類里面倒是有個 stop() 方法,不過已經(jīng)標記為 @Deprecated,所以不建議使用了。正確的姿勢其實是調(diào)用 interrupt() 方法。
stop() 方法會真的殺死線程,不給線程喘息的機會,如果線程持有 ReentrantLock 鎖,被 stop() 的線程并不會自動調(diào)用 ReentrantLock 的 unlock() 去釋放鎖,那其他線程就再也沒機會獲得 ReentrantLock 鎖。
interrupt() 方法僅僅是通知線程,線程有機會執(zhí)行一些后續(xù)操作,同時也可以無視這個通知。被 interrupt 的線程,是怎么收到通知的呢?一種是異常,另一種是主動檢測。
當線程 A 處于 WAITING、TIMED_WAITING 狀態(tài)時,如果其他線程調(diào)用線程 A 的 interrupt() 方法,會使線程 A 返回到 RUNNABLE 狀態(tài),同時線程 A 的代碼會觸發(fā) InterruptedException 異常。上面我們提到轉換到 WAITING、TIMED_WAITING 狀態(tài)的觸發(fā)條件,都是調(diào)用了類似 wait()、join()、sleep() 這樣的方法,我們看這些方法的簽名,發(fā)現(xiàn)都會 throws InterruptedException 這個異常。這個異常的觸發(fā)條件就是:其他線程調(diào)用了該線程的 interrupt() 方法。
6、創(chuàng)建合理的線程數(shù)量
創(chuàng)建合理的線程數(shù)量,目的是將硬件的性能發(fā)揮到極致。根據(jù)不同的應用場景,我們將應用分為兩種場景論述,I/O密集型應用和CPU密集型應用。
CPU密集型應用
對于 CPU 密集型的計算場景,理論上“線程的數(shù)量 =CPU 核數(shù)”就是最合適的。不過在工程上,線程的數(shù)量一般會設置為“CPU 核數(shù) +1”,這樣的話,當線程因為偶爾的內(nèi)存頁失效或其他原因?qū)е伦枞麜r,這個額外的線程可以頂上,從而保證 CPU 的利用率。
I/O密集型應用
對于 I/O 密集型計算場景,最佳的線程數(shù)是與程序中 CPU 計算和 I/O 操作的耗時比相關的,我們可以總結出這樣一個公式:
最佳線程數(shù) =1 +(I/O 耗時 / CPU 耗時)
不過上面這個公式是針對單核 CPU 的,至于多核 CPU,也很簡單,只需要等比擴大就可以了,計算公式如下:
最佳線程數(shù) =CPU 核數(shù) * [ 1 +(I/O 耗時 / CPU 耗時)]
Q: 有些同學對于最佳線程數(shù)的設置積累了一些經(jīng)驗值,認為對于 I/O 密集型應用,最佳線程數(shù)應該為:2 * CPU 的核數(shù) + 1,你覺得這個經(jīng)驗值合理嗎?
A: 工作中都是按照邏輯核數(shù)來的,理論值和經(jīng)驗值只是提供個指導,實際上還是要靠壓測?。?!
7、Lock與synchronized的不同
synchronized存在,Javasdk還造一個Lock的原因主要是為了彌補synchronized,會阻塞線程且不會釋放已經(jīng)占有的資源的問題,lock的三個方法如下:
// 支持響應中斷
void lockInterruptibly()
throws InterruptedException;
// 支持超時
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException;
// 支持非阻塞獲取鎖
boolean tryLock();
并且,Lock支持多個條件變量。Lock用來實現(xiàn)“互斥”,condition用來實現(xiàn)“同步”。
用兩個條件變量實現(xiàn)阻塞隊列
public class BlockedQueue<T>{
final Lock lock =
new ReentrantLock();
// 條件變量:隊列不滿
final Condition notFull =
lock.newCondition();
// 條件變量:隊列不空
final Condition notEmpty =
lock.newCondition();
// 入隊
void enq(T x) {
lock.lock();
try {
while (隊列已滿){
// 等待隊列不滿
notFull.await();
}
// 省略入隊操作...
//入隊后,通知可出隊
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出隊
void deq(){
lock.lock();
try {
while (隊列已空){
// 等待隊列不空
notEmpty.await();
}
// 省略出隊操作...
//出隊后,通知可入隊
notFull.signal();
}finally {
lock.unlock();
}
}
}
不過,這里你需要注意,Lock 和 Condition 實現(xiàn)的管程,線程等待和通知需要調(diào)用 await()、signal()、signalAll(),它們的語義和 wait()、notify()、notifyAll() 是相同的。但是不一樣的是,Lock&Condition 實現(xiàn)的管程里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 實現(xiàn)的管程里才能使用。如果一不小心在 Lock&Condition 實現(xiàn)的管程里調(diào)用了 wait()、notify()、notifyAll(),那程序可就徹底玩兒完了。
異步轉同步
遠程調(diào)用rpc請求時,面臨異步轉同步的問題,因為tcp層面上rpc請求就是異步的,它不會等待請求返回結果,所以類似于dubbo這種rpc框架也是做了異步轉同步的工作,具體實現(xiàn)類似于上面“用兩個條件變量實現(xiàn)阻塞隊列”的代碼。
8、用Semaphore實現(xiàn)一個限流器
極客專欄鏈接 https://time.geekbang.org/column/article/88499
Semaphore是信號量的意思,可以允許多個線程訪問一個臨界區(qū)??梢杂脕韺崿F(xiàn)比較常見的需求就是我們工作中遇到的各種池化資源,例如連接池、對象池、線程池等等。
9、讀寫鎖 ReadWriteLock
極客專欄鏈接(質(zhì)量很高,也很實用) https://time.geekbang.org/column/article/88909
這個鏈接里文章實現(xiàn)了一個簡易的完備緩存的示例。
讀寫鎖升級問題
讀鎖不能升級為寫鎖。
寫鎖可以降級為讀鎖。
如果進行讀寫鎖升級,讀鎖還沒有釋放,此時獲取寫鎖,會導致寫鎖永久等待,最終導致相關線程都被阻塞,永遠也沒有機會被喚醒。鎖的升級是不允許的,這個你一定要注意。
9、StampedLock 比讀寫鎖更快的鎖
極客專欄鏈接 使用stampedLock有幾個比較重要需要注意的點,所以謹慎使用。
支持三種模式
寫鎖
悲觀讀鎖(類似與讀寫鎖的讀鎖)
樂觀讀(無鎖,檢測到有鎖的時候需要轉成悲觀讀鎖)
10、CountDownLatch 和 CyclicBarrier 讓多線程步調(diào)一致
極客時間專欄鏈接 https://time.geekbang.org/column/article/89461
CountDownLatch 和 CyclicBarrier 是 Java 并發(fā)包提供的兩個非常易用的線程同步工具類,這兩個工具類用法的區(qū)別在這里還是有必要再強調(diào)一下:
CountDownLatch 主要用來解決一個線程等待多個線程的場景,可以類比旅游團團長要等待所有的游客到齊才能去下一個景點;
而 CyclicBarrier 是一組線程之間互相等待,更像是幾個驢友之間不離不棄。
除此之外 CountDownLatch 的計數(shù)器是不能循環(huán)利用的,也就是說一旦計數(shù)器減到 0,再有線程調(diào)用 await(),該線程會直接通過。但 CyclicBarrier 的計數(shù)器是可以循環(huán)利用的,而且具備自動重置的功能,一旦計數(shù)器減到 0 會自動重置到你設置的初始值。除此之外,CyclicBarrier 還可以設置回調(diào)函數(shù),可以說是功能豐富。
11、Java并發(fā)容器
通過對操作方法加synchronized關鍵字,可以使得一個容器變成線程安全的容器。這類容器稱為同步容器,針對同步容器的性能問題,Java在1.5版本之后出來了并發(fā)容器,性能更高。
同步容器:
常見的有Vector、Stack 和 Hashtable等,通過對操作方法加synchronized關鍵字實現(xiàn)。
并發(fā)容器:
數(shù)量較多,主要有以下四大類:Set、Map、Set、Queue
并發(fā)容器關系圖:
比較熟悉的有ConcurrentHashMap、BlockingQueue....
注意事項
1、在容器領域一個容易被忽視的“坑”是用迭代器遍歷容器。
// 有問題的寫法
List list = Collections.
synchronizedList(new ArrayList());
Iterator i = list.iterator();
while (i.hasNext())
foo(i.next());
// 正確的寫法
// 因為是對list的操作,如果list變化了,會使得迭代器報錯
// 所以先把list鎖住,保證迭代器運行期間list不會變化
// 這也是在迭代器里對當前元素刪除會報錯的原因
List list = Collections.
synchronizedList(new ArrayList());
Iterator i = list.iterator();
while (i.hasNext())
foo(i.next());
2、另外,使用隊列時,需要格外注意隊列是否支持有界(所謂有界指的是內(nèi)部的隊列是否有容量限制)。實際工作中,一般都不建議使用無界的隊列,因為數(shù)據(jù)量大了之后很容易導致 OOM。上面我們提到的這些 Queue 中,只有 ArrayBlockingQueue 和 LinkedBlockingQueue 是支持有界的,所以在使用其他無界隊列時,一定要充分考慮是否存在導致 OOM 的隱患。
12、原子類
極客時間專欄 https://time.geekbang.org/column/article/90515
原子類高性能的秘密就是硬件支持,基于CPU提供的cas(compare and swap)指令。
ABA問題
aba問題,指的是一個共享變量經(jīng)歷了A--》B--》A的一個過程,雖然最終值一樣,但是已經(jīng)被更新過了。使用原子化的更新對象很可能就需要關心 ABA 問題,因為兩個 A 雖然相等,但是第二個 A 的屬性可能已經(jīng)發(fā)生變化了。
相關實現(xiàn)有 AtomicReference、AtomicStampedReference 和 AtomicMarkableReference,利用它們可以實現(xiàn)對象引用的原子化更新。AtomicReference 提供的方法和原子化的基本數(shù)據(jù)類型差不多,這里不再贅述。不過需要注意的是,對象引用的更新需要重點關注 ABA 問題,AtomicStampedReference 和 AtomicMarkableReference 這兩個原子類可以解決 ABA 問題。
原子類組成概覽
13、Java中的線程池
美團技術文章Java線程池
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
使用線程池的目的是為了避免線程的頻繁創(chuàng)建和銷毀。線程是一個重量級的對象,應該避免頻繁創(chuàng)建和銷毀。
線程池是一種生產(chǎn)者 - 消費者模式。線程池的使用方是生產(chǎn)者,線程池本身是消費者。在下面的示例代碼中,我們創(chuàng)建了一個非常簡單的線程池 MyThreadPool,你可以通過它來理解線程池的工作原理。
//簡化的線程池,僅用來說明工作原理
class MyThreadPool{
//利用阻塞隊列實現(xiàn)生產(chǎn)者-消費者模式
BlockingQueue<Runnable> workQueue;
//保存內(nèi)部工作線程
List<WorkerThread> threads
= new ArrayList<>();
// 構造方法
MyThreadPool(int poolSize,
BlockingQueue<Runnable> workQueue){
this.workQueue = workQueue;
// 創(chuàng)建工作線程
for(int idx=0; idx<poolSize; idx++){
WorkerThread work = new WorkerThread();
work.start();
threads.add(work);
}
}
// 提交任務
void execute(Runnable command){
workQueue.put(command);
}
// 工作線程負責消費任務,并執(zhí)行任務
class WorkerThread extends Thread{
public void run() {
//循環(huán)取任務并執(zhí)行
while(true){ ①
Runnable task = workQueue.take();
task.run();
}
}
}
}
/** 下面是使用示例 **/
// 創(chuàng)建有界阻塞隊列
BlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<>(2);
// 創(chuàng)建線程池
MyThreadPool pool = new MyThreadPool(
10, workQueue);
// 提交任務
pool.execute(()->{
System.out.println("hello");
});
在 MyThreadPool 的內(nèi)部,我們維護了一個阻塞隊列 workQueue 和一組工作線程,工作線程的個數(shù)由構造函數(shù)中的 poolSize 來指定。用戶通過調(diào)用 execute() 方法來提交 Runnable 任務,execute() 方法的內(nèi)部實現(xiàn)僅僅是將任務加入到 workQueue 中。MyThreadPool 內(nèi)部維護的工作線程會消費 workQueue 中的任務并執(zhí)行任務,相關的代碼就是代碼①處的 while 循環(huán)。線程池主要的工作原理就這些,是不是還挺簡單的?
// todo 線程的運行過程
什么情況下增加新線程、添加到任務隊列等等
ThreadPoolExecutor 線程池參數(shù)
ThreadPoolExecutor(
int corePoolSize, // 表示線程池保有的最小線程數(shù)。
int maximumPoolSize, // 表示線程池創(chuàng)建的最大線程數(shù)。當項目很忙時,就需要加人,最多加到 maximumPoolSize 個人。
long keepAliveTime, // 線程可以空閑的存活時間
TimeUnit unit,
BlockingQueue<Runnable> workQueue, //工作隊列,用來保存生產(chǎn)的資源,也是線程要消費的資源
ThreadFactory threadFactory, // 通過這個參數(shù)自定義如何創(chuàng)建線程,如給線程指定一個名字
RejectedExecutionHandler handler) // 拒絕策略
拒絕策略
ThreadPoolExecutor 已經(jīng)提供了以下 4 種策略。
CallerRunsPolicy:提交任務的線程自己去執(zhí)行該任務。
AbortPolicy:默認的拒絕策略,會 throws RejectedExecutionException。
DiscardPolicy:直接丟棄任務,沒有任何異常拋出。
DiscardOldestPolicy:丟棄最老的任務,其實就是把最早進入工作隊列的任務丟棄,然后把新任務加入到工作隊列。
使用線程池的注意事項
1、盡量使用有界隊列
Java 并發(fā)包里提供了一個線程池的靜態(tài)工廠類 Executors,利用 Executors 你可以快速創(chuàng)建線程池。不過目前大廠的編碼規(guī)范中基本上都不建議使用 Executors 了。
不建議使用 Executors 的最重要的原因是:Executors 提供的很多方法默認使用的都是無界的 LinkedBlockingQueue,高負載情境下,無界隊列很容易導致 OOM,而 OOM 會導致所有請求都無法處理,這是致命問題。所以強烈建議使用有界隊列。
2、默認的拒絕策略要慎用
使用有界隊列,當任務過多時,線程池會觸發(fā)執(zhí)行拒絕策略,線程池默認的拒絕策略會 throw RejectedExecutionException 這是個運行時異常,對于運行時異常編譯器并不強制 catch 它,所以開發(fā)人員很容易忽略。因此默認拒絕策略要慎重使用。
14、Future 獲取異步執(zhí)行結果
極客時間專欄 https://time.geekbang.org/column/article/91292
如何獲取異步任務執(zhí)行結果
Java 通過 ThreadPoolExecutor 提供的 3 個 submit() 方法和 1 個 FutureTask 工具類來支持獲得任務執(zhí)行結果的需求。下面我們先來介紹這 3 個 submit() 方法,這 3 個方法的方法簽名如下。
// 提交Runnable任務
Future<?>
submit(Runnable task);
// 提交Callable任務
<T> Future<T>
submit(Callable<T> task);
// 提交Runnable任務及結果引用
<T> Future<T>
submit(Runnable task, T result);
你會發(fā)現(xiàn)它們的返回值都是 Future 接口,F(xiàn)uture 接口有 5 個方法,我都列在下面了,它們分別是取消任務的方法 cancel()、判斷任務是否已取消的方法 isCancelled()、判斷任務是否已結束的方法 isDone()以及2 個獲得任務執(zhí)行結果的 get() 和 get(timeout, unit),其中最后一個 get(timeout, unit) 支持超時機制。通過 Future 接口的這 5 個方法你會發(fā)現(xiàn),我們提交的任務不但能夠獲取任務執(zhí)行結果,還可以取消任務。不過需要注意的是:這兩個 get() 方法都是阻塞式的,如果被調(diào)用的時候,任務還沒有執(zhí)行完,那么調(diào)用 get() 方法的線程會阻塞,直到任務執(zhí)行完才會被喚醒。
// 取消任務
boolean cancel(
boolean mayInterruptIfRunning);
// 判斷任務是否已取消
boolean isCancelled();
// 判斷任務是否已結束
boolean isDone();
// 獲得任務執(zhí)行結果
get();
// 獲得任務執(zhí)行結果,支持超時
get(long timeout, TimeUnit unit);
FutureTask工具類
前面我們提到的 Future 是一個接口,而 FutureTask 是一個實實在在的工具類,這個工具類有兩個構造函數(shù),它們的參數(shù)和前面介紹的 submit() 方法類似,所以這里我就不再贅述了。
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);
那如何使用 FutureTask 呢?其實很簡單,F(xiàn)utureTask 實現(xiàn)了 Runnable 和 Future 接口,由于實現(xiàn)了 Runnable 接口,所以可以將 FutureTask 對象作為任務提交給 ThreadPoolExecutor 去執(zhí)行,也可以直接被 Thread 執(zhí)行;又因為實現(xiàn)了 Future 接口,所以也能用來獲得任務的執(zhí)行結果。下面的示例代碼是將 FutureTask 對象提交給 ThreadPoolExecutor 去執(zhí)行。
// 創(chuàng)建FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 創(chuàng)建線程池
ExecutorService es =
Executors.newCachedThreadPool();
// 提交FutureTask
es.submit(futureTask);
// 獲取計算結果
Integer result = futureTask.get();
FutureTask 對象直接被 Thread 執(zhí)行的示例代碼如下所示。相信你已經(jīng)發(fā)現(xiàn)了,利用 FutureTask 對象可以很容易獲取子線程的執(zhí)行結果。
// 創(chuàng)建FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 創(chuàng)建并啟動線程
Thread T1 = new Thread(futureTask);
T1.start();
// 獲取計算結果
Integer result = futureTask.get();
// 以上兩種方式還可以組合起來使用。
下面的示例代碼就是用這一章提到的 Future 特性來實現(xiàn)的。首先,我們創(chuàng)建了兩個 FutureTask——ft1 和 ft2,ft1 完成洗水壺、燒開水、泡茶的任務,ft2 完成洗茶壺、洗茶杯、拿茶葉的任務;這里需要注意的是 ft1 這個任務在執(zhí)行泡茶任務前,需要等待 ft2 把茶葉拿來,所以 ft1 內(nèi)部需要引用 ft2,并在執(zhí)行泡茶之前,調(diào)用 ft2 的 get() 方法實現(xiàn)等待。
// 創(chuàng)建任務T2的FutureTask
FutureTask<String> ft2
= new FutureTask<>(new T2Task());
// 創(chuàng)建任務T1的FutureTask
FutureTask<String> ft1
= new FutureTask<>(new T1Task(ft2));
// 線程T1執(zhí)行任務ft1
Thread T1 = new Thread(ft1);
T1.start();
// 線程T2執(zhí)行任務ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待線程T1執(zhí)行結果
System.out.println(ft1.get());
// T1Task需要執(zhí)行的任務:
// 洗水壺、燒開水、泡茶
class T1Task implements Callable<String>{
FutureTask<String> ft2;
// T1任務需要T2任務的FutureTask
T1Task(FutureTask<String> ft2){
this.ft2 = ft2;
}
@Override
String call() throws Exception {
System.out.println("T1:洗水壺...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T1:燒開水...");
TimeUnit.SECONDS.sleep(15);
// 獲取T2線程的茶葉
String tf = ft2.get();
System.out.println("T1:拿到茶葉:"+tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
}
}
// T2Task需要執(zhí)行的任務:
// 洗茶壺、洗茶杯、拿茶葉
class T2Task implements Callable<String> {
@Override
String call() throws Exception {
System.out.println("T2:洗茶壺...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T2:洗茶杯...");
TimeUnit.SECONDS.sleep(2);
System.out.println("T2:拿茶葉...");
TimeUnit.SECONDS.sleep(1);
return "龍井";
}
}
// 一次執(zhí)行結果:
T1:洗水壺...
T2:洗茶壺...
T1:燒開水...
T2:洗茶杯...
T2:拿茶葉...
T1:拿到茶葉:龍井
T1:泡茶...
上茶:龍井
14、CompletableFuture 異步編程
極客時間專欄文章鏈接 https://time.geekbang.org/column/article/91569
CompletableFuture 實現(xiàn)了 CompletionStage 接口。任務是有時序關系的,比如有串行關系、并行關系、匯聚關系等,這些都在CompletionStage接口中得到了體現(xiàn)。很好用就是了。具體參考上述文章鏈接。
注意異常處理,默認情況下 CompletableFuture 會使用公共的 ForkJoinPool 線程池,這個線程池默認創(chuàng)建的線程數(shù)是 CPU 的核數(shù)(也可以通過 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設置 ForkJoinPool 線程池的線程數(shù))。如果所有 CompletableFuture 共享一個線程池,那么一旦有任務執(zhí)行一些很慢的 I/O 操作,就會導致線程池中所有線程都阻塞在 I/O 操作上,從而造成線程饑餓,進而影響整個系統(tǒng)的性能。所以,強烈建議你要根據(jù)不同的業(yè)務類型創(chuàng)建不同的線程池,以避免互相干擾。
15、CompletionService 批量執(zhí)行異步任務
當需要批量提交異步任務的時候建議你使用 CompletionService。CompletionService 將線程池 Executor 和阻塞隊列 BlockingQueue 的功能融合在了一起,能夠讓批量異步任務的管理更簡單。除此之外,CompletionService 能夠讓異步任務的執(zhí)行結果有序化,先執(zhí)行完的先進入阻塞隊列,利用這個特性,你可以輕松實現(xiàn)后續(xù)處理的有序性,避免無謂的等待,同時還可以快速實現(xiàn)諸如 Forking Cluster 這樣的需求。CompletionService 的實現(xiàn)類 ExecutorCompletionService,需要你自己創(chuàng)建線程池,雖看上去有些啰嗦,但好處是你可以讓多個 ExecutorCompletionService 的線程池隔離,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險。
16、Fork/Join 并行計算框架
極客時間專欄文章鏈接 https://time.geekbang.org/column/article/92524
Fork/Join 并行計算框架主要解決的是分治任務。分治的核心思想是“分而治之”:將一個大的任務拆分成小的子任務去解決,然后再把子任務的結果聚合起來從而得到最終結果。這個過程非常類似于大數(shù)據(jù)處理中的 MapReduce,所以你可以把 Fork/Join 看作單機版的 MapReduce。
Fork/Join 并行計算框架的核心組件是 ForkJoinPool。ForkJoinPool 支持任務竊取機制,能夠讓所有線程的工作量基本均衡,不會出現(xiàn)有的線程很忙,而有的線程很閑的狀況,所以性能很好。Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 為基礎的。不過需要你注意的是,默認情況下所有的并行流計算都共享一個 ForkJoinPool,這個共享的 ForkJoinPool 默認的線程數(shù)是 CPU 的核數(shù);如果所有的并行流計算都是 CPU 密集型計算的話,完全沒有問題,但是如果存在 I/O 密集型的并行流計算,那么很可能會因為一個很慢的 I/O 計算而拖慢整個系統(tǒng)的性能。所以建議用不同的 ForkJoinPool 執(zhí)行不同類型的計算任務。
模擬MapReduce統(tǒng)計單詞數(shù)量
學習 MapReduce 有一個入門程序,統(tǒng)計一個文件里面每個單詞的數(shù)量,下面我們來看看如何用 Fork/Join 并行計算框架來實現(xiàn)。我們可以先用二分法遞歸地將一個文件拆分成更小的文件,直到文件里只有一行數(shù)據(jù),然后統(tǒng)計這一行數(shù)據(jù)里單詞的數(shù)量,最后再逐級匯總結果,你可以對照前面的簡版分治任務模型圖來理解這個過程。
上述極客時間鏈接里存在具體的實現(xiàn)代碼,可以仔細看看。