java并發(fā)工具類

等待多線程完成

主線程等待所有線程完成工作

實(shí)現(xiàn)

thread.join()方法


    private static void join() throws InterruptedException {
        int length = 100;

        Long t1 = System.nanoTime();
        List<Integer> results = new ArrayList<>();
        for (int index = 0; index < length; index++) {
            final int threadIndex = index;
            Thread thread = new Thread(() -> {
                System.out.println("start thread - " + threadIndex);
                Double random = -1.0;
                try {
                    Thread.sleep(1 * 1000);
                    random = (100 * Math.random());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("stop thread - " + threadIndex + " random = " + random.intValue());
                results.add(random.intValue());

            });
            thread.start();
            thread.join();
        }

        System.out.println("跑完了...");
        Long t2 = System.nanoTime();
        System.out.println(results);
        System.out.println("time = " + (t2 - t1));
    }

原理

join 用于讓當(dāng)前執(zhí)行線程等待join線程執(zhí)行結(jié)束。
其實(shí)現(xiàn)原理是不停檢查join線程是否存活,如果join線程存活則讓當(dāng)前線程永遠(yuǎn)等待。
其中,wait(0)表示永遠(yuǎn)等待下去

while(isAlive){
     wait(0);
}

直到j(luò)oin線程中止后,線程的this.notifyAll()方法會(huì)被調(diào)用,調(diào)用notifyAll()是在JVM實(shí)現(xiàn)的,所以在jdk里看不到

CountDownLatch

 private static void countDownLatch() throws InterruptedException {
        int length = 100;
        CountDownLatch latch = new CountDownLatch(length);

        Long t1 = System.nanoTime();
        List<Integer> results = new ArrayList<>();
        for (int index = 0; index < length; index++) {
            final int threadIndex = index;
            Thread thread = new Thread(() -> {
                System.out.println("start thread - " + threadIndex);
                Double random = -1.0;
                try {
                    Thread.sleep(1 * 1000);
                    random = (100 * Math.random());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("stop thread - " + threadIndex + " random = " + random.intValue());
                results.add(random.intValue());
                latch.countDown();

            });
            thread.start();
        }

        // 注意:CountDownLatch要加上這段才生效,原因見下面文字描述
        latch.await();

        System.out.println("跑完了...");
        Long t2 = System.nanoTime();
        System.out.println(results);
        System.out.println("time = " + (t2 - t1));
    }

原理

CountDownLatch的構(gòu)造函數(shù)接受一個(gè)int類型的參數(shù)作為計(jì)數(shù)器,如果你想等待N個(gè)節(jié)點(diǎn)(N個(gè)Thread 如上述示例 ),這里就傳入N

CountDownLatch.countDown()

調(diào)用CountDownLatch.countDown() ,N就會(huì)減1

CountDownLatch.wait()

CountDownLatch.wait()會(huì)阻塞當(dāng)前線程,直到N變成零

當(dāng)某個(gè)線程處理的特別慢時(shí),如果不需要主線程一直等待下去,可以使用另外一個(gè)重載的wait(long time,TimeUnit unit),這個(gè)方法等待指定時(shí)間后就不再阻塞線程了

join也有類似的方法

結(jié)論

join()與CountDownLatch都能實(shí)現(xiàn) 主線程等待所有線程完成工作 的功能,但明顯 CountDownLatch 效率更高

控制并發(fā)線程數(shù)

Semaphore

public class StudySemaphore {

    public static void main(String[] args) {
        int THREAD_COUNT = 30;
        ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

        Semaphore semaphore = new Semaphore(5);

        for (int index = 0; index < THREAD_COUNT; index++) {
            threadPool.execute(() -> {
                try {
                    semaphore.acquire();

                    Thread.sleep(1 * 1000);
                    System.out.println(String.format("%s do with something end - 當(dāng)前等待的線程數(shù):%s", Thread.currentThread().getName(), semaphore.getQueueLength()));
                    semaphore.release();

                } catch (InterruptedException e) {
                    //
                }

            });
        }

        threadPool.shutdown();
    }
}

原理

Semaphore 的構(gòu)造方法接受一個(gè)int類型的參數(shù),表示可用的許可證數(shù)量
new Semaphore(5) 表示允許5個(gè)線程獲取許可證,也就是最大并發(fā)數(shù)5

Semaphore.acquire()

Semaphore.acquire() 方法獲取一個(gè)許可證

Semaphore.release()

使用完后調(diào)用 Semaphore.release() 方法歸還許可證

Semaphore.tryAcquire()

也可以使用 tryAcquire() 嘗試獲取許可證

學(xué)習(xí)《java并發(fā)編程的藝術(shù)》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容