(1)先來一個小案例作為切入點
/*
這里記為代碼一
*/
@RestController
public class IndexController {
@Autowired
private Redisson redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate; //組件spring-boot-starter-data-redis
@RequestMapping("/deduct_stock")
public String deductStock(){
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //可以理解為jedis.get("stock")
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("Stock", realStock + ""); //可以理解為jedis.set(key,value)
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
return "end";
}
}
然后在redis中搞一個庫存為200

現(xiàn)在很明顯,代碼一 存在線程安全問題,會有可能讀到都是200,然后都減1后設(shè)置為199,就不對了。
很多同學(xué)都會想到加一把鎖
(2)synchronized
/*
代碼二
*/
public String deductStock(){
synchronized (this){
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //可以理解為jedis.get("stock")
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("Stock", realStock + ""); //可以理解為jedis.set(key,value)
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
}
return "end";
}
這樣的確是只能有一個線程執(zhí)行操作,確實是線程安全了。但是它只能在單機環(huán)境下運行,只能鎖住一個tomcat,分布式的時候就不行了。

(3)分布式鎖
這時,應(yīng)該考慮分布式鎖。SETNX(SET if Not eXists)。和set的區(qū)別是:
set tuling A
set tuling B
結(jié)果會是B
setnx tuling A
setnx tuling B
結(jié)果會是A
/*
代碼三
*/
public String deductStock(){
String lockKey = "product_101";
//如果返回false,說明redis中有這個key了,不做任何操作。如果返回true說明執(zhí)行這個命令之前沒有這個key,并設(shè)置成功了
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling"); //就理解為jedis.setnx(key,value)
if(!result){
return "error_code"; //給前端錯誤碼,當(dāng)前系統(tǒng)繁忙,請稍后再試
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //可以理解為jedis.get("stock")
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("Stock", realStock + ""); //可以理解為jedis.set(key,value)
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
stringRedisTemplate.delete(lockKey);
return "end";
}

redis那邊是單線程操作的,會排隊,只有排隊頭的可以設(shè)置成功,后面的設(shè)置不成功,這樣入門級的分布式鎖設(shè)計完了。大家想想還有沒有問題?
這個時候還是存在問題,當(dāng)獲取到鎖的線程有異常,導(dǎo)致沒法刪除key,就會導(dǎo)致其他線程獲取不到鎖,就算能捕獲異常,但如果是系統(tǒng)掛了呢,運維重啟呢
/*
代碼四
*/
public String deductStock(){
String lockKey = "product_101";
//如果返回false,說明redis中有這個key了,不做任何操作。如果返回true說明執(zhí)行這個命令之前沒有這個key,并設(shè)置成功了
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling"); //就理解為jedis.setnx(key,value)
if(!result){
return "error_code"; //給前端錯誤碼,當(dāng)前系統(tǒng)繁忙,請稍后再試
}
try{
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //可以理解為jedis.get("stock")
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("Stock", realStock + ""); //可以理解為jedis.set(key,value)
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
}finally {
stringRedisTemplate.delete(lockKey);
}
return "end";
}
示例 代碼四 還是存在問題,大家先想想解決方法。
(4)鎖超時
這樣的話,可以加一個超時時間來解決,給key一個超時時間,即使系統(tǒng)掛了,一段時間之后,其他機器還是能正常訪問
/*
代碼五
*/
public String deductStock(){
String lockKey = "product_101";
//如果返回false,說明redis中有這個key了,不做任何操作。如果返回true說明執(zhí)行這個命令之前沒有這個key,并設(shè)置成功了
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling"); //就理解為jedis.setnx(key,value)
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
if(!result){
return "error_code"; //給前端錯誤碼,當(dāng)前系統(tǒng)繁忙,請稍后再試
}
try{
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //可以理解為jedis.get("stock")
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("Stock", realStock + ""); //可以理解為jedis.set(key,value)
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
}finally {
stringRedisTemplate.delete(lockKey);
}
return "end";
}
大家想想 代碼五 還有問題嗎?
(5)加鎖操作原子性
假設(shè)設(shè)置了key之后,正準(zhǔn)備設(shè)置超時時間,但系統(tǒng)掛了,那還是回到之前的問題了,得保證原子性。應(yīng)該使用setIfAbsent的其他重載方法,有一個是可以同時設(shè)置超時時間的
/*
代碼六
*/
public String deductStock(){
String lockKey = "product_101";
//如果返回false,說明redis中有這個key了,不做任何操作。如果返回true說明執(zhí)行這個命令之前沒有這個key,并設(shè)置成功了
// Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling"); //就理解為jedis.setnx(key,value)
// stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling", 10, TimeUnit.SECONDS);
if(!result){
return "error_code"; //給前端錯誤碼,當(dāng)前系統(tǒng)繁忙,請稍后再試
}
try{
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //可以理解為jedis.get("stock")
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("Stock", realStock + ""); //可以理解為jedis.set(key,value)
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
}finally {
stringRedisTemplate.delete(lockKey);
}
return "end";
}
大家思考一下,代碼六 還有沒有問題?
遇到高并發(fā)的時候,通常執(zhí)行會比較慢,慢執(zhí)行啊,中間sql語句執(zhí)行很慢這樣,假設(shè)執(zhí)行完這個方法需要15秒,當(dāng)線程執(zhí)行了10秒的時候,由于設(shè)置了超時時間是10秒,并且是高并發(fā)場景,這個時候key就刪除了,另外的線程就獲取了鎖

這樣就相當(dāng)于鎖永久失效。雖然把過期時間放大是可以避免,但還是無法徹底解決問題。
本質(zhì)是自己加的鎖被別人解掉了,所以解決就是鎖只能自己解鎖
/*
代碼七
*/
public String deductStock(){
String lockKey = "product_101";
//如果返回false,說明redis中有這個key了,不做任何操作。如果返回true說明執(zhí)行這個命令之前沒有這個key,并設(shè)置成功了
// Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling"); //就理解為jedis.setnx(key,value)
// stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
String clientId = UUID.randomUUID().toString();
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
if(!result){
return "error_code"; //給前端錯誤碼,當(dāng)前系統(tǒng)繁忙,請稍后再試
}
try{
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //可以理解為jedis.get("stock")
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("Stock", realStock + ""); //可以理解為jedis.set(key,value)
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
}finally {
if(clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){
stringRedisTemplate.delete(lockKey);
}
}
return "end";
}
代碼七 按上面的例子,鎖是自己過期的,這代碼只是能保證線程1無法刪除線程2的鎖,但線程1和線程2還是同時在跑啊。這個時間還有問題,但是先不管,先放放,因為不是想要引申的內(nèi)容,要繼續(xù)思考這個代碼還有除時間外的什么其他問題?
就是finally中的兩行代碼非原子,寫并發(fā)代碼和寫高并發(fā)代碼時的區(qū)別,應(yīng)該要習(xí)慣性的在代碼之間空幾行,表明這里執(zhí)行有時間差,非原子。
假設(shè)執(zhí)行判斷完clientId確實是等于當(dāng)前線程的value,假設(shè)這時剛好是9.9秒,突然發(fā)生卡頓,但這個if判斷已經(jīng)是true了,正準(zhǔn)備delete的時候,卡頓了,這時已經(jīng)過了10秒,線程2已經(jīng)獲取了鎖,然后線程1執(zhí)行delete,又出問題了,仍然是線程1刪除了線程2的鎖。
怎么處理?
(6)鎖續(xù)命
鎖續(xù)命:通常是這樣處理的,有一個分線程定時任務(wù),用來監(jiān)測線程還是否持有鎖,還持有的就延長鎖的過期時間,例如鎖超時是30秒,那么分線程每10秒判斷一下,線程還是否持有鎖,還持有就更新過期時間,不能說是延長,是按當(dāng)前時間又重新設(shè)置30秒過期,當(dāng)不持有了,定時任務(wù)就結(jié)束,分線程也結(jié)束。
redisson:操作redis的客戶端,有很多分布式功能,其中就有分布式鎖。想起了吧?代碼一 中就已經(jīng)引入了redisson
/*
代碼八
*/
public String deductStock(){
String lockKey = "product_101";
//如果返回false,說明redis中有這個key了,不做任何操作。如果返回true說明執(zhí)行這個命令之前沒有這個key,并設(shè)置成功了
// Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling"); //就理解為jedis.setnx(key,value)
// stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
/*String clientId = UUID.randomUUID().toString();
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
if(!result){
return "error_code"; //給前端錯誤碼,當(dāng)前系統(tǒng)繁忙,請稍后再試
}*/
RLock redissonLock = redisson.getLock(lockKey);
try{
//加鎖
redissonLock.lock(); //理解為執(zhí)行了setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS)
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //可以理解為jedis.get("stock")
if(stock > 0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("Stock", realStock + ""); //可以理解為jedis.set(key,value)
System.out.println("扣減成功,剩余庫存:" + realStock);
}else{
System.out.println("扣減失敗,庫存不足");
}
}finally {
redissonLock.unlock();
/*if(clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){
stringRedisTemplate.delete(lockKey);
}*/
}
return "end";
}
redisson加鎖核心lua腳本

KEYS[1]:product_101
ARGV[2]:getLockName(threadId)
ARGV[1]:internalLockLeaseTime(初始化是30秒)
可以看到第250行和251行,就相當(dāng)于 代碼五 中的
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling"); //就理解為jedis.setnx(key,value)
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
而這兩行代碼是不具有原子性的,線程不安全。Lua腳本可以保證原子性
鎖續(xù)命:
/*
https://github.com/redisson/redisson/blob/redisson-3.6.5/redisson/src/main/java/org/redisson/RedissonLock.java
redisson-3.6.5 RedissonLock.java,其他版本會不太不一樣,但原理應(yīng)該不變吧
*/
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
它延遲 internalLockLeaseTime / 3 秒執(zhí)行run方法,為它重新設(shè)置expire為 internalLockLeaseTime。
commandExecutor.evalWriteAsync返回了一個future,然后future又添加監(jiān)聽器,最后執(zhí)行當(dāng)前方法scheduleExpirationRenewal(threadId);,就是一直重復(fù)續(xù)命,又再延遲調(diào)用,相當(dāng)于定時任務(wù)。
到目前為止,基本就沒有什么坑了,redisson已經(jīng)是填了很多坑,可以放心使用 代碼八 進行實現(xiàn)。
但是,還有點問題,假設(shè)有多個請求在執(zhí)行redissonLock.lock()加鎖,只能有一個線程在處理,其他都得等著,系統(tǒng)就會很慢,存在性能問題,該怎么優(yōu)化能做到雙十一能用的級別?
(7)zookeeper
redis一般都是有主從架構(gòu)的,基本不會是單機使用

redis主節(jié)點馬上告訴客戶端加鎖成功,線程1就執(zhí)行業(yè)務(wù)代碼邏輯,然后redis準(zhǔn)備把key同步給從節(jié)點時候,結(jié)果主節(jié)點掛了,某個從節(jié)點選舉成為新的Master主節(jié)點,來了個線程3訪問新的主節(jié)點加鎖,線程3就發(fā)現(xiàn)沒有product_101這個key,又可以加鎖成功了,線程1業(yè)務(wù)邏輯還沒執(zhí)行完畢,線程3就開始執(zhí)行,就又出現(xiàn)了問題

主從架構(gòu)鎖失效的問題,可以用zookeeper來實現(xiàn)分布式鎖,和redis類似,是樹形結(jié)構(gòu)。redis更多的實現(xiàn)是AP架構(gòu),zookeeper更多的實現(xiàn)是CAP架構(gòu)。
zookeeper的話,當(dāng)要寫一個key,不是就立即返回成功的,會先把key同步給集群的其他節(jié)點,子節(jié)點會返回同步成功的信息,主節(jié)點會判斷是否已經(jīng)有超過半數(shù)的子節(jié)點都同步成功,這時才告訴客戶端成功了,是為了保證一致性,犧牲了及時響應(yīng),但它能保證那些已經(jīng)同步了子節(jié)點才能成功leader,redis就沒有這個機制,也就是線程3再來請求leader的時候,必然會有key,加鎖就不成功,解決了上述問題。
但如果不使用zookeeper,就是要使用redis來解決呢?(因為redis的并發(fā)比zookeeper高不少)如果要高并發(fā),就用redis,就有上述主從鎖問題,如果要保證健壯性就用zookeeper,但犧牲了并發(fā)數(shù)。
(8)Redlock
硬是要使用redis的話,看看Redlock

redis沒有主從關(guān)系,是對等的,往每個節(jié)點發(fā)送加鎖命令,只有超過半數(shù)的節(jié)點返回成功才認(rèn)為客戶端加鎖成功,和zookeeper原理類似。但這種方式不推薦,原來是一個redis節(jié)點,現(xiàn)在搞多個,要半數(shù)加鎖成功,對我們加鎖性能受一定影響,這樣的話,還不如用zookeeper,因為redlock還有不少問題。
@RequestMapping("/redlock")
public String redlock(){
String lockKey = "product_101";
RLock lock1 = redisson.getLock(lockKey);
RLock lock2 = redisson.getLock(lockKey);
RLock lock3 = redisson.getLock(lockKey);
//根據(jù)多個RLock對象構(gòu)件RedissonRedLock
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try{
/*
* waitTimeout 嘗試獲取鎖的最大等待時間,超過這個數(shù),則認(rèn)為獲取鎖失敗
* leaseTime 鎖的持有時間,超過這個時間鎖會自動失?。ㄖ祽?yīng)設(shè)置為大于業(yè)務(wù)處理的時間,確保在鎖有效期內(nèi)業(yè)務(wù)能處理完)
* */
boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);
if(res){
//成功獲取鎖,處理業(yè)務(wù)
}
}catch (Exception e) {
throw new RuntimeException("lock fail");
}finally {
//無論如何,最后都要解鎖
redLock.unlock();
}
return "end";
}
回到 代碼八,redissonLock.lock();會導(dǎo)致其他線程等待,也就是分布式鎖把并行請求變串行化執(zhí)行了。那么如何提升分布式鎖性能?
(9)分段鎖
模仿ConcurrentHashMap,分段鎖。
假設(shè)product_101的數(shù)量是200,那么可以分十段,
product_101_1=20
product_101_2=20
product_101_3=20
......
product_101_10=20
200個庫存分10個key存到redis中去,讓每個線程去減不同的段位的庫存,如果不夠減的話就減一下個段位,實現(xiàn)的話有點難,但可以理解這個思想,就不再去擴展了。
(10)緩存數(shù)據(jù)庫雙寫不一致
接下來說redis作為緩存使用的時候,常見問題有:緩存無底洞、緩存穿透、緩存雪崩、緩存失效、熱點key傾斜、熱點key重建、緩存數(shù)據(jù)庫雙寫不一致。
這里針對緩存數(shù)據(jù)庫雙寫不一致的問題說一下。
什么是緩存數(shù)據(jù)庫雙寫不一致?

