顯示鎖(Lock)和阻塞對(duì)列(BlockingQueue)

synchronized是不錯(cuò),但它并不完美。它有一些功能性的限制:比如它無(wú)法中斷一個(gè)正在等候獲得鎖的線程;

顯示鎖LocK

java.util.concurrent.lock 中的Lock 框架是鎖定的一個(gè)抽象,它允許把鎖定的實(shí)現(xiàn)作為 Java 類,而不是作為語(yǔ)言的特性來(lái)實(shí)現(xiàn)。這就為L(zhǎng)ock 的多種實(shí)現(xiàn)留下了空間,各種實(shí)現(xiàn)可能有不同的調(diào)度算法、性能特性或者鎖定語(yǔ)義。

ReentrantLock

ReentrantLock 類實(shí)現(xiàn)了Lock ,它擁有與synchronized 相同的并發(fā)性和內(nèi)存語(yǔ)義,但是添加了類似鎖投票、定時(shí)鎖等候和可中斷鎖等候的一些特性。此外,它還提供了在激烈爭(zhēng)用情況下更佳的性能。(換句話說(shuō),當(dāng)許多線程都想訪問(wèn)共享資源時(shí),JVM 可以花更少的時(shí)候來(lái)調(diào)度線程,把更多時(shí)間用在執(zhí)行線程上。)

class LockStudy {            
    private Lock lock = new ReentrantLock();// 鎖對(duì)象            
    public void output(String name) {                              
        lock.lock();      // 得到鎖                
        try {                   
            //doSomething            
        } finally {                  
            lock.unlock();// 釋放鎖                
        }            
    }        
}   

需要注意的是,用synchronized修飾的方法或者語(yǔ)句塊在代碼執(zhí)行完之后鎖自動(dòng)釋放,而是用Lock需要我們手動(dòng)釋放鎖,所以為了保證鎖最終被釋放(發(fā)生異常情況),要把互斥區(qū)放在try內(nèi),釋放鎖放在finally內(nèi)。

Condition

ReentrantLock里有個(gè)函數(shù)newCondition(),該函數(shù)得到一個(gè)鎖上的"條件",用于實(shí)現(xiàn)線程間的通信,條件變量很大一個(gè)程度上是為了解決Object.wait/notify/notifyAll難以使用的問(wèn)題。
Condition擁有await(),signal(),signalAll(),await對(duì)應(yīng)于Object.wait,signal對(duì)應(yīng)于Object.notify,signalAll對(duì)應(yīng)于Object.notifyAll。特別說(shuō)明的是Condition的接口改變名稱就是為了避免與Object中的wait/notify/notifyAll的語(yǔ)義和使用上混淆,因?yàn)镃ondition同樣有wait/notify/notifyAll方法()因?yàn)槿魏晤惗紦碛羞@些方法。
每一個(gè)Lock可以有任意數(shù)據(jù)的Condition對(duì)象,Condition是與Lock綁定的,所以就有Lock的公平性特性:如果是公平鎖,線程為按照FIFO的順序從Condition.await中釋放,如果是非公平鎖,那么后續(xù)的鎖競(jìng)爭(zhēng)就不保證FIFO順序了。下面是一個(gè)用Lock和Condition實(shí)現(xiàn)的一個(gè)生產(chǎn)者消費(fèi)者的模式:

import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;
public class ProductQueue<T> {  

    private final T[] items;  

    private final Lock lock = new ReentrantLock();  

    private Condition notFull = lock.newCondition();  

    private Condition notEmpty = lock.newCondition();  


    private int head, tail, count;  

    public ProductQueue(int maxSize) {  
        items = (T[]) new Object[maxSize];  
    }  

    public ProductQueue() {  
        this(10);  
    }  

    public void put(T t) throws InterruptedException {  
        lock.lock();  
        try {  
            while (count == getCapacity()) {  
                notFull.await();  
            }  
            items[tail] = t;  
            if (++tail == getCapacity()) {  
                tail = 0;  
            }  
            ++count;  
            notEmpty.signalAll();  
        } finally {  
            lock.unlock();  
        }  
    }  

