java多線程與高并發(fā)(四)Atomic類和線程同步新機(jī)制

1. Atomic類和線程同步新機(jī)制

這章我們來(lái)繼續(xù)將Amotic的問(wèn)題,然后將除了synchronized之外的鎖。事實(shí)上,無(wú)鎖化操作比synchronized效率更高。
下面寫(xiě)個(gè)程序分別說(shuō)明synchronize 和longAdder,Amotic

package com.learn.thread.three;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class TestAtomicOrSynOrLongAdder {
    static long count1 = 0L;
    private static final AtomicLong count2 = new AtomicLong(0L);
    private static LongAdder count3 = new LongAdder();
    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[1000];

        // AtomicLong
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() ->{
                for (int k = 0; k < 10000; k++) {
                    count2.incrementAndGet();
                }
            });
        }
        long start = System.currentTimeMillis();
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        long end = System.currentTimeMillis();
        System.out.println("Amotice " + count2.get() + "time " + (end - start));

        // synchronized Long
        Object lock = new Object();
        for (int i = 0; i < threads.length; i++) {
            threads[i] =
                    new Thread(() -> {
                       for (int k = 0; k < 10000; k++) {
                           synchronized (lock) {
                               count1 ++;
                           }
                       }
                    });
        }
        start = System.currentTimeMillis();
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        end = System.currentTimeMillis();
        System.out.println("Sync: " + count1 + "time " + (end - start));

        // LongAdder
        for (int i = 0; i < threads.length; i++) {
            threads[i] =
                    new Thread(() -> {
                        for (int k = 0; k < 10000; k++) {
                            count3.increment();
                        }
                    });
        }
        start = System.currentTimeMillis();
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        end = System.currentTimeMillis();
        System.out.println("LongAdder: " + count1 + "time " + (end - start));

    }

}

至于以上各種“鎖”的效率,要分情況使用。先來(lái)看這三種的優(yōu)勢(shì)

Amotic和synchronized的對(duì)比下,synchronzied有可能要去操作系統(tǒng)申請(qǐng)重量級(jí)鎖,所以synchronized的效率是偏低的
LongAdder和Amotic對(duì)比,LongAdder的內(nèi)部做了一個(gè)分段鎖,類似于分段鎖的概念,在它的內(nèi)部的時(shí)候,會(huì)把一個(gè)值放到一個(gè)數(shù)組里,比如說(shuō)數(shù)組長(zhǎng)度為4,最開(kāi)始是0,1000個(gè)線程,250個(gè)線程就放在第一個(gè)數(shù)組元素里,以此類推,每一個(gè)都網(wǎng)上遞增算出來(lái)的結(jié)果加在一起。
先來(lái)復(fù)習(xí)一下之前將的synchronized的細(xì)節(jié)
這是有鎖分級(jí)的情況,在一定情況下,synchronized是pian是效率是好的,但是如果升級(jí)為重量級(jí)鎖,那么效率是低的。
執(zhí)行時(shí)間短,要同步的代碼量少,線程數(shù)少用CAS
執(zhí)行時(shí)間長(zhǎng),線程數(shù)多,用系統(tǒng)鎖

當(dāng)線程數(shù)為一萬(wàn)個(gè)的時(shí)候

2. 復(fù)習(xí)完了synchronized,下面看看基于CAS的一些新型鎖,先來(lái)講這些鎖的用法,再來(lái)說(shuō)這些鎖的原理

2.1. ReentrantLock

