Java中的并發(fā)包下常見類

前言

最近在看并發(fā)編程藝術這本書,對看書的一些筆記及個人工作中的總結。

Callable和Future使用

Future模式非常適合在處理很耗時很長的業(yè)務邏輯時進行使用,可以有效的減小系統(tǒng)的響應時間,提高系統(tǒng)的吞吐量。

//get方法的作用之前也講過了,阻塞直到異步執(zhí)行結果返回才不阻塞,繼續(xù)往下執(zhí)行
public class TestFuture implements Callable<String> {
    private String para;

    public TestFuture(String para){
        this.para = para;
    }

    /**
     * 這里是真實的業(yè)務邏輯,其執(zhí)行可能很慢
     */
    @Override
    public String call() throws Exception {
        //模擬執(zhí)行耗時,模擬的是db操作,para就是查詢條件
        Thread.sleep(5000);
        String result = this.para + "處理完成";
        System.out.println("執(zhí)行成功了?。。。。?);
        return result;
    }

    //主控制函數(shù)
    public static void main(String[] args) throws Exception {
        String queryStr = "query";
        //構造FutureTask,并且傳入需要真正進行業(yè)務邏輯處理的類,該類一定是實現(xiàn)了Callable接口的類
        FutureTask<String> future = new FutureTask<>(new TestFuture(queryStr));

        FutureTask<String> future2 = new FutureTask<>(new TestFuture(queryStr));
        //創(chuàng)建一個固定線程的線程池且線程數(shù)為2,
        ExecutorService executor = Executors.newFixedThreadPool(2);
        //這里提交任務future,則開啟線程執(zhí)行RealData的call()方法執(zhí)行
        //submit和execute的區(qū)別: 第一點是submit可以傳入實現(xiàn)Callable接口的實例對象, 第二點是submit方法有返回值
        Future f1 = executor.submit(future);        //單獨啟動一個線程去執(zhí)行的
        Future f2 = executor.submit(future2);
        //f1.get() == null的時候說明已經(jīng)取到了結果,執(zhí)行到了最后
        System.out.println("請求完畢");

        try {
            //這里可以做額外的數(shù)據(jù)操作,也就是主程序執(zhí)行其他業(yè)務邏輯
            System.out.println("處理實際的業(yè)務邏輯...");
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        //調(diào)用獲取數(shù)據(jù)方法,如果call()方法沒有執(zhí)行完成,則依然會進行等
        //System.out.println("數(shù)據(jù):" + future.get());
        //System.out.println("數(shù)據(jù):" + future2.get());

        executor.shutdown();
    }

}

CountDownLatch

//join用于讓當前執(zhí)行線程等待join線程執(zhí)行結束。其實現(xiàn)原理是不停檢查,join線程是否存活,如果join線程存活則讓當前線程永遠等待。
public class JoinCountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        Thread parser1 = new Thread(() -> {

        });

        Thread parser2 = new Thread(() -> System.out.println("parser2 finish"));

        parser1.start();
        parser2.start();
        parser1.join();
        parser2.join();
        System.out.println("all parser finish");
    }

}

join用于讓當前執(zhí)行線程等待join線程執(zhí)行結束。其實現(xiàn)原理是不停檢查,join線程是否存活,如果join線程存活則讓當前線程永遠等待。直到join線程中止后,線程的this.notifyAll()方法會被調(diào)用

使用CountDownLatch也可以實現(xiàn)

public class CountDownLatchTest {

    //參數(shù)是2表示對象執(zhí)行2次countDown方法才能釋放鎖
    static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            System.out.println(1);
            c.countDown();  
            System.out.println(2);
            c.countDown();
        }).start();

        c.await(); //線程等待
        System.out.println("3");
    }

}

同步屏障CyclicBarrier

CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運行。

public class CyclicBarrierTest {

    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                c.await();
            } catch (Exception e) {

            }
            System.out.println(1);
        }).start();

        try {
            c.await();
        } catch (Exception e) {

        }
        System.out.println(2);
    }
}

CyclicBarrier還提供一個更高級的構造函數(shù)CyclicBarrier(int parties,Runnable barrier-Action),用于在線程到達屏障時,優(yōu)先執(zhí)行barrierAction,方便處理更復雜的業(yè)務場景

//先執(zhí)行線程A
public class CyclicBarrierTest2 {

    static CyclicBarrier c = new CyclicBarrier(2, new A());

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                c.await();
            } catch (Exception e) {

            }
            System.out.println(1);
        }).start();

        try {
            c.await();
        } catch (Exception e) {

        }
        System.out.println(2);
    }

    static class A implements Runnable {

        @Override
        public void run() {
            System.out.println(3);
        }

    }

}

執(zhí)行結果:

3
1
2