    public T take() throws InterruptedException {  
        lock.lock();  
        try {  
            while (count == 0) {  
                notEmpty.await();  
            }  
            T ret = items[head];  
            items[head] = null;//GC  
      
            if (++head == getCapacity()) {  
                head = 0;  
            }  
            --count;  
            notFull.signalAll();  
            return ret;  
        } finally {  
            lock.unlock();  
        }  
    }  

    public int getCapacity() {  
        return items.length;  
    }  

    public int size() {  
        lock.lock();  
        try {  
            return count;  
        } finally {  
            lock.unlock();  
        }  
    }
}  

這就是多個(gè)Condition的強(qiáng)大之處,假設(shè)緩存隊(duì)列中已經(jīng)存滿,那么阻塞的肯定是寫(xiě)線程,喚醒的肯定是讀線程,相反,阻塞的肯定是讀線程,喚醒的肯定是寫(xiě)線程,那么假設(shè)只有一個(gè)Condition會(huì)有什么效果呢,緩存隊(duì)列中已經(jīng)存滿,這個(gè)Lock不知道喚醒的是讀線程還是寫(xiě)線程了,如果喚醒的是讀線程,皆大歡喜,如果喚醒的是寫(xiě)線程,那么線程剛被喚醒,又被阻塞了,這時(shí)又去喚醒,這樣就浪費(fèi)了很多時(shí)間。

讀寫(xiě)鎖ReadWriteLock

讀-寫(xiě)鎖定允許對(duì)共享數(shù)據(jù)進(jìn)行更高級(jí)別的并發(fā)訪問(wèn)。雖然一次只有一個(gè)線程(writer 線程)可以修改共享數(shù)據(jù),但在許多情況下,任何數(shù)量的線程可以同時(shí)讀取共享數(shù)據(jù)(reader 線程)。

阻塞隊(duì)列——BlockingQueue

阻塞隊(duì)列(BlockingQueue)是java.util.concurrent下的主要用來(lái)控制線程同步的工具。如果BlockQueue是空的,從BlockingQueue取東西的操作將會(huì)被阻斷進(jìn)入等待狀態(tài),直到BlockingQueue進(jìn)了東西才會(huì)被喚醒。同樣,如果BlockingQueue是滿的,任何試圖往里存東西的操作也會(huì)被阻斷進(jìn)入等待狀態(tài),直到BlockingQueue里有空間才會(huì)被喚醒繼續(xù)操作。
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。BlockingQueue是具體實(shí)現(xiàn)的接口。具體的實(shí)現(xiàn)類有LinkedBlockingQueue,ArrayBlockingQueued等。一般其內(nèi)部的都是通過(guò)Lock和Condition來(lái)實(shí)現(xiàn)阻塞和喚醒。下面是一個(gè)通過(guò)BlockingQueue實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者的例子。

生產(chǎn)者:

public class Producer implements Runnable {  
    BlockingQueue<String> queue;  

    public Producer(BlockingQueue<String> queue) {  
        this.queue = queue;  
    }  

    @Override  
    public void run() {  
        try {  
            String temp = "A Product, 生產(chǎn)線程:"  
                + Thread.currentThread().getName(    );  
            System.out.println("I have made a product:"  
                + Thread.currentThread().getName());  
            queue.put(temp);//如果隊(duì)列是滿的話,會(huì)阻塞當(dāng)前線程  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  

消費(fèi)者:

public class Consumer implements Runnable{  
    BlockingQueue<String> queue;  
  
    public Consumer(BlockingQueue<String> queue){  
        this.queue = queue;  
    }  
  
    @Override  
    public void run() {  
        try {  
            String temp = queue.take();//如果隊(duì)列為空,會(huì)阻塞當(dāng)前線程  
            System.out.println(temp);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  

測(cè)試類:

public class ProducerConsumeTest{  

    public static void main(String[] args) {  
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);  
        // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();  
        //不設(shè)置的話,LinkedBlockingQueue默認(rèn)大小為Integer.MAX_VALUE  
      
        // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);  
        Consumer consumer = new Consumer(queue);  
        Producer producer = new Producer(queue);  
        for (int i = 0; i < 5; i++) {  
            new Thread(producer, "Producer" + (i + 1)).start();  
            new Thread(consumer, "Consumer" + (i + 1)).start();  
        }  
    }  
}

參考:Java線程(九):Condition-線程通信更高效的方式

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容