Java中的隊列同步器AQS

一、AQS概念

1、隊列同步器是用來構建鎖或者其他同步組件的基礎框架,使用一個int型變量代表同步狀態(tài),通過內(nèi)置的隊列來完成線程的排隊工作。

2、下面是JDK8文檔中對于AQS的部分介紹

  public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable

  提供一個框架,用于實現(xiàn)依賴先進先出(FIFO)等待隊列的阻塞鎖和相關同步器(信號量,事件等)。 該類被設計為大多數(shù)類型的同步器的有用依據(jù),這些同步器依賴于單個原子int值來表示狀

態(tài)。子類必須定義改變此狀態(tài)的protected方法,以及根據(jù)該對象被獲取或釋放來定義該狀態(tài)的含義。給定這些,這個類中的其他方法執(zhí)行所有排隊和阻塞機制。 子類可以保持其他狀態(tài)字段,但只以

原子方式更新int使用方法操縱值getState() , setState(int)和compareAndSetState(int, int)被跟蹤相對于同步。

  此類支持默認獨占模式和共享模式。 當以獨占模式獲取時,嘗試通過其他線程獲取不能成功。 多線程獲取的共享模式可能(但不需要)成功。 除了在機械意義上,這個類不理解這些差異,當共享

模式獲取成功時,下一個等待線程(如果存在)也必須確定它是否也可以獲取。 在不同模式下等待的線程共享相同的FIFO隊列。 通常,實現(xiàn)子類只支持這些模式之一,但是兩者都可以在

ReadWriteLock中發(fā)揮作用。僅支持獨占或僅共享模式的子類不需要定義支持未使用模式的方法。

總結來說就是:

①子類通過繼承AQS并實現(xiàn)其抽象方法來管理同步狀態(tài),對于同步狀態(tài)的更改通過提供的getState()、setState(int state)、compareAndSetState(int expect, int update)來進行操作,因為使用CAS操作保證同步狀態(tài)的改變是原子的。

②子類被推薦定義為自定義同步組件的靜態(tài)內(nèi)部類,同步器本身并沒有實現(xiàn)任何的同步接口,僅僅是定義了若干狀態(tài)獲取和釋放的方法來提供自定義同步組件的使用。

③同步器既可以支持獨占式的獲取同步狀態(tài),也可以支持共享式的獲取同步狀態(tài)(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等不同類型的同步組件)

3、同步器是實現(xiàn)鎖的關鍵,在鎖的實現(xiàn)中聚合同步器,利用同步器實現(xiàn)鎖的語義;

回到頂部

二、AQS的接口和實例

1、同步器的設計實現(xiàn)原理

繼承同步器并且重寫指定的方法,然后將同步器組合在自定義同步組件的實現(xiàn)中,并且調用同步器提供的模板方法(這些模板方法會調用重寫的方法);而重寫指定的方法的時候,需要使用getState()、setState(int state)、compareAndSetState(int expect, int update)來訪問或者更新同步狀態(tài)。下面是源碼中state變量和三個方法的定義聲明實現(xiàn)

1   /**

2? ? ? * .(同步狀態(tài))

3? ? ? */

4? ? private volatile int state;

5

6? ? /**

7? ? ? * (返回當前的同步狀態(tài))

8? ? ? * 此操作的內(nèi)存語義為@code volatile read

9? ? ? */

10? ? protected final int getState() {

11? ? ? ? return state;

12? ? }

13

14? ? /**

15? ? ? * (設置新的同步狀態(tài))

16? ? ? * 此操作的內(nèi)存語義為@code volatile read

17? ? ? */

18? ? protected final void setState(int newState) {

19? ? ? ? state = newState;

20? ? }

21

22? ? /**

23? ? ? * (如果要更新的狀態(tài)和期望的狀態(tài)相同,那就通過原子的方式更新狀態(tài))

24? ? ? * ( 此操作的內(nèi)存語義為@code volatile read 和 write)

25? ? ? * (如果更新的狀態(tài)和期望的狀態(tài)不同就返回false)

26? ? ? */

