java并發(fā)工具類:CountDownLatch、CyclicBarrier和Exchanger

引言

jdk1.5之后,java concurrent包提供了一些并發(fā)工具類。本文將梳理這些工具類的用法與使用場景。

  • CountDownLatch:一個或多個線程阻塞等待,直到另外一批線程完成特定操作。
  • CyclicBarrier:多個線程都會阻塞等待,直到所有線程都到達障礙點(barrier),功能上與CountDownLatch比較接近,他們最大區(qū)別是CountDownLatch的倒計時只能生效一次,CyclicBarrier可以循環(huán)使用。
  • Exchanger:針對兩個線程的同步器,允許這對線程交換數(shù)據(jù)。

CountDownLatch用法

初始化CountDownLatch需要指定具體的count值。await方法會阻塞線程,直到count值為0,調(diào)用countDown方法可以減少count值。
CountDownLatch的一種典型用法是在主線程中發(fā)起工作線程的執(zhí)行,并等待工作線程的執(zhí)行結(jié)束。

public class CountDownLatch1 {
    public static void main(String[] args) {
        try {
            CountDownLatch startSignal = new CountDownLatch(1);
            CountDownLatch doneSignal = new CountDownLatch(5);
            for (int i = 0; i < 5; ++i) {
                new Thread(new Worker(startSignal, doneSignal)).start();
            }
            // 發(fā)起開始命令
            System.out.println("工作線程開始執(zhí)行...");
            startSignal.countDown();
            // 執(zhí)行其他任務
            System.out.println("執(zhí)行其他流程...");
            // 等待作業(yè)線程結(jié)束
            doneSignal.await();
            System.out.println("工作線程結(jié)束...");
        } catch (InterruptedException ex) {
        }
    }
}
class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }
    @Override
    public void run() {
        try {
            startSignal.await();
            // 模擬作業(yè)
            Thread.sleep(100);
        } catch (InterruptedException ex) {
        } finally {
            System.out.println(Thread.currentThread().getName() + "執(zhí)行結(jié)束");
            doneSignal.countDown();
        }
    }
}

工作線程開始執(zhí)行...
執(zhí)行其他流程...
Thread-1執(zhí)行結(jié)束
Thread-3執(zhí)行結(jié)束
Thread-2執(zhí)行結(jié)束
Thread-4執(zhí)行結(jié)束
Thread-0執(zhí)行結(jié)束
工作線程結(jié)束...

另一種用法是將一個大任務切分成多個子任務,每個子任務在一個新線程執(zhí)行,所有子線程都執(zhí)行結(jié)束就意味著大任務的執(zhí)行結(jié)束。
這種方式非常適用于分治法的實現(xiàn),各個子任務之間沒有依賴關(guān)系。

public class CountDownLatch2 {
    public static void main(String[] args) {
        try {
            List<Integer> numberList = Arrays.asList(1,2,3,4,5);
            CountDownLatch doneSignal = new CountDownLatch(5);
            Executor e = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 5; ++i) {
                e.execute(new WorkerRunnable(doneSignal, i, numberList));
            }
            doneSignal.await();
            System.out.println("工作線程執(zhí)行結(jié)束");
        } catch (InterruptedException ex) {
        }
    }
}
class WorkerRunnable implements Runnable {
    private final CountDownLatch doneSignal;
    private final int i;
    private final List<Integer> numberList;
    WorkerRunnable(CountDownLatch doneSignal, int i, List<Integer> numberList) {
        this.doneSignal = doneSignal;
        this.i = i;
        this.numberList = numberList;
    }
    @Override
    public void run() {
        doWork(i);
        doneSignal.countDown();
    }

    void doWork(int i) {
        System.out.println(Thread.currentThread().getName() + "打印" + numberList.get(i));
    }
}

pool-1-thread-1打印1
pool-1-thread-5打印5
pool-1-thread-4打印4
pool-1-thread-3打印3
pool-1-thread-2打印2
工作線程執(zhí)行結(jié)束

CyclicBarrier用法

CyclicBarrier非常適用于有一批線程且相互之間需要等待的場景。CyclicBarrier從字面理解是指循環(huán)屏障,這里的屏障意思是說線程調(diào)用await方法導致阻塞等待。
CyclicBarrier有兩個構(gòu)造器方法:

  • CyclicBarrier(int parties):parties指定了在這個屏障上,需要有多少個線程調(diào)用await方法到達屏障。
  • CyclicBarrier(int parties, Runnable barrierAction):這個構(gòu)造方法比前一個多了
    Runnable類型的參數(shù),當最后一個線程到達屏障后就會執(zhí)行這個線程任務。通過這個線程任務可以在所有線程都到達時完成一些諸如狀態(tài)修改的功能。
class Solver {
    final int N;    //矩陣的行數(shù)
    final float[][] data;    //要處理的矩陣
    final CyclicBarrier barrier;    //循環(huán)屏障

    class Worker implements Runnable {
        int myRow;
        Worker(int row) { myRow = row; }
        public void run() {
            while (!done()) {
                processRow(myRow);    //處理指定一行數(shù)據(jù)
                try {
                    barrier.await();     //在屏障處等待直到
                } catch (InterruptedException ex) { 
                    return; 
                } catch (BrokenBarrierException ex) {
                    return; 
                }
            }
        }
    }

    public Solver(float[][] matrix) {
        data = matrix;
        N = matrix.length;
        //初始化CyclicBarrier
        barrier = new CyclicBarrier(N, new Runnable() {
                                           public void run() { 
                                             mergeRows(); //合并行
                                           }
                                       });
        for (int i = 0; i < N; ++i) 
            new Thread(new Worker(i)).start();
        waitUntilDone();
    }
}

Exchanger用法

public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger();
        Thread thread1 = new Thread(new Worker(exchanger, "thread1"));
        Thread thread2 = new Thread(new Worker(exchanger, "thread2"));
        thread1.setName("thread1");
        thread2.setName("thread2");
        thread1.start();
        thread2.start();
    }
}
class Worker implements Runnable {
    private String value;
    private final Exchanger exchanger;
    public Worker(Exchanger exchanger, String value) {
        this.exchanger = exchanger;
        this.value = value;
    }
    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + ": " + exchanger.exchange(value));
        } catch (InterruptedException ex) {
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容