看上去線程1寫數(shù)據(jù)庫,然后更新緩存,線程2寫數(shù)據(jù)庫,然后更新緩存,沒有什么問題,但如果線程1操作較慢(小卡頓)呢?

有些人就會說,通常不會直接更新緩存,而是把緩存刪掉,即更新就刪緩存,讀數(shù)據(jù)的時候再設(shè)置緩存,的確這樣是比較好,因為每次寫完就更新緩存的話,如果不讀緩存,相當(dāng)于白更新。

但這樣還是有問題

還有什么方法解決?
(11)解決雙寫不一致的方法
延遲雙刪:刪緩存刪兩次,刪除之后sleep(一段時間)后再刪一次
但這種方法只能說是減少,并不能解決問題,并且還讓所有的寫請求都得sleep一段時間
內(nèi)存隊列:用hash運算把操作路由到某個隊列中順序執(zhí)行。是可以解決,但復(fù)雜,寫不好很可能有性能問題或是bug
還有沒有其他解決方法?
問題的本質(zhì)就是操作過程中不是原子性,如果(寫數(shù)據(jù)庫-刪除緩存)是不可分割的操作,(查緩存-查數(shù)據(jù)庫-更新緩存)是不可分割的操作,即在操作前加分布式鎖,操作完后解鎖,所有線程的操作為隊列,把多個并發(fā)執(zhí)行的線程串行化

