看了https://mp.weixin.qq.com/s/8DFA36YvDdRSaM4JwNBWpQ之后,也仿寫(xiě)了一個(gè)秒殺系統(tǒng),稱(chēng)為v1版。最近對(duì)openresty和lua有了一些技術(shù)儲(chǔ)備、就開(kāi)發(fā)了一套新的v2版秒殺。
seckill-v2秒殺系統(tǒng)
一、功能介紹
秒殺系統(tǒng)v2,主要提供3個(gè)接口方法:
-
/seckill/rest/OutletAndStock GET
查詢(xún)接口,返回當(dāng)前開(kāi)放的預(yù)約網(wǎng)點(diǎn)以及庫(kù)存信息。
-
/seckill/rest/appointment POST
下單預(yù)約接口,接收姓名、身份證、手機(jī)號(hào)、預(yù)約網(wǎng)點(diǎn)等信息,執(zhí)行預(yù)約邏輯。
-
/seckill/rest/refreshRation POST
更新了數(shù)據(jù)庫(kù)里的配給和庫(kù)存之后,同步到redis里。給管理端調(diào)用的。
二、軟件架構(gòu)設(shè)計(jì)

1、負(fù)責(zé)均衡層
openresty + lua 來(lái)做負(fù)載均衡層,可以集群部署,上面用F5或lvs來(lái)做接入。
這一層主要是解決限流(nginx限流),防刷邏輯(比如同一個(gè)ip+token每x秒只允許一次請(qǐng)求),還一個(gè)是做庫(kù)存感知、沒(méi)貨以后馬上阻擋(比如返回一個(gè)靜態(tài)頁(yè)面)接下來(lái)的無(wú)效請(qǐng)求到后端核心服務(wù)。
(1)庫(kù)存感知timer 每1s查詢(xún)r(jià)edis里網(wǎng)點(diǎn)的剩余庫(kù)存,更新到openresty本地share-dict,share-dict里邊存放剩余庫(kù)存給前端展示用,另外下單預(yù)約請(qǐng)求先查一下share-dict、如果庫(kù)存沒(méi)了則直接返回前端“活動(dòng)已結(jié)束”。
(2)網(wǎng)點(diǎn)和庫(kù)存查詢(xún)接口 返回本地share-dict里的剩余庫(kù)存給前端。
(3)預(yù)約購(gòu)買(mǎi)次數(shù)限制校驗(yàn) 每人每5天只能預(yù)約購(gòu)買(mǎi)一次。redis里邊維護(hù)一份預(yù)約購(gòu)買(mǎi)列表,由timer加載到share_dict。(核心服務(wù)層還會(huì)做一次這個(gè)校驗(yàn))
(4)限流模塊
雖然前面的庫(kù)存感知和預(yù)約次數(shù)限制校驗(yàn)可以在這里限制大部分無(wú)效請(qǐng)求進(jìn)入,但是考慮到極端情況,比如1秒內(nèi)有10萬(wàn)這種級(jí)別的流量涌入、庫(kù)存感知和次數(shù)校驗(yàn)還來(lái)不及與redis同步一致,所以這些請(qǐng)求流量會(huì)穿過(guò)openresty到達(dá)后面的核心服務(wù)層,對(duì)于核心服務(wù)層的java應(yīng)用來(lái)說(shuō)這個(gè)級(jí)別的請(qǐng)求處理起來(lái)是比較吃力的,所以我們需要在負(fù)載均衡層這里做一下限流。這里采用nginx自帶的限流功能:
nginx.conf http:
limit_req_zone $binary_remote_addr zone=perip_rps:10m rate=5r/s; #單ip每秒限制5個(gè)請(qǐng)求
limit_req_zone $server_name zone=perserver_rps:10m rate=3000r/s; #每個(gè)server每秒限制處理3000個(gè)請(qǐng)求
server:
limit_req zone=perserver_rps burst=2000 nodelay; #server每秒請(qǐng)求限流
location:
limit_req zone=perip_rps burst=10 nodelay; #每個(gè)ip每秒請(qǐng)求如果超過(guò)limit_req_zone的配置,最多可以緩沖10個(gè)
這里我們利用limit_req做了兩個(gè)維度的限流,首先是單個(gè)ip限制每秒5個(gè)請(qǐng)求、突發(fā)最多允許10個(gè),這里配置了nodelay意思是正常情況下應(yīng)該是200ms漏桶通過(guò)一個(gè)請(qǐng)求,但如果一下來(lái)了10個(gè)請(qǐng)求的話也是可以給通過(guò),只不過(guò)后續(xù)要過(guò)2s才可以通過(guò)下一個(gè)請(qǐng)求。
然后是整個(gè)server限制3000的rps,允許突發(fā)2000。
2、核心服務(wù)層
由springboot + redis組成,redis里邊存訂單和庫(kù)存信息。
(1)下單預(yù)約接口,由redis lua script保證扣庫(kù)存操作的原子性,然后將訂單生成請(qǐng)求提交給mq。
除了庫(kù)存判斷之外,這里還要做一次預(yù)約購(gòu)買(mǎi)次數(shù)限制的校驗(yàn),原因在于nginx上面是定時(shí)(比如500ms一次)去redis讀取blacklist的。在扣庫(kù)存之前一定要校驗(yàn)一下預(yù)約次數(shù)的規(guī)則。
再一個(gè)可選的查詢(xún)商品信息和庫(kù)存的接口、之所以可選是因?yàn)槿绻翘禺惢拿霘⑾到y(tǒng),完全可以把商品信息靜態(tài)化到cdn上的頁(yè)面上去。另外庫(kù)存不顯示也沒(méi)太大問(wèn)題,庫(kù)存沒(méi)了會(huì)由負(fù)載均衡層動(dòng)態(tài)判定并攔截掉、或者活動(dòng)結(jié)束以后直接掛一個(gè)活動(dòng)結(jié)束的頁(yè)面上去。
3、異步服務(wù)層
核心服務(wù)層我們盡量做到功能單一化,把可以異步處理的邏輯用RocketMQ從核心服務(wù)中剝離出來(lái),只保留必要的邏輯供負(fù)載均衡層過(guò)來(lái)的流量進(jìn)行同步調(diào)用。RocketMQ這里起到的便是削峰緩沖的作用了,提高整體的吞吐能力。這樣異步邏輯由于不直接承載C端的流量,并且異步服務(wù)作為末端業(yè)務(wù)邏輯相比最前端的負(fù)載均衡層流量要下降幾個(gè)數(shù)量級(jí)(想象10萬(wàn)人搶100個(gè)商品,最后走到異步服務(wù)層去生成訂單落庫(kù)),可以作為mq的消費(fèi)端以較少的算力資源進(jìn)行部署。這些異步邏輯可能包括訂單寫(xiě)入數(shù)據(jù)庫(kù)等等。
(1)異步訂單入庫(kù): 從RocketMQ中拿訂單,然后寫(xiě)入MySQL。消費(fèi)和入庫(kù)都使用批量處理,以提高效率。
(2)每天去redis更新維護(hù)一下購(gòu)買(mǎi)記錄,做每日購(gòu)買(mǎi)次數(shù)限制用。
三、數(shù)據(jù)結(jié)構(gòu)
redis數(shù)據(jù)結(jié)構(gòu)
某網(wǎng)點(diǎn)庫(kù)存
key:outlet:{id}:date:{date}:stock
value:String類(lèi)型,存放網(wǎng)點(diǎn)的庫(kù)存
例如,key: outlet:1:date:2021-10-01:stock , value: 3000
某日內(nèi)已預(yù)約的身份證名單
key:appointment:idNos:{date}
value:Set類(lèi)型 idNo1 ... idNon
mysql數(shù)據(jù)結(jié)構(gòu)
網(wǎng)點(diǎn)表
CREATE TABLE `t_seckill_outlet` (
`outlet_id` bigint(20) NOT NULL AUTO_INCREMENT,
`address` varchar(150) DEFAULT NULL COMMENT '網(wǎng)點(diǎn)地址',
`outlet_name` varchar(50) DEFAULT NULL COMMENT '網(wǎng)點(diǎn)名稱(chēng)',
PRIMARY KEY (`outlet_id`)
)
預(yù)約記錄明細(xì)表
CREATE TABLE `t_seckill_appointment` (
`appointment_id` bigint(20) NOT NULL AUTO_INCREMENT,
`date` varchar(10) DEFAULT NULL COMMENT '日期',
`id_card` varchar(18) DEFAULT NULL COMMENT '身份證號(hào)',
`mobile` varchar(11) DEFAULT NULL COMMENT '手機(jī)號(hào)',
`name` varchar(15) DEFAULT NULL COMMENT '姓名',
`outlet_id` mediumtext COMMENT '網(wǎng)點(diǎn)ID',
PRIMARY KEY (`appointment_id`)
)
四、核心代碼
openresty上的lua代碼和nginx配置文件:
定時(shí)timer我們放在nginx.conf的http位置
init_worker_by_lua_file lua/wangan/seckill/task_timer.lua;
task_timer.lua代碼如下:
--[[
定時(shí)從redis加載網(wǎng)點(diǎn)和庫(kù)存到本地內(nèi)存
]]
local redis = require "wangan.common.redis_iresty"
local cache = require "wangan.common.share_cache"
local red = redis:new({
ip = "122.51.114.183",
port = 6379,
password = "7474@628",
timeout = 2000,
db_index = 0,
max_idle_ms = 60000,
pool_size = 32
})
local delay = 1 --每delay秒跟redis同步一次數(shù)據(jù)
local handlerRepeat
handlerRepeat = function ( ... )
--ngx.log(ngx.INFO, "從redis加載網(wǎng)點(diǎn)和庫(kù)存到本地內(nèi)存...")
local len = red:llen("outlets:ids")
local outlets = red:lrange("outlets:ids", 0, len)
local today = ngx.today()
for _, v in pairs(outlets) do --遍歷outlet id
local stock_key = "outlet:" .. v .. ":date:" ..today .. ":stock"
local stock = red:get(stock_key) --從redis查到當(dāng)日這個(gè)網(wǎng)點(diǎn)的庫(kù)存
if stock then
--ngx.log(ngx.INFO, stock_key)
--ngx.log(ngx.INFO, stock)
local cache_v = cache.get_from_cache(stock_key)
local ok, err = cache.set_to_cache(stock_key, stock, 30) --緩存在本地內(nèi)存shared_dict
if not ok then
ngx.log(ngx.ERR , "寫(xiě)入本地緩存失敗:", err)
end
--[[
if cache_v then
ngx.log(ngx.INFO, "shared_dict中" .. stock_key .. "的庫(kù)存是" .. cache_v)
end
if not cache_v then
ngx.log(ngx.INFO, cache_v)
end
]]
end
end
end
local ok, err = ngx.timer.every(delay, handlerRepeat)
if not ok then
ngx.log(ngx.ERR, "創(chuàng)建timer.every(delay, handlerRepeat)失?。?, err)
return
end
如上面這樣openresty的本地內(nèi)存里邊就有每個(gè)網(wǎng)點(diǎn)的剩余庫(kù)存了,且1秒跟redis同步一次,數(shù)據(jù)比較實(shí)時(shí)。當(dāng)庫(kù)存沒(méi)了可以直接從本地內(nèi)存查到并返回客戶(hù)端,不用再去redis或者去后邊的核心服務(wù)去查了。很大程度上提高了性能。
所以接下來(lái)就是在請(qǐng)求的access階段去做這個(gè)庫(kù)存校驗(yàn):
--[[
預(yù)約校驗(yàn)
]]
local cache = require "wangan.common.share_cache"
local json = require "cjson"
--先讀request body
ngx.req.read_body()
--從request body里獲取參數(shù)
--local args = ngx.req.get_post_args()
--獲取request body data
local request_body_data = ngx.req.get_body_data()
if not request_body_data then
ngx.say("request body is nil")
return
end
ngx.log(ngx.INFO, "request body string", request_body_data)
--將request body data解析為json
local request_body_json = json.decode(request_body_data)
--ngx.log(ngx.INFO, "request body json", request_body_json)
--請(qǐng)求參數(shù)校驗(yàn)
local outletId = request_body_json.outletId
if not outletId then
ngx.say("網(wǎng)點(diǎn)id不可為空")
return
end
--庫(kù)存剩余校驗(yàn)
local stock_key = "outlet:" .. outletId .. ":date:" .. ngx.today() .. ":stock"
ngx.log(ngx.INFO, "stock_key ", stock_key)
local stock = cache.get_from_cache(stock_key);
if not stock then
ngx.say("未查到庫(kù)存,該網(wǎng)點(diǎn)尚未開(kāi)始預(yù)約")
return
end
ngx.log(ngx.INFO, stock_key .. " , 當(dāng)前庫(kù)存: ", stock)
if tonumber(stock)<=0 then
ngx.say("庫(kù)存已空,已預(yù)約完畢,感謝參與");
return
end
關(guān)于openresty的執(zhí)行階段,可以進(jìn)一步參考:https://blog.51cto.com/lisea/2425794
然后是nginx.conf配置:
lua_shared_dict my_cache 128m;
init_worker_by_lua_file lua/wangan/seckill/task_timer.lua;
upstream seckillcore {
server 127.0.0.1:8080;
}
server {
listen 80;
server_name localhost;
#開(kāi)發(fā)調(diào)試模式、關(guān)閉lua代碼緩存,生產(chǎn)環(huán)境請(qǐng)勿關(guān)閉
lua_code_cache off;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
root html;
index index.html index.htm;
}
location /seckill/rest/appointment {
default_type text/html;
access_by_lua_file lua/wangan/seckill/appointment_check.lua;
proxy_pass http://seckillcore;
proxy_redirect default;
}
}
核心服務(wù)里的java代碼:
/**
* 核心業(yè)務(wù)邏輯
* */
@Slf4j
@Service
public class SeckillService {
@Autowired
private RedisDao redisDao;
@Autowired
private DefaultRedisScript<List> deductMyStock;
@Autowired
private GeneralMqProducer generalMqProducer;
/**
* 預(yù)約業(yè)務(wù)邏輯
*
* */
public String appointment(@RequestBody AppointmentDto dto) {
AppointmentDetail appointDetail = new AppointmentDetail();
appointDetail.setName(dto.getName());
appointDetail.setDate(LocalDate.now().toString());
appointDetail.setIdCard(dto.getIdCard());
appointDetail.setMobile(dto.getMobile());
appointDetail.setOutletId(dto.getOutletId());
log.info(JSON.toJSONString(appointDetail));
//使用redis script扣庫(kù)存, 如成功則添加此身份證號(hào)到redis里的已預(yù)約列表
List<String> keys = new ArrayList<>();
keys.add("outlet:"+dto.getOutletId()+":date:"+LocalDate.now().toString()+":stock");
Map<String, Object> args = new HashMap<>();
args.put("buyNum", 5);
args.put("idCard", dto.getIdCard());
args.put("dates", dateList5());
List result = redisDao.executeScript(deductMyStock, keys, args);
Long errCode = (Long)result.get(0);
String errMsg = (String)result.get(1);
//預(yù)約,扣庫(kù)存。
if (errCode.longValue()==0) {
//用rocketmq異步寫(xiě)預(yù)約記錄
sendAppointmentToMq(appointDetail);
return "預(yù)約成功";
} else {
if(errCode.longValue()==1) {
return "您" + errMsg + "預(yù)約過(guò)";
}else if(errCode.longValue()==2) {
return "網(wǎng)點(diǎn)尚未開(kāi)放預(yù)約,請(qǐng)耐心等待";
}else if(errCode.longValue()==3) {
return "庫(kù)存不足";
}
return "預(yù)約失敗";
}
}
/**
* 異步寫(xiě)預(yù)約記錄
* */
private void sendAppointmentToMq(AppointmentDetail appointDetail) {
String appointJson = JSON.toJSONString(appointDetail);
EventMessage eventMsg = new EventMessage();
eventMsg.setTopic("order");
eventMsg.setTag("newOrder");
eventMsg.setMsgBody(appointJson);
generalMqProducer.asyncPublish(eventMsg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("預(yù)約訂單入庫(kù)消息寫(xiě)入rocketmq成功,消息ID:{}", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
//如果與mq通信故障了,那么可以從日志文件里找到預(yù)約記錄,手工執(zhí)行寫(xiě)入mysql
log.error("預(yù)約訂單寫(xiě)入rocketmq失敗:{}, exception detail:{}" , appointJson , e.getMessage());
}
});
}
/**
* 返回從今天還是算往前5天的日期列表
* */
private List<String> dateList5(){
List<String> dates = new ArrayList<>();
LocalDate today = LocalDate.now();
for(int i=0; i<5; i++) {
dates.add(today.minusDays(i).toString());
}
return dates;
}
}
扣庫(kù)存、進(jìn)行5日內(nèi)已預(yù)約校驗(yàn)的redis lua腳本:
--[[
扣減redis庫(kù)存lua script
KEYS[1] 庫(kù)存key名稱(chēng),例如outlet:1:date:2021-10-01:stock
ARGV[1] 參數(shù),json字符串
buyNum表示一次扣多少庫(kù)存
idCard表示預(yù)約人身份證號(hào)
dates:從當(dāng)日開(kāi)始往前倒排5天的日期的一個(gè)列表{"2021-10-06","2021-10-05","2021-10-04","2021-10-03","2021-10-02"}
返回 {int, string}
0成功, 1已經(jīng)5天內(nèi)預(yù)約過(guò), 2網(wǎng)點(diǎn)尚未開(kāi)放, 3庫(kù)存不足
]]
local stock_key = KEYS[1]
local args = ARGV[1]
redis.log(redis.LOG_NOTICE, stock_key)
redis.log(redis.LOG_NOTICE, args)
local args_json = cjson.decode(args)
local buy_num = args_json.buyNum
local id_card = args_json.idCard
local dates = args_json.dates
--查詢(xún)?cè)撋矸葑C是否已預(yù)約過(guò), 5日內(nèi)
for _,v in pairs(dates) do
local is= redis.call("sismember", "appointment:idNos:" .. v, id_card)
if is==1 then
return {1, v} --返回在哪天預(yù)約過(guò)
end
end
--扣庫(kù)存
local current_stock = redis.call("get", stock_key)
--redis.log(redis.LOG_NOTICE, type(current_stock))
if not current_stock then
return {2, "該網(wǎng)點(diǎn)尚未正式開(kāi)放預(yù)約"}
end
if tonumber(current_stock) >= buy_num then
redis.call("set", stock_key, tonumber(current_stock) - buy_num) --庫(kù)存減去buy_num
redis.call("sadd", "appointment:idNos:" .. dates[1], id_card) --把身份證號(hào)寫(xiě)入當(dāng)日預(yù)約記錄
return {0, "ok"}
end
return {3, "庫(kù)存不足"} --庫(kù)存不足
異步服務(wù)批量處理一次從RocketMQ輪詢(xún)到的訂單、批量入庫(kù):
//注冊(cè)consumer,并使其訂閱相應(yīng)的topic、tag
private void registConsumer(MQMsgHandler msgHandler, String consumerGroup, String topic, String tag) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup, getAclRPCHook(), new AllocateMessageQueueAveragely());
try {
consumer.setNamesrvAddr(mqurl);
consumer.setConsumeThreadMin(consumeThreadCorePoolSize);
consumer.setConsumeThreadMax(consumeThreadCorePoolSize);
consumer.setPullBatchSize(32); //一次長(zhǎng)輪詢(xún)最多從mq里拿多少個(gè)消息,默認(rèn)32
consumer.subscribe(topic, tag);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
List<EventMessage> eventMsgs = new ArrayList<>();
String msgContent = null;
try {
for(MessageExt msg : msgs) {
msgContent = new String(msg.getBody(),"utf-8");
EventMessage eventMsg = JSON.parseObject(msgContent, EventMessage.class);
log.debug(JSON.toJSONString(eventMsg));
eventMsgs.add(eventMsg);
}
msgHandler.handleMsg(eventMsgs); //批量處理本次拉取的消息,執(zhí)行業(yè)務(wù)邏輯
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
log.error("消息編碼錯(cuò)誤:" + e.getMessage(), e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}catch(Exception e) {
log.error("注冊(cè)消費(fèi)者出錯(cuò)" + e.getMessage(), e);
}
}
批量入庫(kù):
@Slf4j
@Component
@MsgConsumer(consumerGroup = "newOrder-consumer-group", tag = "newOrder", topic = "order")
public class NewOrderMsgHandler implements MQMsgHandler{
@Autowired
private AppointmentDetailRepository appointmentDetailRepository;
@Override
public void handleMsg(List<EventMessage> eventMessages) {
log.debug("收到mq消息: {}", JSON.toJSONString(eventMessages));
List<AppointmentDetail> appointmentDetails = new ArrayList<>();
for(EventMessage eventMsg : eventMessages) {
AppointmentDetail appointmentDetail = JSON.parseObject(eventMsg.getMsgBody(), AppointmentDetail.class);
appointmentDetails.add(appointmentDetail);
}
appointmentDetailRepository.saveAll(appointmentDetails); //批量入庫(kù)
}
}
五、改進(jìn)與優(yōu)勢(shì)
相比V1版,相當(dāng)于把原來(lái)本地java內(nèi)存里的操作搬到redis上,然后一些個(gè)接口服務(wù)提前:由openresty調(diào)用redis,把一些業(yè)務(wù)邏輯直接在負(fù)載均衡層做掉。
V2版的優(yōu)勢(shì)還在于可以橫向擴(kuò)展算力來(lái)增加整體系統(tǒng)的性能。其實(shí)如果v1版單機(jī)承受范圍內(nèi)的話,直接讀寫(xiě)本地內(nèi)存不見(jiàn)得比v2版性能差、可能還略好。但是請(qǐng)求量再大的話,v1就沒(méi)辦法了單機(jī)算力配置是有限的,而v2的優(yōu)勢(shì)就發(fā)揮出來(lái)了、因?yàn)榭梢岳^續(xù)擴(kuò)容算力,也就是說(shuō)是可以橫向擴(kuò)展的架構(gòu)。