** CyclicBarrier的應用場景 **
CyclicBarrier可以用于多線程計算數(shù)據(jù),最后合并計算結果的場景。例如,用一個Excel保存了用戶所有銀行流水,每個Sheet保存一個賬戶近一年的每筆銀行流水,現(xiàn)在需要統(tǒng)計用戶的日均銀行流水,先用多線程處理每個sheet里的銀行流水,都執(zhí)行完之后,得到每個sheet的日均銀行流水,最后,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水,

CyclicBarrier和CountDownLatch的區(qū)別
CountDownLatch的計數(shù)器只能使用一次,而CyclicBarrier的計數(shù)器可以使用reset()方法重置。所以CyclicBarrier能處理更為復雜的業(yè)務場景。例如,如果計算發(fā)生錯誤,可以重置計數(shù)器,并讓線程重新執(zhí)行一次。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的線程數(shù)量。isBroken()方法用來了解阻塞的線程是否被中斷。

public class CyclicBarrierTest3 {

    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        Thread thread = new Thread(() -> {
            try {
                c.await();
            } catch (Exception e) {
            }
        });
        thread.start();
        thread.interrupt();
        try {
            c.await();
        } catch (Exception e) {
            System.out.println(c.isBroken());
        }
    }
}

Semaphore

Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源。
Semaphore可以控制系統(tǒng)的流量,拿到信號量的線程可以進入,否則就等待。通過acquire()和release()獲取和釋放訪問許可。

public class TestSemaphore {
    public static void main(String[] args) {
        // 線程池
        ExecutorService exec = Executors.newCachedThreadPool();
        // 只能5個線程同時訪問
        final Semaphore semp = new Semaphore(5);
        // 模擬20個客戶端訪問
        for (int index = 0; index < 20; index++) {
            final int NO = index;
            Runnable run = () -> {
                try {
                    // 獲取許可
                    semp.acquire();
                    System.out.println("Accessing: " + NO);
                    //模擬實際業(yè)務邏輯
                    Thread.sleep((long) (Math.random() * 10000));
                    // 訪問完后,釋放
                    semp.release();
                } catch (InterruptedException e) {
                }
            };
            exec.execute(run);
        }

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //System.out.println(semp.getQueueLength());
        // 退出線程池
        exec.shutdown();
    }
}

Semaphore可以用于做流量控制,特別是公用資源有限的應用場景,比如數(shù)據(jù)庫連接。假如有一個需求,要讀取幾萬個文件的數(shù)據(jù),因為都是IO密集型任務,我們可以啟動幾十個線程并發(fā)地讀取,但是如果讀到內(nèi)存后,還需要存儲到數(shù)據(jù)庫中,而數(shù)據(jù)庫的連接數(shù)只有10個,這時我們必須控制只有10個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù),否則會報錯無法獲取數(shù)據(jù)庫連接。這個時候,就可以使用Semaphore來做流量控制

//線程池中有30個線程,但是只有10個線程可以獲得許可
public class SemaphoreTest {

    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire(); //獲取許可證
                        System.out.println("save data");
                        Thread.sleep(1000);
                        s.release(); //釋放許可證
                    } catch (InterruptedException e) {
                    }
                }
            });
        }

        threadPool.shutdown();
    }
}

其他方法
Semaphore:還提供一些其他方法,具體如下。
intavailablePermits():返回此信號量中當前可用的許可證數(shù)。
intgetQueueLength():返回正在等待獲取許可證的線程數(shù)。
booleanhasQueuedThreads():是否有線程正在等待獲取許可證。
void reducePermits(int reduction):減少reduction個許可證,是個protected方法。
Collection getQueuedThreads():返回所有等待獲取許可證的線程集合,是個protected方法。

線程間交換數(shù)據(jù)的Exchanger

Exchanger(交換者)是一個用于線程間協(xié)作的工具類。Exchanger用于進行線程間的數(shù)據(jù)交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)。這兩個線程通過exchange方法交換數(shù)據(jù),如果第一個線程先執(zhí)行exchange()方法,它會一直等待第二個線程也執(zhí)行exchange方法,當兩個線程都到達同步點時,這兩個線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對方。

public class ExchangerTest {

    private static final Exchanger<String> exgr = new Exchanger<String>();

    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "銀行流水A";// A錄入銀行流水數(shù)據(jù)
                    Thread.sleep(4000);
                    exgr.exchange(A);
                } catch (InterruptedException e) {
                }
            }
        });

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "銀行流水B";// B錄入銀行流水數(shù)據(jù)
                    String A = exgr.exchange("B"); //得到A值
                    System.out.println("A和B數(shù)據(jù)是否一致:" + A.equals(B) + ",A錄入的是:" + A + ",B錄入是:" + B);
                } catch (InterruptedException e) {
                }
            }
        });

        threadPool.shutdown();

    }
}

如果兩個線程有一個沒有執(zhí)行exchange()方法,則會一直等待,如果擔心有特殊情況發(fā)生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)設置最大等待時長。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容