引言
jdk1.5之后,java concurrent包提供了一些并發(fā)工具類。本文將梳理這些工具類的用法與使用場景。
- CountDownLatch:一個或多個線程阻塞等待,直到另外一批線程完成特定操作。
- CyclicBarrier:多個線程都會阻塞等待,直到所有線程都到達障礙點(barrier),功能上與CountDownLatch比較接近,他們最大區(qū)別是CountDownLatch的倒計時只能生效一次,CyclicBarrier可以循環(huán)使用。
- Exchanger:針對兩個線程的同步器,允許這對線程交換數(shù)據(jù)。
CountDownLatch用法
初始化CountDownLatch需要指定具體的count值。await方法會阻塞線程,直到count值為0,調(diào)用countDown方法可以減少count值。
CountDownLatch的一種典型用法是在主線程中發(fā)起工作線程的執(zhí)行,并等待工作線程的執(zhí)行結(jié)束。
public class CountDownLatch1 {
public static void main(String[] args) {
try {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(5);
for (int i = 0; i < 5; ++i) {
new Thread(new Worker(startSignal, doneSignal)).start();
}
// 發(fā)起開始命令
System.out.println("工作線程開始執(zhí)行...");
startSignal.countDown();
// 執(zhí)行其他任務
System.out.println("執(zhí)行其他流程...");
// 等待作業(yè)線程結(jié)束
doneSignal.await();
System.out.println("工作線程結(jié)束...");
} catch (InterruptedException ex) {
}
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
@Override
public void run() {
try {
startSignal.await();
// 模擬作業(yè)
Thread.sleep(100);
} catch (InterruptedException ex) {
} finally {
System.out.println(Thread.currentThread().getName() + "執(zhí)行結(jié)束");
doneSignal.countDown();
}
}
}
工作線程開始執(zhí)行...
執(zhí)行其他流程...
Thread-1執(zhí)行結(jié)束
Thread-3執(zhí)行結(jié)束
Thread-2執(zhí)行結(jié)束
Thread-4執(zhí)行結(jié)束
Thread-0執(zhí)行結(jié)束
工作線程結(jié)束...
另一種用法是將一個大任務切分成多個子任務,每個子任務在一個新線程執(zhí)行,所有子線程都執(zhí)行結(jié)束就意味著大任務的執(zhí)行結(jié)束。
這種方式非常適用于分治法的實現(xiàn),各個子任務之間沒有依賴關(guān)系。
public class CountDownLatch2 {
public static void main(String[] args) {
try {
List<Integer> numberList = Arrays.asList(1,2,3,4,5);
CountDownLatch doneSignal = new CountDownLatch(5);
Executor e = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; ++i) {
e.execute(new WorkerRunnable(doneSignal, i, numberList));
}
doneSignal.await();
System.out.println("工作線程執(zhí)行結(jié)束");
} catch (InterruptedException ex) {
}
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
private final List<Integer> numberList;
WorkerRunnable(CountDownLatch doneSignal, int i, List<Integer> numberList) {
this.doneSignal = doneSignal;
this.i = i;
this.numberList = numberList;
}
@Override
public void run() {
doWork(i);
doneSignal.countDown();
}
void doWork(int i) {
System.out.println(Thread.currentThread().getName() + "打印" + numberList.get(i));
}
}
pool-1-thread-1打印1
pool-1-thread-5打印5
pool-1-thread-4打印4
pool-1-thread-3打印3
pool-1-thread-2打印2
工作線程執(zhí)行結(jié)束
CyclicBarrier用法
CyclicBarrier非常適用于有一批線程且相互之間需要等待的場景。CyclicBarrier從字面理解是指循環(huán)屏障,這里的屏障意思是說線程調(diào)用await方法導致阻塞等待。
CyclicBarrier有兩個構(gòu)造器方法:
- CyclicBarrier(int parties):parties指定了在這個屏障上,需要有多少個線程調(diào)用await方法到達屏障。
- CyclicBarrier(int parties, Runnable barrierAction):這個構(gòu)造方法比前一個多了
Runnable類型的參數(shù),當最后一個線程到達屏障后就會執(zhí)行這個線程任務。通過這個線程任務可以在所有線程都到達時完成一些諸如狀態(tài)修改的功能。
class Solver {
final int N; //矩陣的行數(shù)
final float[][] data; //要處理的矩陣
final CyclicBarrier barrier; //循環(huán)屏障
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow); //處理指定一行數(shù)據(jù)
try {
barrier.await(); //在屏障處等待直到
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
//初始化CyclicBarrier
barrier = new CyclicBarrier(N, new Runnable() {
public void run() {
mergeRows(); //合并行
}
});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start();
waitUntilDone();
}
}
Exchanger用法
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger();
Thread thread1 = new Thread(new Worker(exchanger, "thread1"));
Thread thread2 = new Thread(new Worker(exchanger, "thread2"));
thread1.setName("thread1");
thread2.setName("thread2");
thread1.start();
thread2.start();
}
}
class Worker implements Runnable {
private String value;
private final Exchanger exchanger;
public Worker(Exchanger exchanger, String value) {
this.exchanger = exchanger;
this.value = value;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ": " + exchanger.exchange(value));
} catch (InterruptedException ex) {
}
}
}