27? ? protected final boolean compareAndSetState(int expect, int update) {

28? ? ? ? return unsafe.compareAndSwapInt(this, stateOffset, expect, update);

29? ? }



2、下面介紹AQS提供可被重寫的方法

1 /**

2? * 獨占式的獲取同步狀態(tài),實現(xiàn)該方法需要查詢當前狀態(tài)并判斷同步狀態(tài)是否符合預期,然后再進行CAS設置同步狀態(tài)

3? *

4? */

5 protected boolean tryAcquire(int arg) {

6? ? throw new UnsupportedOperationException();

7 }

8

9 /**

10? * 獨占式的釋放同步狀態(tài),等待獲取同步狀態(tài)的線程可以有機會獲取同步狀態(tài)

11? *

12? */

13 protected boolean tryRelease(int arg) {

14? ? throw new UnsupportedOperationException();

15 }

16

17 /**

18? * 嘗試以共享模式獲取。 該方法應該查詢對象的狀態(tài)是否允許在共享模式下獲取該對象,如果是這樣,就可以獲取它。 該方法總是由執(zhí)行獲取的線程調用。

19? * 如果此方法報告失敗,則獲取方法可能將線程排隊(如果尚未排隊),直到被其他線程釋放為止。 獲取失敗時返回負值,如果在獲取成共享模式下功但沒

20? * 有后續(xù)共享模式獲取可以成功,則為零; 并且如果以共享模式獲取成功并且隨后的共享模式獲取可能成功,則為正值,在這種情況下,后續(xù)等待線程必須檢查可用性。

21? */

22 protected int tryAcquireShared(int arg) {

23? ? throw new UnsupportedOperationException(); //如果不支持共享模式 ,會拋出該異常

24 }

25

26 /**

27? * 嘗試將狀態(tài)設置為以共享模式釋放同步狀態(tài)。 該方法總是由執(zhí)行釋放的線程調用。

28? */

29 protected int tryReleaseShared(int arg) {

30? ? throw new UnsupportedOperationException(); //如果不支持共享模式 ,會拋出該異常

31 }

32

33 /**

34? * 當前同步器是否在獨占模式下被線程占用,一般該方法表示是否被當前線程所獨占

35? */

36 protected int isHeldExclusively(int arg) {

37? ? throw new UnsupportedOperationException(); //如果不支持共享模式 ,會拋出該異常

38 }

3、同步器提供的模板方法

在實現(xiàn)自定義同步組件的時候,需要重寫上面的方法,而下面的模板方法會調用上面重寫的方法。下面介紹同步器提供的模板方法

1 /**

2? * 以獨占模式獲取,忽略中斷。 通過調用至少一次tryAcquire(int)實現(xiàn),成功返回。 否則線

3? * 程排隊,可能會重復阻塞和解除阻塞,直到成功才調用tryAcquire(int)

4? */

5 public final void acquire(int arg) {...}

6

7 /**

8? * 以獨占方式獲得,如果中斷,中止。 通過首先檢查中斷狀態(tài),然后調用至少一次

9? * tryAcquire(int) ,成功返回。 否則線程排隊,可能會重復阻塞和解除阻塞,調用

10? * tryAcquire(int)直到成功或線程中斷。

11? */

12 public final void acquireInterruptibly(int arg) throws InterruptedException {...}

13

14 /**

15? * 嘗試以獨占模式獲取,如果中斷則中止,如果給定的超時時間失敗。 首先檢查中斷狀態(tài),然

16? * 后調用至少一次tryAcquire(int) ,成功返回。 否則,線程排隊,可能會重復阻塞和解除阻

17? * 塞,調用tryAcquire(int)直到成功或線程中斷或超時

18? */

19 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {...}

20

21 /**

22? * 以共享模式獲取,忽略中斷。 通過首次調用至少一次執(zhí)行 tryAcquireShared(int),成功返

23? * 回。 否則線程排隊,可能會重復阻塞和解除阻塞,直到成功調用tryAcquireShared(int) 。

24? */

