JAVA并發(fā)工具

1、CountDownLatch

CountDownLatch允許一個(gè)或多個(gè)線程等待其他線程完成操作。

假如有這樣一個(gè)需求:我們需要解析一個(gè)Excel里多個(gè)sheet的數(shù)據(jù),此時(shí)可以考慮使用多線程,每個(gè)線程解析一個(gè)sheet里的數(shù)據(jù),等到所有的sheet都解析完之后,程序需要提示解析完成。在這個(gè)需求中,要實(shí)現(xiàn)主線程等待所有線程完成sheet的解析操作,最簡(jiǎn)單的做法是使用join()方法。

public class JoinCountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        Thread parser1 = new Thread(new Runnable() {
            @Override
            public void run() {
            }
        });
        Thread parser2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("parser2 finish");
            }
        });
        parser1.start();
        parser2.start();
        parser1.join();
        parser2.join();
        System.out.println("all parser finish");
    }
}

join用于讓當(dāng)前執(zhí)行線程等待join線程執(zhí)行結(jié)束。其實(shí)現(xiàn)原理是不停檢查join線程是否存活,如果join線程存活則讓當(dāng)前線程永遠(yuǎn)等待。

在JDK 1.5之后的并發(fā)包中提供的CountDownLatch也可以實(shí)現(xiàn)join的功能,并且比join的功能更多。

public class CountDownLatchTest {
    static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(1);
                c.countDown();
                System.out.println(2);
                c.countDown();
            }
        }).start();
        c.await();
        System.out.println("3");
    }
}

CountDownLatch的構(gòu)造函數(shù)接收一個(gè)int類型的參數(shù)作為計(jì)數(shù)器,如果你想等待N個(gè)點(diǎn)完成,這里就傳入N。

當(dāng)我們調(diào)用CountDownLatch的countDown方法時(shí),N就會(huì)減1,CountDownLatch的await方法會(huì)阻塞當(dāng)前線程,直到N變成零。由于countDown方法可以用在任何地方,所以這里說的N個(gè)點(diǎn),可以是N個(gè)線程,也可以是1個(gè)線程里的N個(gè)執(zhí)行步驟。用在多個(gè)線程時(shí),只需要把這個(gè)CountDownLatch的引用傳遞到線程里即可。

如果有某個(gè)解析sheet的線程處理得比較慢,我們不可能讓主線程一直等待,所以可以使用另外一個(gè)帶指定時(shí)間的await方法——await(long time,TimeUnit unit),這個(gè)方法等待特定時(shí)間后,就會(huì)不再阻塞當(dāng)前線程。

2、同步屏障CyclicBarrier

CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行。

CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。

public class CyclicBarrierTest {
    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
                }
                System.out.println(1);
            }
        }).start();
        try {
            c.await();
        } catch (Exception e) {
        }
        System.out.println(2);
    }
}

因?yàn)橹骶€程和子線程的調(diào)度是由CPU決定的,兩個(gè)線程都有可能先執(zhí)行,所以會(huì)產(chǎn)生兩種輸出。

如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),則主線程和子線程會(huì)永遠(yuǎn)等待,因?yàn)闆]有第三個(gè)線程執(zhí)行await方法,即沒有第三個(gè)線程到達(dá)屏障,所以之前到達(dá)屏障的兩個(gè)線程都不會(huì)繼續(xù)執(zhí)行。

CyclicBarrier和CountDownLatch的區(qū)別

1、CountDownLatch的計(jì)數(shù)器只能使用一次,而CyclicBarrier的計(jì)數(shù)器可以使用reset()方法重置。

所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場(chǎng)景。例如,如果計(jì)算發(fā)生錯(cuò)誤,可以重置計(jì)數(shù)器,并讓線程重新執(zhí)行一次。

2、CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的線程數(shù)量。isBroken()方法用來了解阻塞的線程是否被中斷。

3、控制并發(fā)線程數(shù)的Semaphore

Semaphore(信號(hào)量)是用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。

多年以來,我都覺得從字面上很難理解Semaphore所表達(dá)的含義,只能把它比作是控制流量的紅綠燈。比如××馬路要限制流量,只允許同時(shí)有一百輛車在這條路上行使,其他的都必須在路口等待,所以前一百輛車會(huì)看到綠燈,可以開進(jìn)這條馬路,后面的車會(huì)看到紅燈,不能駛?cè)搿痢榴R路,但是如果前一百輛中有5輛車已經(jīng)離開了××馬路,那么后面就允許有5輛車駛?cè)腭R路,這個(gè)例子里說的車就是線程,駛?cè)腭R路就表示線程在執(zhí)行,離開馬路就表示線程執(zhí)行完成,看見紅燈就表示線程被阻塞,不能執(zhí)行。

Semaphore可以用于做流量控制,特別是公用資源有限的應(yīng)用場(chǎng)景,比如數(shù)據(jù)庫連接。

public class SemaphoreTest {
    private static final int THREAD_COUNT = 30;
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire();
                        System.out.println("save data");
                        s.release();
                    } catch (InterruptedException e) {
                    }
                }
            });
        }
        threadPool.shutdown();
    }
}

在代碼中,雖然有30個(gè)線程在執(zhí)行,但是只允許10個(gè)并發(fā)執(zhí)行。Semaphore的構(gòu)造方法Semaphore(int permits)接受一個(gè)整型的數(shù)字,表示可用的許可證數(shù)量。Semaphore(10)表示允許10個(gè)線程獲取許可證,也就是最大并發(fā)數(shù)是10。Semaphore的用法也很簡(jiǎn)單,首先線程使用Semaphore的acquire()方法獲取一個(gè)許可證,使用完之后調(diào)用release()方法歸還許可證。還可以用tryAcquire()方法嘗試獲取許可證。

4、線程間交換數(shù)據(jù)的Exchanger

Exchanger(交換者)是一個(gè)用于線程間協(xié)作的工具類。Exchanger用于進(jìn)行線程間的數(shù)據(jù)交換。它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn),兩個(gè)線程可以交換彼此的數(shù)據(jù)。這兩個(gè)線程通過exchange方法交換數(shù)據(jù),如果第一個(gè)線程先執(zhí)行exchange()方法,它會(huì)一直等待第二個(gè)線程也執(zhí)行exchange方法,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí),這兩個(gè)線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對(duì)方。
public class ExchangerTest {
    private static final Exchanger<String> exgr = new Exchanger<String>();

    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "銀行流水A";// A錄入銀行流水?dāng)?shù)據(jù)
                    exgr.exchange(A);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "銀行流水B";// B錄入銀行流水?dāng)?shù)據(jù)
                    String A = exgr.exchange(B);
                    System.out.println("A和B數(shù)據(jù)是否一致:" + A.equals(B) + ",A錄入的是:" + A + ",B錄入是:" + B);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.shutdown();
    }

如果兩個(gè)線程有一個(gè)沒有執(zhí)行exchange()方法,則會(huì)一直等待,如果擔(dān)心有特殊情況發(fā)生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)設(shè)置最大等待時(shí)長(zhǎng)。

-------摘自《java并發(fā)編程的藝術(shù)》

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容