直接這樣上鎖,性能肯定是有問題的,怎么優(yōu)化?
(12)讀多寫少的情況
直接上分布式鎖會有問題,使用讀寫鎖
讀寫鎖:讀操作加讀鎖,寫操作加寫鎖,讀操作不互斥,寫鎖跟讀鎖、寫鎖跟寫鎖互斥。
由于很多系統(tǒng)都是讀多寫少的情況,所以可以提高性能
/*
代碼九
*/
@RequestMapping("/get_stock")
public String getStock(@RequestParam("clientId") Long clientId) throws InterruptedException{
String lockKey = "product_101";
RReadWriteLock readWriteLock = redisson.getReadWriteLock(lockKey);
RLock rLock = readWriteLock.readLock();
rLock.lock();
System.out.println("獲取讀鎖成功:client="+clientId);
String stock = stringRedisTemplate.opsForValue().get("stock");
if(StringUtils.isEmpty(stock)){
System.out.println("查詢數(shù)據(jù)庫庫存為10。。。");
Thread.sleep(5000);
stringRedisTemplate.opsForValue().set("stock", 10);
}
rLock.unlock();
System.out.println("釋放讀鎖成功:client="+clientId);
return "end";
}
@RequestMapping("/update_stock")
public String updateStock(@RequestParam("clientId") Long clientId) throws InterruptedException{
String lockKey = "product_101";
RReadWriteLock readWriteLock = redisson.getReadWriteLock(lockKey);
RLock writeLock = readWriteLock.writeLock();
writeLock.lock();
System.out.println("獲取寫鎖成功:client="+clientId);
System.out.println("修改商品101的數(shù)據(jù)庫庫存為6。。。");
stringRedisTemplate.delete("stock");
Thread.sleep(5000);
writeLock.unlock();
System.out.println("釋放寫鎖成功:client="+clientId);
return "end";
}
原理就是lua腳本為每個key設(shè)置一個mode的值來記錄是read還是write。
RedissonWriteLock.java