25 public final void acquireShared(int arg){...}

26

27 /**

28? * 以共享方式獲取,如果中斷,中止。 首先檢查中斷狀態(tài),然后調用至少一次

29? * tryAcquireShared(int) ,成功返回。 否則線程排隊,可能會重復阻塞和解除阻塞,調用

30? * tryAcquireShared(int)直到成功或線程中斷。

31? */

32 public final void acquireSharedInterruptibly(int arg) throws InterruptedException{...}

33

34 /**

35? * 嘗試以共享模式獲取,如果中斷則中止,如果給定的時間超過,則失敗。 通過首先檢查中斷

36? * 狀態(tài),然后調用至少一次tryAcquireShared(int) ,成功返回。 否則,線程排隊,可能會重

37? * 復阻塞和解除阻塞,調用tryAcquireShared(int)直到成功或線程中斷或超時。

38? */

39 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{...}

40

41 /**

42? * 獨占式的釋放同步狀態(tài),該方法會在釋放同步狀態(tài)之后,將同步隊列中的第一個節(jié)點包含的線程喚醒

43? */

44 public final boolean release(int arg){...}

45

46 /**

47? * 共享式的釋放同步狀態(tài)

48? */

49 public final boolean releaseShared(int arg){...}

50

51 /**

52? * 獲取在等待隊列上的線程集合

53? */

54 public final Collection<Thread> getQueuedThreads(){...}

回到頂部

三、隊列同步器的實現(xiàn)分析

?1、同步隊列

a)t同步隊列的實現(xiàn)原理

AQS內(nèi)部維護一個同步隊列來完成同步狀態(tài)的管理,當前線程獲取同步狀態(tài)失敗的時候,AQS會將當前線程以及等待狀態(tài)信息構造成一個結點Node并將其加入同步隊列中,同時阻塞當前線程,當同步狀態(tài)由持有線程釋放的時候,會將同步隊列中的首節(jié)點喚醒使其再次嘗試獲取同步狀態(tài)。同步隊列中的結點用來保存獲取同步狀態(tài)失敗的線程的線程引用、等待狀態(tài)以及前驅結點和后繼結點。下面是Node的屬性分析

