分布式鎖的解決方式
- 基于數(shù)據(jù)庫表做樂觀鎖,用于分布式鎖。(適用于小并發(fā))
- 使用memcached的add()方法,用于分布式鎖。
- 使用memcached的cas()方法,用于分布式鎖。(不常用)
- 使用redis的setnx()、expire()方法,用于分布式鎖。
- 使用redis的setnx()、get()、getset()方法,用于分布式鎖。
- 使用redis的watch、multi、exec命令,用于分布式鎖。(不常用)
- 使用zookeeper,用于分布式鎖。(不常用)
這里主要介紹第四種和第五種:
使用redis的setnx()、expire()方法,用于分布式鎖
原理
對于使用redis的setnx()、expire()來實現(xiàn)分布式鎖,這個方案相對于memcached()的add()方案,redis占優(yōu)勢的是,其支持的數(shù)據(jù)類型更多,而memcached只支持String一種數(shù)據(jù)類型。除此之外,無論是從性能上來說,還是操作方便性來說,其實都沒有太多的差異,完全看你的選擇,比如公司中用哪個比較多,你就可以用哪個。
首先說明一下setnx()命令,setnx的含義就是SET if Not Exists,其主要有兩個參數(shù) setnx(key, value)。該方法是原子的,如果key不存在,則設置當前key成功,返回1;如果當前key已經(jīng)存在,則設置當前key失敗,返回0。但是要注意的是setnx命令不能設置key的超時時間,只能通過expire()來對key設置。
具體的使用步驟如下:
- setnx(lockkey, 1) 如果返回0,則說明占位失敗;如果返回1,則說明占位成功
- expire()命令對lockkey設置超時時間,為的是避免死鎖問題。
- 執(zhí)行完業(yè)務代碼后,可以通過delete命令刪除key。
為了保證在某個Redis節(jié)點不可用的時候算法能夠繼續(xù)運行,這個獲取鎖的操作還有一個超時時間(timeOut),它要遠小于鎖的有效時間(幾十毫秒量級)。
可能存在的問題
這個方案其實是可以解決日常工作中的需求的,但從技術方案的探討上來說,可能還有一些可以完善的地方。比如,如果在第一步setnx執(zhí)行成功后,在expire()命令執(zhí)行成功前,發(fā)生了宕機的現(xiàn)象,那么就依然會出現(xiàn)死鎖的問題,所以如果要對其進行完善的話,可以使用redis的setnx()、get()和getset()方法來實現(xiàn)分布式鎖。
具體實現(xiàn)
鎖具體實現(xiàn)RedisLock:
package com.xiaolyuh.lock;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
public class RedisLock {
private static Logger logger = LoggerFactory.getLogger(RedisLock.class);
//////////////////// 靜態(tài)常量定義開始///////////////////////
/**
* 存儲到redis中的鎖標志
*/
private static final String LOCKED = "LOCKED";
/**
* 默認請求鎖的超時時間(ms 毫秒)
*/
private static final long TIME_OUT = 100;
/**
* 默認鎖的有效時間(s)
*/
public static final int EXPIRE = 60;
//////////////////// 靜態(tài)常量定義結束///////////////////////
/**
* 鎖標志對應的key
*/
private String key;
/**
* 鎖的有效時間(s)
*/
private int expireTime = EXPIRE;
/**
* 請求鎖的超時時間(ms)
*/
private long timeOut = TIME_OUT;
/**
* 鎖flag
*/
private volatile boolean isLocked = false;
/**
* Redis管理模板
*/
private StringRedisTemplate redisTemplate;
/**
* 構造方法
*
* @param redisTemplate Redis管理模板
* @param key 鎖定key
* @param expireTime 鎖過期時間 (秒)
* @param timeOut 請求鎖超時時間 (毫秒)
*/
public RedisLock(StringRedisTemplate redisTemplate, String key, int expireTime, long timeOut) {
this.key = key;
this.expireTime = expireTime;
this.timeOut = timeOut;
this.redisTemplate = redisTemplate;
}
/**
* 構造方法
*
* @param redisTemplate Redis管理模板
* @param key 鎖定key
* @param expireTime 鎖過期時間
*/
public RedisLock(StringRedisTemplate redisTemplate, String key, int expireTime) {
this.key = key;
this.expireTime = expireTime;
this.redisTemplate = redisTemplate;
}
/**
* 構造方法(默認請求鎖超時時間30秒,鎖過期時間60秒)
*
* @param redisTemplate Redis管理模板
* @param key 鎖定key
*/
public RedisLock(StringRedisTemplate redisTemplate, String key) {
this.key = key;
this.redisTemplate = redisTemplate;
}
public boolean lock() {
// 系統(tǒng)當前時間,納秒
long nowTime = System.nanoTime();
// 請求鎖超時時間,納秒
long timeout = timeOut * 1000000;
final Random random = new Random();
// 不斷循環(huán)向Master節(jié)點請求鎖,當請求時間(System.nanoTime() - nano)超過設定的超時時間則放棄請求鎖
// 這個可以防止一個客戶端在某個宕掉的master節(jié)點上阻塞過長時間
// 如果一個master節(jié)點不可用了,應該盡快嘗試下一個master節(jié)點
while ((System.nanoTime() - nowTime) < timeout) {
// 將鎖作為key存儲到redis緩存中,存儲成功則獲得鎖
if (redisTemplate.opsForValue().setIfAbsent(key, LOCKED)) {
isLocked = true;
// 設置鎖的有效期,也是鎖的自動釋放時間,也是一個客戶端在其他客戶端能搶占鎖之前可以執(zhí)行任務的時間
// 可以防止因異常情況無法釋放鎖而造成死鎖情況的發(fā)生
redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
// 上鎖成功結束請求
break;
}
// 獲取鎖失敗時,應該在隨機延時后進行重試,避免不同客戶端同時重試導致誰都無法拿到鎖的情況出現(xiàn)
// 睡眠10毫秒后繼續(xù)請求鎖
try {
Thread.sleep(10, random.nextInt(50000));
} catch (InterruptedException e) {
logger.error("獲取分布式鎖休眠被中斷:", e);
}
}
return isLocked;
}
public boolean isLock() {
redisTemplate.getConnectionFactory().getConnection().time();
return redisTemplate.hasKey(key);
}
public void unlock() {
// 釋放鎖
// 不管請求鎖是否成功,只要已經(jīng)上鎖,客戶端都會進行釋放鎖的操作
if (isLocked) {
redisTemplate.delete(key);
}
}
}
調(diào)用鎖:
public void redisLock(int i) {
RedisLock redisLock = new RedisLock(redisTemplate, "redisLockKey:"+i % 10, 5*60 , 500);
try {
long now = System.currentTimeMillis();
if (redisLock.lock()) {
logger.info("=" + (System.currentTimeMillis() - now));
// TODO 獲取到鎖要執(zhí)行的代碼塊
logger.info("j:" + j ++);
} else {
logger.info("k:" + k ++);
}
} catch (Exception e) {
logger.info(e.getMessage(), e);
} finally {
// 一定要釋放鎖
redisLock.unlock();
}
}
使用redis的setnx()、get()、getset()方法,用于分布式鎖
原理
這個方案的背景主要是在setnx()和expire()的方案上針對可能存在的死鎖問題,做了一版優(yōu)化。
那么先說明一下這三個命令,對于setnx()和get()這兩個命令,相信不用再多說什么。那么getset()命令?這個命令主要有兩個參數(shù) getset(key,newValue)。該方法是原子的,對key設置newValue這個值,并且返回key原來的舊值。假設key原來是不存在的,那么多次執(zhí)行這個命令,會出現(xiàn)下邊的效果:
- getset(key, "value1") 返回nil 此時key的值會被設置為value1
- getset(key, "value2") 返回value1 此時key的值會被設置為value2
- 依次類推!
介紹完要使用的命令后,具體的使用步驟如下:
setnx(lockkey, 當前時間+過期超時時間) ,如果返回1,則獲取鎖成功;如果返回0則沒有獲取到鎖,轉向2。
get(lockkey)獲取值oldExpireTime ,并將這個value值與當前的系統(tǒng)時間進行比較,如果小于當前系統(tǒng)時間,則認為這個鎖已經(jīng)超時,可以允許別的請求重新獲取,轉向3。
計算newExpireTime=當前時間+過期超時時間,然后getset(lockkey, newExpireTime) 會返回當前l(fā)ockkey的值currentExpireTime。
判斷currentExpireTime與oldExpireTime 是否相等,如果相等,說明當前getset設置成功,獲取到了鎖。如果不相等,說明這個鎖又被別的請求獲取走了,那么當前請求可以直接返回失敗,或者繼續(xù)重試。
在獲取到鎖之后,當前線程可以開始自己的業(yè)務處理,當處理完畢后,比較自己的處理時間和對于鎖設置的超時時間,如果小于鎖設置的超時時間,則直接執(zhí)行delete釋放鎖;如果大于鎖設置的超時時間,則不需要再鎖進行處理。
可能存在的問題
問題1: 在“get(lockkey)獲取值oldExpireTime ”這個操作與“getset(lockkey, newExpireTime) ”這個操作之間,如果有N個線程在get操作獲取到相同的oldExpireTime后,然后都去getset,會不會返回的newExpireTime都是一樣的,都會是成功,進而都獲取到鎖???
我認為這套方案是不存在這個問題的。依據(jù)有兩條: 第一,redis是單進程單線程模式,串行執(zhí)行命令。 第二,在串行執(zhí)行的前提條件下,getset之后會比較返回的currentExpireTime與oldExpireTime 是否相等。
問題2: 在“get(lockkey)獲取值oldExpireTime ”這個操作與“getset(lockkey, newExpireTime) ”這個操作之間,如果有N個線程在get操作獲取到相同的oldExpireTime后,然后都去getset,假設第1個線程獲取鎖成功,其他鎖獲取失敗,但是獲取鎖失敗的線程它發(fā)起的getset命令確實執(zhí)行了,這樣會不會造成第一個獲取鎖的線程設置的鎖超時時間一直在延長???
我認為這套方案確實存在這個問題的可能。但我個人認為這個微笑的誤差是可以忽略的,不過技術方案上存在缺陷,大家可以自行抉擇哈。
問題3: 這個方案必須要保證分布式服務器的時間一定要同步,否則這個鎖就會出問題。
具體實現(xiàn)
鎖具體實現(xiàn)RedisLock:
package com.xiaolyuh.lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* Redis分布式鎖(這種方式服務器時間一定要同步,否則會出問題)
*
* @author yuhao.wangwang
* @version 1.0
* @date 2017年11月3日 上午10:21:27
*/
public class RedisLock2 {
/**
* 默認請求鎖的超時時間(ms 毫秒)
*/
private static final long TIME_OUT = 100;
/**
* 默認鎖的有效時間(s)
*/
public static final int EXPIRE = 60;
private static Logger logger = LoggerFactory.getLogger(RedisLock2.class);
private StringRedisTemplate redisTemplate;
/**
* 鎖標志對應的key
*/
private String lockKey;
/**
* 鎖的有效時間(s)
*/
private int expireTime = EXPIRE;
/**
* 請求鎖的超時時間(ms)
*/
private long timeOut = TIME_OUT;
/**
* 鎖的有效時間
*/
private long expires = 0;
/**
* 鎖標記
*/
private volatile boolean locked = false;
final Random random = new Random();
/**
* 使用默認的鎖過期時間和請求鎖的超時時間
*
* @param redisTemplate
* @param lockKey 鎖的key(Redis的Key)
*/
public RedisLock2(StringRedisTemplate redisTemplate, String lockKey) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey + "_lock";
}
/**
* 使用默認的請求鎖的超時時間,指定鎖的過期時間
*
* @param redisTemplate
* @param lockKey 鎖的key(Redis的Key)
* @param expireTime 鎖的過期時間(單位:秒)
*/
public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, int expireTime) {
this(redisTemplate, lockKey);
this.expireTime = expireTime;
}
/**
* 使用默認的鎖的過期時間,指定請求鎖的超時時間
*
* @param redisTemplate
* @param lockKey 鎖的key(Redis的Key)
* @param timeOut 請求鎖的超時時間(單位:毫秒)
*/
public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, long timeOut) {
this(redisTemplate, lockKey);
this.timeOut = timeOut;
}
/**
* 鎖的過期時間和請求鎖的超時時間都是用指定的值
*
* @param redisTemplate
* @param lockKey 鎖的key(Redis的Key)
* @param expireTime 鎖的過期時間(單位:秒)
* @param timeOut 請求鎖的超時時間(單位:毫秒)
*/
public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, int expireTime, long timeOut) {
this(redisTemplate, lockKey, expireTime);
this.timeOut = timeOut;
}
/**
* @return 獲取鎖的key
*/
public String getLockKey() {
return lockKey;
}
/**
* 獲得 lock.
* 實現(xiàn)思路: 主要是使用了redis 的setnx命令,緩存了鎖.
* reids緩存的key是鎖的key,所有的共享, value是鎖的到期時間(注意:這里把過期時間放在value了,沒有時間上設置其超時時間)
* 執(zhí)行過程:
* 1.通過setnx嘗試設置某個key的值,成功(當前沒有這個鎖)則返回,成功獲得鎖
* 2.鎖已經(jīng)存在則獲取鎖的到期時間,和當前時間比較,超時的話,則設置新的值
*
* @return true if lock is acquired, false acquire timeouted
* @throws InterruptedException in case of thread interruption
*/
public boolean lock() {
// 請求鎖超時時間,納秒
long timeout = timeOut * 1000000;
// 系統(tǒng)當前時間,納秒
long nowTime = System.nanoTime();
while ((System.nanoTime() - nowTime) < timeout) {
// 分布式服務器有時差,這里給1秒的誤差值
expires = System.currentTimeMillis() + expireTime + 1;
String expiresStr = String.valueOf(expires); //鎖到期時間
if (redisTemplate.opsForValue().setIfAbsent(lockKey, expiresStr)) {
locked = true;
// 設置鎖的有效期,也是鎖的自動釋放時間,也是一個客戶端在其他客戶端能搶占鎖之前可以執(zhí)行任務的時間
// 可以防止因異常情況無法釋放鎖而造成死鎖情況的發(fā)生
redisTemplate.expire(lockKey, expireTime, TimeUnit.SECONDS);
// 上鎖成功結束請求
return true;
}
String currentValueStr = redisTemplate.opsForValue().get(lockKey); //redis里的時間
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
//判斷是否為空,不為空的情況下,如果被其他線程設置了值,則第二個條件判斷是過不去的
// lock is expired
String oldValueStr = redisTemplate.opsForValue().getAndSet(lockKey, expiresStr);
//獲取上一個鎖到期時間,并設置現(xiàn)在的鎖到期時間,
//只有一個線程才能獲取上一個線上的設置時間,因為jedis.getSet是同步的
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
//防止誤刪(覆蓋,因為key是相同的)了他人的鎖——這里達不到效果,這里值會被覆蓋,但是因為什么相差了很少的時間,所以可以接受
//[分布式的情況下]:如過這個時候,多個線程恰好都到了這里,但是只有一個線程的設置值和當前值相同,他才有權利獲取鎖
// lock acquired
locked = true;
return true;
}
}
/*
延遲10 毫秒, 這里使用隨機時間可能會好一點,可以防止饑餓進程的出現(xiàn),即,當同時到達多個進程,
只會有一個進程獲得鎖,其他的都用同樣的頻率進行嘗試,后面有來了一些進行,也以同樣的頻率申請鎖,這將可能導致前面來的鎖得不到滿足.
使用隨機的等待時間可以一定程度上保證公平性
*/
try {
Thread.sleep(10, random.nextInt(50000));
} catch (InterruptedException e) {
logger.error("獲取分布式鎖休眠被中斷:", e);
}
}
return locked;
}
/**
* 解鎖
*/
public synchronized void unlock() {
// 只有加鎖成功并且鎖還有效才去釋放鎖
if (locked && expires > System.currentTimeMillis()) {
redisTemplate.delete(lockKey);
locked = false;
}
}
}
調(diào)用方式:
public void redisLock2(int i) {
RedisLock2 redisLock2 = new RedisLock2(redisTemplate, "redisLock:" + i % 10, 5 * 60, 500);
try {
long now = System.currentTimeMillis();
if (redisLock2.lock()) {
logger.info("=" + (System.currentTimeMillis() - now));
// TODO 獲取到鎖要執(zhí)行的代碼塊
logger.info("j:" + j++);
} else {
logger.info("k:" + k++);
}
} catch (Exception e) {
logger.info(e.getMessage(), e);
} finally {
redisLock2.unlock();
}
}
對于上面兩種redis實現(xiàn)分布式鎖的方案都有一個問題:
- 就是你獲取鎖后執(zhí)行業(yè)務邏輯的代碼只能在redis鎖的有效時間之內(nèi),因為,redis的key到期后會自動清除,這個鎖就算釋放了。所以這個鎖的有效時間一定要結合業(yè)務做好評估。
- 這兩種方式解鎖的時候是直接刪除key,假如C1獲取到了鎖,這個時候redis掛了,并且數(shù)據(jù)沒有持久化,等redis服務啟動起來,C2請求過來獲取到了鎖。但是C1請求現(xiàn)在執(zhí)行完了刪除了key,這個時候就把C2的鎖刪掉了。(在下一篇文章中有解決方案)
使用redis的SET resource-name anystring NX EX max-lock-time方式來實現(xiàn)分布式鎖
下一篇文章介紹
Spring-data-redis + redis 分布式鎖(二)
源碼: https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-data-redis-distributed-lock 工程
參考: