CountDownLatch

CountDownLatch介紹

CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行。例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動框架服務(wù)的線程已經(jīng)啟動所有框架服務(wù)之后執(zhí)行。

CountDownLatch原理

CountDownLatch是通過一個計數(shù)器來實現(xiàn)的,計數(shù)器的初始化值為線程的數(shù)量。每當(dāng)一個線程完成了自己的任務(wù)后,計數(shù)器的值就相應(yīng)得減1。當(dāng)計數(shù)器到達0時,表示所有的線程都已完成任務(wù),然后在閉鎖上等待的線程就可以恢復(fù)執(zhí)行任務(wù)。

CountDownLatch原理示意圖

CountDownLatch的偽代碼

Main thread start
Create CountDownLatch for N threads
Create and start N threads
Main thead wait on latch
N threads completes there tasks are returns
Main thread resume execution

CountDownLatch.java中定義的構(gòu)造函數(shù)

//用等待的線程數(shù)量來進行初始化
public void CountDownLatch(int count){...}

計數(shù)器count是閉鎖需要等待的線程數(shù)量,只能被設(shè)置一次,且CountDownLatch沒有提供任何機制去重新設(shè)置計數(shù)器count。

與CountDownLatch的第一次交互是主線程等待其他線程。主線程必須在啟動其他線程后立即調(diào)用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務(wù)。

其他N個線程必須引用CountDownLatch閉鎖對象,因為它們需要通知CountDownLatch對象,它們各自完成了任務(wù);這種通知機制是通過CountDownLatch.countDown()方法來完成的;每調(diào)用一次,count的值就減1,因此當(dāng)N個線程都調(diào)用這個方法,count的值就等于0,然后主線程就可以通過await()方法,恢復(fù)執(zhí)行自己的任務(wù)。

在實時系統(tǒng)中的使用場景

  1. 實現(xiàn)最大的并行性:有時我們想同時啟動多個線程,實現(xiàn)最大程度的并行性。例如,我們想測試一個單例類。如果我們創(chuàng)建一個初始計數(shù)器為1的CountDownLatch,并讓其他所有線程都在這個鎖上等待,只需要調(diào)用一次countDown()方法就可以讓其他所有等待的線程同時恢復(fù)執(zhí)行。
  2. 開始執(zhí)行前等待N個線程完成各自任務(wù):例如應(yīng)用程序啟動類要確保在處理用戶請求前,所有N個外部系統(tǒng)都已經(jīng)啟動和運行了。
  3. 死鎖檢測:一個非常方便的使用場景是你用N個線程去訪問共享資源,在每個測試階段線程數(shù)量不同,并嘗試產(chǎn)生死鎖。

CountDownLatch使用例子

模擬一個應(yīng)用程序啟動類,開始就啟動N個線程,去檢查N個外部服務(wù)是否正常并通知閉鎖;啟動類一直在閉鎖上等待,一旦驗證和檢查了所有外部服務(wù),就恢復(fù)啟動類執(zhí)行。

BaseHealthChecker.java :這個類是實現(xiàn)了Runnable接口,負(fù)責(zé)所有特定的外部服務(wù)健康檢查的基類。

import java.util.concurrent.CountDownLatch;

public abstract class BaseHealthChecker implements Runnable {
    
    private CountDownLatch _latch;
    private String _serviceName;
    private boolean _serviceUp;
    
    public BaseHealthChecker(String serviceName, CountDownLatch latch)
    {
        super();
        this._latch = latch;
        this._serviceName = serviceName;
        this._serviceUp = false;
    }

    @Override
    public void run() {
        try {
            verifyService();
            _serviceUp = true;
        } catch (Throwable t) {
            t.printStackTrace(System.err);
            _serviceUp = false;
        } finally {
            if(_latch != null) {
                _latch.countDown();
            }
        }
    }

    public String getServiceName() {
        return _serviceName;
    }

    public boolean isServiceUp() {
        return _serviceUp;
    }
    
    public abstract void verifyService();
}

NetworkHealthChecker.java,DatabaseHealthChecker.java和CacheHealthChecker.java都繼承自BaseHealthChecker,引用CountDownLatch實例,除了服務(wù)名和休眠時間不同外,都實現(xiàn)各自的verifyService方法。