1? ? static final class Node {

2? ? ? ? /** 共享模式下構造結點 */

3? ? ? ? static final Node SHARED = new Node();

4? ? ? ? /** 獨占模式下構造結點 */

5? ? ? ? static final Node EXCLUSIVE = null;

6

7? ? ? ? /** 用于指示線程已經(jīng)取消的waitStatus值(由于在同步隊列中等待的線程等待超時或者發(fā)生中斷,需要從同步隊列中取消等待,結點進入該狀態(tài)將不會發(fā)生變化)*/

8? ? ? ? static final int CANCELLED =? 1;

9? ? ? ? /** waitstatus值指示后續(xù)線程需要取消等待(后繼結點的線程處于等待狀態(tài),而當前結點的線程如果釋放了同步狀態(tài)或者CANCELL,將會通知后繼結點的線程以運行) */

10? ? ? ? static final int SIGNAL? ? = -1;

11? ? ? ? /**waitStatus值表示線程正在等待條件(原本結點在等待隊列中,結點線程等待在Condition上,當其他線程對Condition調用了signal()方法之后)該結點會從

         等待隊列中轉移到同步隊列中,進行同步狀態(tài)的獲取 */

12? ? ? ? static final int CONDITION = -2;

13? ? ? ? /**

14? ? ? ? ? * waitStatus值表示下一個共享式同步狀態(tài)的獲取應該無條件傳播下去

15? ? ? ? ? */

16? ? ? ? static final int PROPAGATE = -3;

17

18? ? ? ? /**

19? ? ? ? ? * 不同的等到狀態(tài)的int值

20? ? ? ? ? */

21? ? ? ? volatile int waitStatus;

22

23? ? ? ? /**

24? ? ? ? ? * 前驅結點,當結點加入同步隊列將會被設置前驅結點信息

25? ? ? ? ? */

26? ? ? ? volatile Node prev;

27

28? ? ? ? /**

29? ? ? ? ? * 后繼結點

30? ? ? ? ? */

31? ? ? ? volatile Node next;

32

33? ? ? ? /**

34? ? ? ? ? * 當前獲取到同步狀態(tài)的線程

35? ? ? ? ? */

36? ? ? ? volatile Thread thread;

37

38? ? ? ? /**

39? ? ? ? ? * 等待隊列中的后繼結點,如果當前結點是共享的,那么這個字段是一個SHARED常量;也就是說結點類型(獨占和共享)和等待隊列中的后繼結點公用一個字段

40? ? ? ? ? */

41? ? ? ? Node nextWaiter;

42

43? ? ? ? /**

44? ? ? ? ? * 如果是共享模式下等待,那么返回true(因為上面的Node nextWaiter字段在共享模式下是一個SHARED常量)

45? ? ? ? ? */

46? ? ? ? final boolean isShared() {

47? ? ? ? ? ? return nextWaiter == SHARED;

48? ? ? ? }

49

50? ? ? ? final Node predecessor() throws NullPointerException {

51? ? ? ? ? ? Node p = prev;

52? ? ? ? ? ? if (p == null)

53? ? ? ? ? ? ? ? throw new NullPointerException();

54? ? ? ? ? ? else

55? ? ? ? ? ? ? ? return p;

56? ? ? ? }

57

58? ? ? ? Node() {? ? // 用于建立初始頭結點或SHARED標記

59? ? ? ? }

60

61? ? ? ? Node(Thread thread, Node mode) {? ? // 用于添加到等待隊列

62? ? ? ? ? ? this.nextWaiter = mode;

63? ? ? ? ? ? this.thread = thread;

64? ? ? ? }

65

66? ? ? ? Node(Thread thread, int waitStatus) { // Used by Condition

67? ? ? ? ? ? this.waitStatus = waitStatus;

68? ? ? ? ? ? this.thread = thread;

69? ? ? ? }

70? ? }

b)同步隊列示意圖和簡單分析

①同步隊列示意圖:當一個線程獲取了同步狀態(tài)后,其他線程不能獲取到該同步狀態(tài),就會被構造稱為Node然后添加到同步隊列之中,這個添加的過程基于CAS保證線程安全性。

②同步隊列遵循先進先出(FIFO),首節(jié)點是獲取到同步狀態(tài)的結點,首節(jié)點的線程在釋放同步狀態(tài)的時候將會喚醒后繼結點(然后后繼結點就會變成新的首節(jié)點等待獲取同步狀態(tài))

2、獨占式同步狀態(tài)的獲取和釋放

①前面說過,同步器的acquire()方法會獲取同步狀態(tài),這個方法對不會響應中斷,也就是說當線程獲取通同步狀態(tài)失敗后會被構造成結點加入到同步隊列中,當線程被中斷時不會從同步隊列中移除。

1 /**

2? * ①首先調用tryAcquire方法嘗試獲取同步狀態(tài),如果獲取同步狀態(tài)失敗,就進行下面的操作

3? * ②獲取失?。喊凑摘氄际降哪J綐嬙焱浇Y點并通過addWaiter方法將結點添加到同步隊列的尾部

4? * ③通過acquireQueue方法自旋獲取同步狀態(tài)。

5? * ④如果獲取不到同步狀態(tài),就阻塞結點中的線程,而結點中的線程喚醒主要是通過前驅結點的出隊或者被中斷來實現(xiàn)

6? */

7 public final void acquire(int arg) {

8? ? if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

9? ? ? ? selfInterrupt();

10 }

?②下面是addWaiter、enq和自旋獲取同步狀態(tài)acquireQueue方法的實現(xiàn)(該方法的主要作用就是將獲取同步狀態(tài)失敗的線程構造成結點然后添加到同步隊列的隊尾)

