java.util.concurrent下的包
volatile關(guān)鍵字
定義:當(dāng)多個(gè)線程操作共享數(shù)據(jù)時(shí),可以保證內(nèi)存中的數(shù)據(jù)可見.
用法:對(duì)共享的數(shù)據(jù)添加volatile關(guān)鍵字.保證內(nèi)存可見性.
解釋:當(dāng)兩個(gè)線程共享一個(gè)數(shù)據(jù)時(shí),每個(gè)線程會(huì)將共享數(shù)據(jù)放入到自己的內(nèi)存中進(jìn)行操作,完成后再對(duì)數(shù)據(jù)進(jìn)行修改,假定A線程一直對(duì)共享數(shù)據(jù)讀取,B線程對(duì)共享數(shù)據(jù)進(jìn)行修改,當(dāng)B線程將數(shù)據(jù)取出來還沒有來得及將數(shù)據(jù)放回共享數(shù)據(jù)中時(shí),A線程將還未改變的數(shù)據(jù)取出來進(jìn)行操作,這樣就會(huì)造成讀取的結(jié)果和期望值不一致,給屬性添加volatile關(guān)鍵字,這樣多個(gè)線程是直接對(duì)線程進(jìn)行操作而不是自己取出來操作完成后再對(duì)數(shù)據(jù)進(jìn)行修改.
特點(diǎn):與synchronized相比是一種輕量級(jí)的同步策略.
注意:
1.volatile不具備"互斥性".
2.volatile不能保證變量的"原子性".
CAS算法(Compare and Swap)
定義:CAS有3個(gè)操作數(shù),內(nèi)存值V,舊的預(yù)期值A(chǔ),要修改的新值B。當(dāng)且僅當(dāng)預(yù)期值A(chǔ)和內(nèi)存值V相同時(shí),將內(nèi)存值V修改為B,否則什么都不做。
synchronized是獨(dú)占鎖,是一種悲觀鎖.
在java.util.concurrent包中,使用的樂觀鎖就是CAS機(jī)制.
concurrentHashMap
hashMap: 普通的Map實(shí)現(xiàn)類
hashTable:給整個(gè)表添加鎖,當(dāng)所有線程訪問到hashTable的時(shí)候,由并行轉(zhuǎn)化為串行和訪問,效率很低.
concurrentHashMap:鎖分段機(jī)制,并發(fā)訪問
CountDownLatch
重要的兩個(gè)方法:
countDown() :倒數(shù)一次,將計(jì)數(shù)器減一.
await(long timeout,TimeUnit unit) :等待倒數(shù)到0,如果沒有到達(dá)0,會(huì)一直阻塞,第一個(gè)參數(shù)表示設(shè)置超時(shí)時(shí)間,第二個(gè)參數(shù)為超時(shí)時(shí)間單位.
如果計(jì)數(shù)到達(dá)零,則返回 true;如果在計(jì)數(shù)到達(dá)零之前超過了等待時(shí)間,則返回 false.
一個(gè)簡單的countDownLatch例子
public static void main(String[] args) throws InterruptedException {
// 開始的倒數(shù)計(jì)時(shí)
CountDownLatch start = new CountDownLatch(1);
// 準(zhǔn)備的倒數(shù)計(jì)時(shí)
CountDownLatch end = new CountDownLatch(10);
// 創(chuàng)建一個(gè)容量為10的線程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
final int num = i;
Runnable runnable = new Runnable() {
public void run() {
try {
// 開始倒計(jì)時(shí)暫停等待
start.await(10000,TimeUnit.SECONDS);
Thread.sleep((long) (Math.random() * 10000));
System.out.println("number of " + num + " over!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 每次一個(gè)線程完成后,結(jié)束的計(jì)數(shù)器就減一
end.countDown();
}
}
};
// 線程池執(zhí)行線程
threadPool.execute(runnable);
}
System.out.println("start....");
start.countDown();// 將開始的計(jì)數(shù)設(shè)為0,所有線程同時(shí)出發(fā)
end.await();// 準(zhǔn)備的倒計(jì)時(shí)阻塞,當(dāng)end.countDown()為0時(shí),放行
System.out.println("GG");
threadPool.shutdown();//關(guān)閉線程池
}
CyclicBarrier:柵欄
定義:一個(gè)同步輔助類,它允許一組線程互相等待,直到到達(dá)某個(gè)公共屏障點(diǎn) (common barrier point)
一個(gè)簡單的柵欄例子:
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService threadPool = Executors.newFixedThreadPool(3);
threadPool.submit(new Thread(new Runner(barrier, "hehe1")));
threadPool.submit(new Thread(new Runner(barrier, "hehe2")));
threadPool.submit(new Thread(new Runner(barrier, "hehe3")));
}
}
class Runner implements Runnable {
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
super();
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(8));
System.out.println(name + " 準(zhǔn)備好了...");
// barrier的await方法,在所有參與者都已經(jīng)在此 barrier 上調(diào)用 await 方法之前,將一直等待。
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + ":GO!" + System.currentTimeMillis());
}
}
CyclicBarrier和CountDownLatch的區(qū)別:
1.CountDownLatch : 一個(gè)線程(或者多個(gè)), 等待另外N個(gè)線程完成某個(gè)事情之后才能執(zhí)行。
2.CyclicBarrier : N個(gè)線程相互等待,任何一個(gè)線程完成之前,所有的線程都必須等待。這樣應(yīng)該就清楚一點(diǎn)了,對(duì)于CountDownLatch來說,重點(diǎn)是那個(gè)“一個(gè)線程”, 是它在等待, 而另外那N的線程在把“某個(gè)事情”做完之后可以繼續(xù)等待,可以終止。而對(duì)于CyclicBarrier來說,重點(diǎn)是那N個(gè)線程,他們之間任何一個(gè)沒有完成,所有的線程都必須等待。
CountDownLatch 是計(jì)數(shù)器, 線程完成一個(gè)就記一個(gè), 就像 報(bào)數(shù)一樣, 只不過是遞減的.
而CyclicBarrier更像一個(gè)水閘, 線程執(zhí)行就想水流, 在水閘處都會(huì)堵住, 等到水滿(線程到齊)了, 才開始泄流.CountDownLatch是不能夠重用的,而CyclicBarrier是可以重用的。
Semaphore 信號(hào)量
主要方法:
acquire() :用來獲取一個(gè)許可,若無許可能夠獲得,則會(huì)一直等待,直到獲得許可。
release() :用來釋放許可。注意,在釋放許可之前,必須先獲獲得許可。
實(shí)例:
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 創(chuàng)建一個(gè)容量為5的信號(hào)量池,表示只能最大五個(gè)線程池訪問
final Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 20; i++) {
final int NO = i;
Runnable runnable = new Runnable() {
public void run() {
try {
//獲取許可
semaphore.acquire();
System.out.println("允許..." + NO);
Thread.sleep(1000);
// 訪問完后,釋放
semaphore.release();
Thread.sleep(500);
System.out.println("可用:" + semaphore.availablePermits());
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
cachedThreadPool.execute(runnable);
}
cachedThreadPool.shutdown();
}