第一種就是之前講過(guò)的可重入鎖ReentrantLock,其實(shí)synchronized也是一種可重入鎖,之前講述線程synchronized概論的時(shí)候就說(shuō)過(guò)方法鎖里面調(diào)用方法鎖或者說(shuō)子類和父類synchronized(this)就是同一把鎖,是會(huì)使用到同一個(gè)鎖的!這就是可重入鎖。
ReentrantLock 是完全可以替代synchronized的,就是把原來(lái)寫(xiě)synchronized的地方換寫(xiě)成lock.lock(),加完鎖之后需要注意記得lock.unlock解鎖,因?yàn)閟ynchronized是自動(dòng)解鎖的,大括號(hào)執(zhí)行完就結(jié)束了,lock不行,lock必須手動(dòng)解鎖,建議手動(dòng)解鎖放在try…finally里面保證最好一定要解鎖,不然的話,上鎖之后中間執(zhí)行的過(guò)程就有問(wèn)題了,死在那里,別人就永遠(yuǎn)別想拿到鎖了?。?!

package com.learn.thread.three;

import com.learn.thread.second.T;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestReentranLock {

    Lock lock = new ReentrantLock();

    void m1() {
        try {
            lock.lock();
            for (int i = 0; i < 10; i++) {
                TimeUnit.SECONDS.sleep(2L);
                System.out.println(i);

            }
        }catch (Exception ex) {

        } finally {
            lock.unlock();
        }
    }
    void m2() {
        try {
            lock.lock();
            System.out.println("m2");
        }catch (Exception ex) {

        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TestReentranLock testReentranLock = new TestReentranLock();

        new Thread(() -> {
            testReentranLock.m1();
        }).start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(() -> {
            testReentranLock.m2();
        }).start();
    }
}

ReentrantLock比synchronized強(qiáng)大的地方就是tryLock進(jìn)行嘗試鎖定,不管是否鎖定,方法都將繼續(xù)執(zhí)行,synchronized如果搞不定的會(huì),就會(huì)阻塞。但是用ReentrantLock你自己就可以決定你到底要不要wait

package com.learn.thread.three;

import java.sql.Time;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestReentranLockTryLock {
    private static Lock lock = new ReentrantLock();
    void m1() {
        try {
            lock.lock();
            for (int i = 0; i < 10; i++) {
                TimeUnit.SECONDS.sleep(1L);
                System.out.println(i);

            }
        }catch (Exception ex) {

        } finally {
            lock.unlock();
        }
    }

    /**
     * 使用tryLock進(jìn)行嘗試鎖定,不管鎖定與否方法都將繼續(xù)執(zhí)行
     * 可以使用返回值來(lái)判斷是否鎖定,true表示加鎖成功,false表示枷鎖失敗
     * 同樣也可以指定trylock的時(shí)間
     */
    void m2() {
        // 這里不加時(shí)間參數(shù),默認(rèn)是鎖一秒的
        boolean locked = lock.tryLock();
        System.out.println("m2 ... " + locked);
        if (locked) {
            System.out.println("我被鎖住了,現(xiàn)在釋放鎖進(jìn)入m2");
            lock.unlock();
        }
        try {
            // 這里只是加鎖的時(shí)間,過(guò)了5秒以后,鎖釋放
            System.out.println(locked);
            locked = lock.tryLock(5, TimeUnit.SECONDS);
            System.out.println("m2 ....." + locked);
        }catch (Exception ex) {
            System.out.printf(ex.toString());
        } finally {
            // 這里如果不去判斷,會(huì)異常
            if (locked) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        TestReentranLockTryLock testReentranLockTryLock = new TestReentranLockTryLock();
        new Thread(() -> {
          testReentranLockTryLock.m1();
        }).start();
        try {
            TimeUnit.SECONDS.sleep(1);
        }catch (Exception ex) {

        }
        new Thread(() -> {
            testReentranLockTryLock.m2();
        }).start();
    }
}

當(dāng)然除了tryLock,還可以用lock.lockInterruptibly對(duì)interrupt()做出相應(yīng),可以被打斷的加鎖,比如說(shuō)我們可以調(diào)用一個(gè)t2.interrupt()打斷它的等待,讓它自己可以加鎖。如果有線程1上來(lái)加鎖,加鎖以后開(kāi)始沒(méi)完沒(méi)了的睡,如果線程1加了鎖,那么線程2就永遠(yuǎn)無(wú)法得到鎖了,這時(shí)候就可以使用interrupt強(qiáng)制打斷線程2的等待,通過(guò)異常的形式讓線程2去執(zhí)行。
lockInterruptibly() 方法的作用:如果當(dāng)前線程未被中斷則獲得鎖,如果當(dāng)前線程被中斷則出現(xiàn)異常。

    void m4() {
        try {
            // 強(qiáng)制打斷鎖

            lock.lockInterruptibly();
            System.out.println("線程2 打斷線程1,開(kāi)始執(zhí)行");
            try {
                TimeUnit.SECONDS.sleep(5);
            }catch (Exception ex) {

            }
            System.out.println("線程2 執(zhí)行完成");
        }catch (Exception ex) {
            System.out.println("線程2被中斷著,可以去完成其他事情");
        } finally {
            System.out.println(lock.tryLock());
            lock.unlock();
        }
    }


    public static void main(String[] args) {
        TestReentranLockTryLock testReentranLockTryLock = new TestReentranLockTryLock();
        new Thread(() -> {
            testReentranLockTryLock.m3();
        }).start();
        Thread t2 = new Thread(() -> {
            testReentranLockTryLock.m4();
        });
        t2.start();
        try {
            TimeUnit.SECONDS.sleep(1);
        }catch (Exception ex) {

        }
        // t2 如果
        t2.interrupt();
    }
    
    void m3() {
        try {
            lock.lock();
            System.out.println("線程1 鎖住,無(wú)休止的睡眠");
            try {
                TimeUnit.SECONDS.sleep(100000000);
            }catch (Exception ex) {

            }
        }catch (Exception ex) {

        } finally {
            lock.unlock();
        }
    }

ReentrantLock還可以指定公平鎖。公平鎖的意思就是當(dāng)我們new 一個(gè)ReentrantLock你可以傳一個(gè)參數(shù)為true,這個(gè)表示公平鎖,公平鎖的意思是誰(shuí)等在前面就讓誰(shuí)先執(zhí)行,而不是后來(lái)了就執(zhí)行。ReentrantLock默認(rèn)是非公平鎖

package com.learn.thread.three;

import com.learn.thread.second.T;

import java.util.concurrent.locks.ReentrantLock;

/**
 * 測(cè)試公平鎖
 */
public class TestReentrantLockTrue extends Thread{
    private static ReentrantLock lock = new ReentrantLock(true);


    @Override
    public void run() {
        for (int i = 0; i< 100; i++) {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + "獲得鎖");
            }finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        TestReentrantLockTrue testReentrantLockTrue = new TestReentrantLockTrue();

        Thread t1 = new Thread(testReentrantLockTrue);
        Thread t2 = new Thread(testReentrantLockTrue);
        t1.start();
        t2.start();
    }
}

2.2.回顧ReentrantLock

首先ReentrantLock是可以替代synchronized的,本身底層就是cas
tryLock:自己來(lái)控制,控制不住鎖怎么辦
lockInterruptibly: 用異常的形式,取消等待
公平鎖與非公平鎖的等待
以后聊AQS的時(shí)候,實(shí)際上它內(nèi)部用的是park和unpark,也不是全部cas,也是一個(gè)鎖升級(jí)的概念,只不過(guò)這個(gè)鎖升級(jí)做的比較隱匿,在你等待這個(gè)隊(duì)列的時(shí)候如果你拿不到還是會(huì)進(jìn)入一個(gè)阻塞狀態(tài),前面至少有一個(gè)cas狀態(tài),它不像原先就直接進(jìn)入阻塞狀態(tài)了。

2.3.CountDownLatch

CountDownLatch 叫倒數(shù),Latch是門(mén)栓的意思(倒數(shù)的一個(gè)門(mén)栓, 5 ,4,3,2,1 數(shù)到了,我這個(gè)門(mén)栓就打開(kāi)了)
看一下下面這個(gè)程序unsingcountDownLatch,new了100個(gè)線程,接下來(lái),又來(lái)個(gè)100個(gè)數(shù)量的CountDownLatch,這就是設(shè)置了門(mén)栓,記錄個(gè)數(shù)為1000,每一個(gè)線程結(jié)束的時(shí)候就讓latch.countDown(),然后啟動(dòng)所有的線程,在latch.await(),最后結(jié)束。
latch.countDown()是和latch.await()連用的,countDown是看住門(mén)栓,等每個(gè)線程執(zhí)行到await()的時(shí)候就會(huì)按一下CountDown是,讓其在原來(lái)的基礎(chǔ)上減1,一直到這個(gè)數(shù)字變成0的時(shí)候就會(huì)被打開(kāi),這就是它們的概念,是用來(lái)等著線程結(jié)束的
用join實(shí)際上不太好控制,必須要你線程結(jié)束了才能控制,但是如果是一個(gè)門(mén)栓的話我在線程里不聽(tīng)得CountDown,在一個(gè)線程里就可以控制這個(gè)門(mén)栓什么時(shí)候可以往前走,用join我只能是當(dāng)前線程結(jié)束了,你才能自動(dòng)往前走,用join可以,但是用countDown更加靈活

package com.learn.thread.three;

import com.learn.thread.first.T;

import java.util.concurrent.CountDownLatch;

/**
 *
 */
public class TestCountDownLatch {

    /**
     * 用CountDownLatch控制線程結(jié)束
     */
    private static void usingCountDownLacth() {
        Thread[] threads = new Thread[100];
        CountDownLatch countDownLatch = new CountDownLatch(threads.length);
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                int result = 0;
                for (int j = 0; j < 10000; j++) {
                    result += j;
                }
                System.out.println(result);
                // 看住門(mén)栓,每調(diào)用一次就減1
                countDownLatch.countDown();
                System.out.println("我看住門(mén)栓了");
            });
        }
        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }
        try {
            // 當(dāng)countDown減為0的時(shí)候,這里就會(huì)執(zhí)行了
            countDownLatch.await();
            System.out.println("我來(lái)減門(mén)栓的數(shù)量了");
        }catch (Exception ex) {

        }
        System.out.println("end Latch");
    }


    /**
     * 模擬join 不好控制線程
     */
    private static void usingJoin() {
        Thread[] threads = new Thread[100];

        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                int result = 0;
                for (int j = 0; j < 10000; j++) {
                    result += j;
                }
                System.out.println(result);
                });
        }
        for (int i = 0; i < threads.length; i++) {
            System.out.println("線程start");
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++) {
            try {
                System.out.println("執(zhí)行join");
                threads[i].join();
                // 這里模擬線程結(jié)束結(jié)束之后的動(dòng)作
                System.out.println("equals countDown.latch");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("end usingJoin");
    }

    public static void main(String[] args) {
         usingJoin();
        // usingCountDownLacth();
    }
}