但如果讀多寫也多的情況呢,怎么處理?
不采用上面的方法,仍然是給緩存過期時間,然后操作的時候直接操作數(shù)據(jù)庫。例如在頁面上看到的庫存,其實很多時候都是和數(shù)據(jù)庫的值不一致的,就是為了實現(xiàn)高并發(fā),又要用數(shù)據(jù)庫又要用緩存,只能犧牲一致性,犧牲一致性其實關(guān)系并不大,想一想,假設(shè)一致的話,加入購物車、下訂單,中間是有時間差的,這個時候可能就沒有了庫存了,對用戶來說是不一致,但對程序來說,程序以及保證了一致,只是意義不大,所以犧牲一致性來提高性能。假設(shè)過期時間是一分鐘,那在這一分鐘內(nèi)可能是不一致,但如果一分鐘后庫存不變,又讀取更新了緩存,這個時候就變一致了,只需要確保在下單的時候是用db的數(shù)據(jù)即可。
(13)讀多寫多的情況
如果是讀多寫多,又要保證緩存數(shù)據(jù)庫一致性,怎么辦?
對讀多寫多的場景,就不應(yīng)該用緩存,直接操作數(shù)據(jù)庫就好了,對吧。
也有方法既使用緩存,又應(yīng)對讀多寫多的場景,中間件canal。后面就學(xué)不著了,需要報課。。。。以后再看看