1 private Node addWaiter(Node mode) {

2? ? Node node = new Node(Thread.currentThread(), mode);

3? ? //嘗試直接放在隊尾

4? ? Node pred = tail; //直接獲取同步器的tail結點

5? ? if (pred != null) {

6? ? ? ? node.prev = pred;

7? ? ? ? if (compareAndSetTail(pred, node)) {

8? ? ? ? ? ? //隊尾結點不為空通過原子操作將構造的結點置為隊尾結點

9? ? ? ? ? ? pred.next = node;

10? ? ? ? ? ? return node;

11? ? ? ? }

12? ? }

13? ? //采用自旋方式保證構造的結點添加到同步隊列中

14? ? enq(node);

15? ? return node;

16 }

17 private Node enq(final Node node) {

18? ? for (;;) { //死循環(huán)知道添加成功

19? ? ? ? Node t = tail;

20? ? ? ? if (t == null) { // Must initialize

21? ? ? ? ? ? if (compareAndSetHead(new Node()))

22? ? ? ? ? ? ? ? tail = head;

23? ? ? ? } else {

24? ? ? ? ? ? node.prev = t;

25? ? ? ? ? ? //通過CAS方式將結點添加到同步隊列之后才會返回,否則就會不斷嘗試添加(這樣實際上就是在并發(fā)情況下,把向同步隊列添加Node變得串行化了)

26? ? ? ? ? ? if (compareAndSetTail(t, node)) {

27? ? ? ? ? ? ? ? t.next = node;

28? ? ? ? ? ? ? ? return t;

29? ? ? ? ? ? }

30? ? ? ? }

31? ? }

32 }? ?

33 /**

34? *? ? 通過tryAcquire()和addWaiter(),表示該線程獲取同步狀態(tài)已經(jīng)失敗,被放入同步? ? ? ?

35? * 隊列尾部了。線程阻塞等待直到其他線程(前驅結點獲得同步裝填或者被中斷)釋放同步狀

36? * 態(tài)后喚醒自己,自己才能獲得。

37? */

38 final boolean acquireQueued(final Node node, int arg) {

39? ? boolean failed = true;

40? ? try {

41? ? ? ? boolean interrupted = false;

42? ? ? ? //線程在死循環(huán)的方式中嘗試獲取同步狀態(tài)

43? ? ? ? for (;;) {

44? ? ? ? ? ? final Node p = node.predecessor(); //獲取前驅結點

45? ? ? ? ? ? //只有前驅接待是頭結點的時候才能嘗試獲取同步狀態(tài)

46? ? ? ? ? ? if (p == head && tryAcquire(arg)) {

47? ? ? ? ? ? ? ? setHead(node); //獲取到同步狀態(tài)之后,就將自己設置為頭結點

48? ? ? ? ? ? ? ? p.next = null; //前驅結點已經(jīng)獲得同步狀態(tài)去執(zhí)行自己的程序了,所以需要釋放掉占用的同步隊列的資源,由JVM回收

49? ? ? ? ? ? ? ? failed = false;

50? ? ? ? ? ? ? ? return interrupted;

51? ? ? ? ? ? }

52? ? ? ? ? ? //如果獲取同步狀態(tài)失敗,應該自旋等待繼續(xù)獲取并且校驗自己的中斷標志位信息

53? ? ? ? ? ? if (shouldParkAfterFailedAcquire(p, node) &&

54? ? ? ? ? ? ? ? parkAndCheckInterrupt())

55? ? ? ? ? ? ? ? interrupted = true; //如果被中斷,就改變自己的中斷標志位狀態(tài)信息

56? ? ? ? }

57? ? } finally {

58? ? ? ? if (failed)

59? ? ? ? ? ? cancelAcquire(node);

60? ? }

61 }

③獨占式獲取同步狀態(tài)的整個流程

④獨占式同步器的釋放:release方法執(zhí)行時,會喚醒頭結點的后繼結點線程