2.4.CyclicBarrier

這個(gè)名字叫CyclicBarrier,也是一個(gè)同步工具,意思就是循環(huán)柵欄,就是什么時(shí)候人滿了就把柵欄推到,全部放出去,之后柵欄又重新起來(lái),再來(lái)人,滿了,放出去再繼續(xù)。

舉例
CyclicBarrier的概念比如說(shuō)一個(gè)復(fù)雜的操作,需要訪問(wèn)數(shù)據(jù)庫(kù),需要訪問(wèn)網(wǎng)絡(luò),需要方位文件。有一種方式是順序執(zhí)行,這事一種非常低的效率,還有一種方式就是并發(fā)的執(zhí)行,用不同的線程去操作,并且是這三個(gè)步驟有結(jié)果了我再進(jìn)行下一次操作,這時(shí)候就用到了CyclicBarrier

package com.learn.thread.three;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class TestCyclicBarrier {
    private CyclicBarrier cyclicBarrier;
    public static void main(String[] args) {
        testSout();
    }
    
    
    private static void testSout() {
        // 這里會(huì)起一個(gè)線程去執(zhí)行第二個(gè)參數(shù)的內(nèi)容可以為空
        CyclicBarrier cyclicBarrier = new CyclicBarrier(20, () -> {
            System.out.println("滿人了");
        });
        for (int i = 0; i < 400; i++) {
            new Thread(() -> {
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

}

2.5.Phaser

Phaser更像結(jié)合了CountDownLatch和CyclicBarrier,中文意思就是階段。
Phaser是按照不同的階段來(lái)對(duì)線程進(jìn)行執(zhí)行,就是它本身是維護(hù)著一個(gè)階段這樣的一個(gè)成員變量,比如說(shuō)第0個(gè)階段,第一個(gè)階段,每個(gè)階段不同的時(shí)候這個(gè)線程都可以往前走,有的線程走個(gè)某個(gè)階段就停了,有的線程一直會(huì)走到結(jié)束。
你的程序中如果說(shuō)用到分好幾個(gè)階段執(zhí)行,而且有個(gè)階段必須得幾個(gè)人共同參與的一種情形就可能會(huì)用到Phaser
下面我們模擬一個(gè)場(chǎng)景結(jié)婚,分成4個(gè)階段,分別是到達(dá)、吃飯、離開(kāi)、洞房。首先吃飯之前必須要所有客人到達(dá)婚禮線程,離開(kāi)也是要所用人吃完飯才離開(kāi),但是洞房是新郎和新娘的事情,客人們必須離開(kāi)。

package com.learn.thread.three.marry;

import java.util.Random;
import java.util.concurrent.Phaser;

/**
 * 人類,用來(lái)模擬每一個(gè)階段人的操作
 */
public class Person implements Runnable {

    public Person(String name) {
        this.name = name;
    }

    private String name;

    private void eat() {
        System.out.println(this.name + "吃飯");
        // 進(jìn)入柵欄階段
        TestPhaser.phaser.arriveAndAwaitAdvance();
    }

    private void arrive() {
        System.out.println(this.name + "到達(dá)婚禮現(xiàn)場(chǎng)");
        TestPhaser.phaser.arriveAndAwaitAdvance();

    }

    private void leave() {
        System.out.println(this.name + "離開(kāi)");
        TestPhaser.phaser.arriveAndAwaitAdvance();

    }

    private void hug() {
        if (name.equals("新郎") || "新娘".equals(name)) {
            System.out.println(this.name + "洞房了");
            TestPhaser.phaser.arriveAndAwaitAdvance();
        }else {
            // 其他線程都不參與,控制柵欄的個(gè)數(shù)
            TestPhaser.phaser.arriveAndDeregister();
            // 還可以往柵欄上加線程
            //TestPhaser.phaser.register();

        }
    }
    @Override
    public void run() {
        arrive();
        eat();
        leave();
        hug();
    }

    static class TestPhaser {
        static Random random = new Random();
        static MarriagePhaser phaser = new MarriagePhaser();

    }

    static class MarriagePhaser extends Phaser {

        /**
         * 線程抵達(dá)這個(gè)柵欄的時(shí)候,所有的線程都滿足了這個(gè)第一個(gè)柵欄的條件了這個(gè)方法
         * 會(huì)被自動(dòng)調(diào)用
         *
         * @param phase 第幾個(gè)階段,從0開(kāi)始
         * @param registeredParties 這個(gè)階段有多少線程參與
         * @return
         */
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch (phase) {
                case 0:
                    System.out.println(phase + "所有人都到齊了~" + registeredParties);
                    return false;
                case 1:
                    System.out.println(phase + "所有人都吃完飯了~" + registeredParties);
                    return false;
                case 2:
                    System.out.println(phase + "所有人都離開(kāi)了~" + registeredParties);
                    return false;
                case 3:
                    System.out.println(phase + "婚禮結(jié)束~ 新浪和新娘抱抱" + registeredParties);
                    return true;
                default:
                    return true;
            }
        }
    }

    public static void main(String[] args) {
        TestPhaser.phaser.bulkRegister(7);
        for (int i = 0; i < 5; i++) {
            new Thread(new Person("p" + i)).start();
        }
        new Thread(new Person("新郎")).start();
        new Thread(new Person("新娘")).start();
    }
}

分析上面的程序,我們可以看到phaser.arriveAndAwaitAdvance方法是在進(jìn)入柵欄前停駐,等線程的數(shù)量達(dá)到了就會(huì)自動(dòng)調(diào)用onAdvance方法,返回false說(shuō)明不是最后的階段,返回true就是說(shuō)到達(dá)了最后的階段。最后phaser.arriveAndDeregister方法是注銷線程,讓線程不再參與階段的執(zhí)行。

2.6.ReadWriteLock讀寫(xiě)鎖

讀寫(xiě)鎖的本質(zhì)就是共享鎖和排他鎖,讀鎖就是共享鎖,寫(xiě)鎖就是排他鎖,讀寫(xiě)有很多種情況,比如說(shuō)你的數(shù)據(jù)庫(kù)里某條數(shù)據(jù)你放在內(nèi)存里讀的特別多,但是改的時(shí)候并不多。
我們先來(lái)自己定義一套讀寫(xiě)鎖
假設(shè)有兩個(gè)方法,一個(gè)read方法,一個(gè)write方法,read的時(shí)候我需要往里頭傳一把鎖,這個(gè)鎖我們自己定,可以是排他鎖,也可以讀鎖或者寫(xiě)鎖,write的時(shí)候同樣需要傳這把鎖,同時(shí)你傳一個(gè)新值,在這里值里面?zhèn)饕粋€(gè)內(nèi)容。我們模擬這個(gè)操作,讀的是一個(gè)Int類型的值,讀的時(shí)候上鎖,設(shè)置一秒鐘,完了之后read over 最后unlock,然后寫(xiě)鎖,鎖定之后睡1000秒,然后把新的值給value,write over之后解鎖。
我們可以用之前的ReentrantLock進(jìn)行加鎖,分析一下這種情況,第一種方式就是直接new ReentrantLock傳進(jìn)去,主程序定義了一個(gè)Runnable對(duì)象,第一個(gè)是調(diào)用read方法,第二個(gè)是調(diào)用write方法同時(shí)往里邊扔一個(gè)隨機(jī)值,然后啟18個(gè)讀線程,啟2個(gè)寫(xiě)線程,這個(gè)兩個(gè)我要執(zhí)行完的話,因?yàn)槭怯昧薘eentrantLock加鎖,鎖的一秒鐘內(nèi)不沒(méi)有任何線程可以拿到鎖,每一個(gè)線程執(zhí)行完都要1秒鐘,那么20個(gè)線程就需要20秒。
我們完全可以按功能加鎖
上述無(wú)非兩種功能,讀和寫(xiě),那么能不能讀的時(shí)候,所有讀操作都可以共享這把鎖,寫(xiě)的時(shí)候不讓讀呢?ReentrantReadWirteLock是ReentrantLock的一種實(shí)現(xiàn),可以實(shí)現(xiàn)上述的思想。它能分出兩把鎖,一把readLock,一把writeLock。這兩把鎖在我讀的時(shí)候扔進(jìn)去,因此,18個(gè)線程讀是可以在一秒鐘完成工作的,所以讀寫(xiě)鎖效率會(huì)大大提高
下面我們看看兩種方法的效率

package com.learn.thread.three.ReentrantReadWriteLock;

import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TestReentrantReadWriteLock {

    static Lock lock = new ReentrantLock();
    private static int value = 10;

    static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    static Lock readLock = readWriteLock.readLock();
    static Lock wirteLock = readWriteLock.writeLock();

    /**
     * 普通方式加鎖
     *
     * @param lock 鎖
     */
    public static void read(Lock lock) {
        try {
            lock.lock();
            Thread.sleep(1000);
            System.out.println("取值" + value);
            System.out.println("read over");
        } catch (Exception exception) {

        } finally {
            lock.unlock();
        }
    }

    /**
     * 寫(xiě)鎖
     *
     * @param lock 鎖
     * @param v 新值
     */
    public static void write(Lock lock, int v) {
        try {
            lock.lock();
            Thread.sleep(1);
            value = v;
            System.out.println("寫(xiě)了" + value);
        } catch (Exception ex) {

        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        testNoReadAndWriteLock();
        //testReadAndWriteLock();
    }

    static void testNoReadAndWriteLock() {
        Runnable runnable = () -> read(lock);
        Runnable write = () -> write(lock, new Random().nextInt());
        for (int i = 0; i < 18; i++) {
            new Thread(runnable).start();
        }
        for (int i = 0; i < 2; i++) {
            new Thread(write).start();
        }
    }

    static void testReadAndWriteLock() {
        Runnable runnable = () -> read(readLock);
        Runnable write = () -> write(wirteLock, new Random().nextInt());
        for (int i = 0; i < 18; i++) {
            new Thread(runnable).start();
        }
        for (int i = 0; i < 2; i++) {
            new Thread(write).start();
        }

    }
}

第一種效率明顯上比第二種慢多了

以后還寫(xiě)不寫(xiě)synchronized?分布式鎖怎么實(shí)現(xiàn)的?
以后一般都不會(huì)用這些新鎖,多數(shù)用到synchronized,只有特別特別追求效率的時(shí)候才用到這些新的鎖,現(xiàn)在的分布式鎖很多,主要有redis和ZooKeeper都可以實(shí)現(xiàn)分布式鎖,數(shù)據(jù)庫(kù)也可以實(shí)現(xiàn),但是數(shù)據(jù)庫(kù)實(shí)現(xiàn)效率就低了。
給大家講一個(gè)簡(jiǎn)單的例子,就說(shuō)秒殺這個(gè)事情,在開(kāi)始秒殺之前它會(huì)從數(shù)據(jù)庫(kù)讀取某一個(gè)數(shù)據(jù),比如電視機(jī)500臺(tái),只能最多銷售500臺(tái),完成這件事情是前面的線程訪問(wèn)同一個(gè)數(shù),最開(kāi)始是0一直漲到500就結(jié)束,需要加鎖,從0遞增。如果是單機(jī)的,LongAdder和AtomicIntegr就可以搞定。如果是分布式的,對(duì)一個(gè)數(shù)進(jìn)行上鎖,redis是單線程的,所以扔在一臺(tái)機(jī)器上就ok。

2.7.Semaphore

詞面意思就是信號(hào)燈,可以往里邊傳一個(gè)數(shù),permits是允許的數(shù)量,你可以想著有幾個(gè)信號(hào)燈,燈閃爍著數(shù)字表示到底允許幾個(gè)來(lái)參考我這個(gè)信號(hào)燈。
s.acquire()這個(gè)方法叫做阻塞方法,阻塞方法的意思說(shuō)我大概acquire不到的話我就停在這里。acquire的意思就是得到,如果我Semaphore s = new Semaphore(1)寫(xiě)的是1,我取一下,acquire一下他就變成0,當(dāng)變成0之后,別人是acquired不到的,然后繼續(xù)執(zhí)行,線程結(jié)束之后注意要s.release(),執(zhí)行完該執(zhí)行的時(shí)候就把他release掉,release又把0變回去1,還原化。
Semaphore的含義也是限流,比如說(shuō)你在買(mǎi)票,Semaphore寫(xiě)5,就是說(shuō)只能5個(gè)人同時(shí)買(mǎi)票。acquire的意思叫獲取這把鎖,線程如果想繼續(xù)往下執(zhí)行,必須得從Semaphore里獲取一個(gè)許可,他一共有5個(gè)許可用到了0你就得給我等著。
下面舉例一個(gè)場(chǎng)景
例如,有一個(gè)八條車(chē)道的機(jī)動(dòng)車(chē)道,這里只有兩個(gè)收費(fèi)站,到這里,誰(shuí)acquire得到其中某一個(gè)誰(shuí)執(zhí)行。
默認(rèn)Semaphore是非公平的,new Semaphore(2, true)第二個(gè)值傳true才是設(shè)置公平,公平這個(gè)事情是有一堆隊(duì)列在哪兒等,大家伙過(guò)來(lái)排隊(duì)。用車(chē)道和收費(fèi)站來(lái)舉例子,就是我們有四輛車(chē)都在等著進(jìn)一個(gè)車(chē)道,當(dāng)后面再來(lái)一輛車(chē)的時(shí)候,它不會(huì)抄到前面去,這才叫公平,所以說(shuō)內(nèi)部是有隊(duì)列的,不僅內(nèi)部是有隊(duì)列的,本章所講的ReentrantLock,CountDownLatch,CyclicBarrier,Phaser,ReadWriteLock,Semaphore還有后邊講到Exchanger都是用同一個(gè)隊(duì)列,同一個(gè)類實(shí)現(xiàn)的,這個(gè)類叫做AQS。

package com.learn.thread.three;

import java.util.concurrent.Semaphore;

public class TestSemaphore {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3, true);
        new Thread(() -> {
            try {
                // 阻塞方法,如果線程acquire不到,就停在這里,等別的線程釋放
                semaphore.acquire();
                System.out.println("t1 running");
                Thread.sleep(1000);
                System.out.println("t1 ending");
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                semaphore.release();
            }
        }).start();
        new Thread(() -> {
            try {
                semaphore.acquire();
                System.out.println("t2 running");
                Thread.sleep(1000);
                System.out.println("t2 ending");
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                semaphore.release();
            }
        }).start();
    }
}