NetworkHealthChecker.java類

import java.util.concurrent.CountDownLatch;

public class NetworkHealthChecker extends BaseHealthChecker
{
    public NetworkHealthChecker (CountDownLatch latch)
    {
        super("Network Service", latch);
    }
    
    @Override
    public void verifyService() 
    {
        System.out.println("Checking " + this.getServiceName());
        try 
        {
            Thread.sleep(7000);
        } 
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        System.out.println(this.getServiceName() + " is UP");
    }
}

DatabaseHealthChecker.java類

import java.util.concurrent.CountDownLatch;

public class DatabaseHealthChecker extends BaseHealthChecker
{
    public DatabaseHealthChecker (CountDownLatch latch)
    {
        super("Database Service", latch);
    }
    
    @Override
    public void verifyService() 
    {
        System.out.println("Checking " + this.getServiceName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.getServiceName() + " is UP");
    }
}

CacheHealthChecker.java類

import java.util.concurrent.CountDownLatch;

public class CacheHealthChecker extends BaseHealthChecker
{
    public CacheHealthChecker (CountDownLatch latch)
    {
        super("Cache Service", latch);
    }
    
    @Override
    public void verifyService() 
    {
        System.out.println("Checking " + this.getServiceName());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.getServiceName() + " is UP");
    }
}

ApplicationStartupUtil.java:是一個主啟動類,它負(fù)責(zé)初始化閉鎖,然后等待所有服務(wù)都被檢查完成,再恢復(fù)執(zhí)行。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class ApplicationStartupUtil 
{
    private static List<BaseHealthChecker> _services;
    private static CountDownLatch _latch;
    
    private ApplicationStartupUtil()
    {
    }
    
    private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();
    
    public static ApplicationStartupUtil getInstance()
    {
        return INSTANCE;
    }
    
    public static boolean checkExternalServices() throws Exception
    {
        _latch = new CountDownLatch(3);
        _services = new ArrayList<BaseHealthChecker>();
        _services.add(new NetworkHealthChecker(_latch));
        _services.add(new CacheHealthChecker(_latch));
        _services.add(new DatabaseHealthChecker(_latch));
        
        Executor executor = Executors.newFixedThreadPool(_services.size());
        
        for(final BaseHealthChecker v : _services) 
        {
            executor.execute(v);
        }
        
        _latch.await();
        
        for(final BaseHealthChecker v : _services) 
        {
            if( ! v.isServiceUp())
            {
                return false;
            }
        }
        return true;
    }
}

測試代碼檢測閉鎖功能:

public class Main {
    public static void main(String[] args) 
    {
        boolean result = false;
        try {
            result = ApplicationStartupUtil.checkExternalServices();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("External services validation completed !! Result was :: "+ result);
    }
}

執(zhí)行結(jié)果:

Checking Network Service
Checking Cache Service
Checking Database Service
Database Service is UP
Cache Service is UP
Network Service is UP
External services validation completed !! Result was :: true

工作中的使用(優(yōu)雅地完成初始化)

在移動應(yīng)用開發(fā)中(以Android為例),隨著功能的增多,應(yīng)用初始化工作開始增多,網(wǎng)絡(luò),賬號,推送服務(wù),預(yù)加載數(shù)據(jù)等依次登場,開發(fā)人員都會臨時在Application中找到現(xiàn)有初始化邏輯,將自己的代碼插在其中。隨著版本的迭代,新老員工的交替,幾乎沒人能對應(yīng)用的初始化過程完全了解,刪除一行初始化代碼甚至移動位置都可能造成嚴(yán)重的后果。

應(yīng)用初始化過程極其重要,它是應(yīng)用后續(xù)平穩(wěn)運行的前提和保證。開發(fā)初始化配置模塊(公司內(nèi)部開源不宜公開),更好地管理初始化邏輯,對初始化地工作進行分層,分優(yōu)先級,多線程地規(guī)劃,進而在大幅提升初始化效率,同時還有完整地日志監(jiān)控體系功能。有了它,規(guī)劃整個初始化工作將簡單而優(yōu)雅

參考

  1. 什么時候使用CountDownLatch
  2. Java concurrency – CountDownLatch Example
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容