public final boolean release(int arg) {

? ? if (tryRelease(arg)) {

? ? ? ? Node h = head;//頭結點

? ? ? ? //喚醒頭結點的后繼結點線程

? ? ? ? if (h != null && h.waitStatus != 0)

? ? ? ? ? ? unparkSuccessor(h);

? ? ? ? return true;

? ? }

? ? return false;

}

3、共享式同步狀態(tài)的獲取和釋放?

①共享式獲取和獨占式獲取最主要的區(qū)別是能否有多個線程同時獲取到同步狀態(tài)。如圖所示簡易描述二者的區(qū)別(共享式訪問的時候,可以允許多個線程訪問資源,但是存在獨占式訪問的時候,同一時刻其他的不管是共享還是獨占都會被阻塞)

②關于共享式獲取同步狀態(tài)的方法

1 /**

2? * 此方法是共享模式下線程獲取共享同步狀態(tài)的頂層入口。它會嘗試去獲取同步狀態(tài),獲取成功則直接返回,

3? * 獲取失敗則進入等待隊列一直嘗試獲取(執(zhí)行doAcquireShared方法體中的內(nèi)容),直到獲取到資源為止(條件就是tryAcquireShared方法返回值大于等于0),整個過程忽略中斷

4? */

5 public final void acquireShared(int arg) {

6? ? if (tryAcquireShared(arg) < 0)

7? ? ? ? doAcquireShared(arg);

8 }

9 /**

10? * "自旋"嘗試獲取同步狀態(tài)

11? */

12 private void doAcquireShared(int arg) {

13? ? //首先將該線程包括線程引用、等待狀態(tài)、前驅結點和后繼結點的信息封裝臺Node中,然后添加到等待隊列里面(一共享模式添加)

14? ? final Node node = addWaiter(Node.SHARED);

15? ? boolean failed = true;

16? ? try {

17? ? ? ? boolean interrupted = false; //當前線程的中斷標志

18? ? ? ? for (;;) {

19? ? ? ? ? ? final Node p = node.predecessor(); //獲取前驅結點

20? ? ? ? ? ? if (p == head) {

21? ? ? ? ? ? ? ? //當前驅結點是頭結點的時候就會以共享的方式去嘗試獲取同步狀態(tài)

22? ? ? ? ? ? ? ? int r = tryAcquireShared(arg);

23? ? ? ? ? ? ? ? //判斷tryAcquireShared的返回值

24? ? ? ? ? ? ? ? if (r >= 0) {

25? ? ? ? ? ? ? ? ? ? //如果返回值大于等于0,表示獲取同步狀態(tài)成功,就修改當前的頭結點并將信息傳播都后續(xù)的結點隊列中

26? ? ? ? ? ? ? ? ? ? setHeadAndPropagate(node, r);

27? ? ? ? ? ? ? ? ? ? p.next = null; // 釋放掉已經(jīng)獲取到同步狀態(tài)的前驅結點的資源

28? ? ? ? ? ? ? ? ? ? if (interrupted)

29? ? ? ? ? ? ? ? ? ? ? ? selfInterrupt(); //檢查中斷標志

30? ? ? ? ? ? ? ? ? ? failed = false;

31? ? ? ? ? ? ? ? ? ? return;

32? ? ? ? ? ? ? ? }

33? ? ? ? ? ? }

34? ? ? ? ? ? if (shouldParkAfterFailedAcquire(p, node) &&

35? ? ? ? ? ? ? ? parkAndCheckInterrupt())

36? ? ? ? ? ? ? ? interrupted = true;

37? ? ? ? }

38? ? } finally {

39? ? ? ? if (failed)

40? ? ? ? ? ? cancelAcquire(node);

41? ? }

42 }

根據(jù)源代碼我們可以了解共享式獲取同步狀態(tài)的整個過程