2.8.Exchanger

這個(gè)Exchanger叫做交換器,是兩個(gè)線程互相交換數(shù)據(jù)用的。比如說(shuō)第一個(gè)線程有一個(gè)成員變量s,然后exchanger.exchange(s),第二個(gè)也是這樣,t1線程名字叫t1,第二個(gè)線程名字叫t2,到最后,打印出來(lái)你會(huì)發(fā)現(xiàn)他們兩的數(shù)據(jù)交換了。線程間通信的方式非常多,這只是其中的一種,就是線程之間交換數(shù)據(jù)用的。
Exchanger 你可以想象成一個(gè)容器,這個(gè)容器有兩個(gè)值,兩個(gè)線程,兩個(gè)格的位置,第一個(gè)線程執(zhí)行到exchanger.exchange的時(shí)候,阻塞。但是要注意我這個(gè)exchange方法的時(shí)候是往里面扔了一個(gè)值,你可以認(rèn)為把t1扔到第一個(gè)格子了,然后第二個(gè)線程開(kāi)始執(zhí)行,也執(zhí)行到exchange方法了,把t2扔到第二個(gè)格子里,接下來(lái)兩個(gè)線程交換了一下,t1扔給t2,t2扔給了t1,兩個(gè)線程繼續(xù)往前跑。Exchanger只能是兩個(gè)線程之間,交換一個(gè)東西只能兩兩進(jìn)行
下面舉一個(gè)游戲中兩個(gè)人狀態(tài)交換

package com.learn.thread.three;

import java.util.concurrent.Exchanger;

public class TestExchanger {
    static Exchanger<String> exchanger = new Exchanger<>();

    public static void main(String[] args) {
        new Thread(() -> {
            String s = "t1";
            try {
                s = exchanger.exchange(s);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);
        },"線程1").start();

        new Thread(() -> {
            String s = "t2";
            try {
                s = exchanger.exchange(s);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);
        },"線程2").start();
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容