首先同步器會調用tryAcquireShared方法來嘗試獲取同步狀態(tài),然后根據(jù)這個返回值來判斷是否獲取到同步狀態(tài)(當返回值大于等于0可視為獲取到同步狀態(tài));如果第一次獲取失敗的話,就進入'自旋'狀態(tài)(執(zhí)行doAcquireShared方法)一直嘗試去獲取同步狀態(tài);在自旋獲取中,如果檢查到當前前驅結點是頭結點的話,就會嘗試獲取同步狀態(tài),而一旦獲取成功(tryAcquireShared方法返回值大于等于0)就可以從自旋狀態(tài)退出。

另外,還有一點就是上面說到的一個處于等待隊列的線程要想開始嘗試去獲取同步狀態(tài),需要滿足的條件就是前驅結點是頭結點,那么它本身就是整個隊列中的第二個結點。當頭結點釋放掉所有的臨界資源之后,我們考慮每個線程運行所需資源的不同數(shù)量問題,如下圖所示

③共享式同步狀態(tài)的釋放

對于支持共享式的同步組件(即多個線程同同時訪問),它們和獨占式的主要區(qū)別就是tryReleaseShared方法必須確保同步狀態(tài)的釋放是線程安全的(CAS的模式來釋放同步狀態(tài),因為既然是多個線程能夠訪問,那么釋放的時候也會是多個線程的,就需要保證釋放時候的線程安全)

1 /**

2? * 該方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列里的其他線程來獲取資源。

3? */

4 public final boolean releaseShared(int arg) {

5? ? if (tryReleaseShared(arg)) {

6? ? ? ? doReleaseShared(); //

7? ? ? ? return true;

8? ? }

9? ? return false;

10 }

回到頂部

四、自定義同步組件的實現(xiàn)

1、共享式鎖的實現(xiàn)

①、自定義一個同步組件,可以允許兩個線程訪問(共享式同步組件),超過兩個線程就會被阻塞。

②、既然是共享式同步組件,按照前面所說的,組件本身需要使用AQS提供的共享式模板方法acquireShared等;組件的內(nèi)部類需要實現(xiàn)AQS,并且重寫關于共享式獲取同步狀態(tài)的方法(tryAcquireShared()、tryReleaseShared()等共享模式下的方法)。

③、既然是兩個線程能夠同時訪問的話,那么狀態(tài)數(shù)的取值范圍就是0、1、2了,每當一個線程獲取到同步狀態(tài)的時候state值減1,反之就會增加1;當state值為0的時候就會阻塞其他想要獲取同步狀態(tài)的線程。對于同步狀態(tài)的更改需要使用CAS來進行保證原子性。

package cn.source.concurrent;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

public class TestAQS implements Lock{

??private Sync sync = new Sync(2);


??private static class Sync extends AbstractQueuedSynchronizer {


????Sync(int num) {

??????if(num <= 0) {

????????throw new RuntimeException("num需要大于0");

??????}

??????setState(num);

????}

????@Override

????protected int tryAcquireShared(int arg) {

??????for(; ;) {

????????int currentState = getState();

????????int newState = currentState - arg;

????????if(newState < 0 || compareAndSetState(currentState, newState)) {

??????????return newState;

????????}

??????}

????}

????@Override

????protected boolean tryReleaseShared(int arg) {

??????for(; ;) {

????????int currentState = getState();

????????int newState = currentState + arg;

????????if(compareAndSetState(currentState, newState)) {

??????????return true;

????????}

??????}

????}



??}

??@Override

??public void lock() {

????sync.acquireShared(1);

??}

??@Override

??public void unlock() {

????sync.releaseShared(1);

??}

??//......

}

共享式鎖

/**

?* 測試結果:輸出的線程名稱是成對的,保證同一時刻只有兩個線程能夠獲取到鎖

?*

?*/?????

public class TestLockShare {

??@Test

??public void test() {

????Lock lock = new TestAQS();

????class Worker extends Thread {

??????@Override

??????public void run() {

????????while(true) {

??????????lock.lock();

??????????try {

????????????Thread.sleep(1000);

????????????System.out.println(Thread.currentThread().getName());

????????????Thread.sleep(1000);

??????????} catch (Exception e) {

????????????e.printStackTrace();

??????????} finally {

????????????lock.unlock();

??????????}

????????}

??????}


????}


????for (int i = 0; i < 8; i++) {

??????Worker worker = new Worker();

??????worker.setDaemon(true);

??????worker.start();


????}

????for (int i = 0; i < 8; i++) {

??????try {

????????Thread.sleep(1000);

??????} catch (InterruptedException e) {

????????// TODO Auto-generated catch block

????????e.printStackTrace();

??????}

??????System.out.println();

????}

??}

}

共享式鎖測試

2、獨占式鎖的實現(xiàn)

package cn.source.concurrent;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

public class Mutex implements Lock{

??private Sync sync = new Sync();


??private static class Sync extends AbstractQueuedSynchronizer {

????/**

?????* 嘗試獲取資源,立即返回。成功則返回true,否則false。

?????*/

????@Override

????protected boolean tryAcquire(int arg) {

??????if(compareAndSetState(0, 1)) {//state為0才設置為1,不可重入!

????????setExclusiveOwnerThread(Thread.currentThread());//設置為當前線程獨占資源

????????return true;

??????}

??????return false;

????}

????/**

?????* 嘗試釋放資源,立即返回。成功則為true,否則false。

?????*/

????@Override

????protected boolean tryRelease(int arg) {

??????if(getState() == 0) { //既然來釋放,那肯定就是已占有狀態(tài)了。只是為了保險,多層判斷!

????????throw new IllegalMonitorStateException();

??????}

??????setExclusiveOwnerThread(null);

??????setState(0);

??????return true;

????}

????@Override

????protected boolean isHeldExclusively() {

??????// 判斷是否鎖定狀態(tài)

??????return getState() == 1;

????}


??}


??@Override

??public void lock() {

????sync.acquire(1);

??}

??@Override

??public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

????return sync.tryAcquire(1);

??}

??@Override

??public void unlock() {

????sync.release(1);

??}

}

獨占式鎖

public class TestMutex {

??@Test

??public void test() {

????Lock lock = new Mutex();

????class Worker extends Thread {

??????@Override

??????public void run() {

????????while(true) {

??????????lock.lock();

??????????try {

????????????Thread.sleep(1000);

????????????System.out.println(Thread.currentThread().getName());

????????????Thread.sleep(1000);

??????????} catch (Exception e) {

????????????e.printStackTrace();

??????????} finally {

????????????lock.unlock();

??????????}

????????}

??????}


????}


????for (int i = 0; i < 8; i++) {

??????Worker worker = new Worker();

??????worker.setDaemon(true);

??????worker.start();


????}

????for (int i = 0; i < 8; i++) {

??????try {

????????Thread.sleep(1000);

??????} catch (InterruptedException e) {

????????e.printStackTrace();

??????}

??????System.out.println();

????}

??}

}

獨占式鎖測試

歡迎工作一到五年的Java工程師朋友們加入Java程序員開發(fā): 721575865

群內(nèi)提供免費的Java架構學習資料(里面有高可用、高并發(fā)、高性能及分布式、Jvm性能調優(yōu)、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個交代!

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 長著長著,花兒就忘記了最初的模樣,可她欣然接受每一個階段的自己,哪怕她發(fā)現(xiàn)自己在同類中是如此不同,雖然從同樣的一片...
    文心訪藝閱讀 470評論 1 1
  • 作為前端,一直以來都知道HTTP劫持與XSS跨站腳本(Cross-site scripting)、CSRF跨站請求...
    IM魂影閱讀 643評論 0 1
  • 上河·下河·灌城村 遠心 (組詩發(fā)表在《詩選刊》上半月刊第11、12期合刊) 1.下河 到河北曲陽縣下河村了,還有...
    遠心篤行閱讀 7,146評論 38 10

友情鏈接更多精彩內(nèi)容