基于Redis實(shí)現(xiàn)消息隊(duì)列
1.業(yè)務(wù)場(chǎng)景
假設(shè)在沒(méi)有專業(yè)消息中間件的情況下,又要通過(guò)消息隊(duì)列去解耦。redis是個(gè)更好的選擇。
2.實(shí)現(xiàn)方式
簡(jiǎn)要說(shuō)明實(shí)現(xiàn)方式,這里只做個(gè)大概的概括
發(fā)布與訂閱(缺點(diǎn):典型的一對(duì)一,不支持多個(gè)消費(fèi)者公平消費(fèi)消息,消息無(wú)法持久化,如果出現(xiàn)網(wǎng)絡(luò)斷開(kāi)、Redis 宕機(jī)等,消息就會(huì)被丟棄等問(wèn)題)
list隊(duì)列(缺點(diǎn):沒(méi)有很好 ACK 機(jī)制,沒(méi)有 ConsumerGroup 消費(fèi)組,不支持一對(duì)多消費(fèi)等問(wèn)題)
stream隊(duì)列(推薦)官方:https://redis.io/docs/data-types/streams/
3.概念
Redis5.0帶來(lái)了Stream類型。其實(shí)就是Redis對(duì)消息隊(duì)列(MQ,Message Queue)的完善實(shí)現(xiàn)。
主要有幾個(gè)概念:
1.消費(fèi)者組(Consumer Group):一個(gè)消費(fèi)組有多個(gè)消費(fèi)者(Consumer), 這些消費(fèi)者之間是競(jìng)爭(zhēng)關(guān)系。也就是說(shuō)不會(huì)出現(xiàn)重復(fù)消費(fèi)的場(chǎng)景。
2.pending_ids :消費(fèi)者(Consumer)的狀態(tài)變量,作用是維護(hù)消費(fèi)者的未確認(rèn)的 id。 pending_ids 記錄了當(dāng)前已經(jīng)被客戶端讀取的消息,但是還沒(méi)有 ack (Acknowledge character:確認(rèn)字符)。
3.last_delivered_id :游標(biāo),每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo) last_delivered_id,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng)。
4.消息ID: 消息ID的形式是timestampInMillis-sequence,例如1527846880572-5
這里簡(jiǎn)要貼出Redis中Stream操作的相關(guān)指令
其實(shí)像代碼,都是基于命令的高度封裝
消息隊(duì)列相關(guān)命令:
XADD - 添加消息到末尾
XTRIM - 對(duì)流進(jìn)行修剪,限制長(zhǎng)度
XDEL - 刪除消息
XLEN - 獲取流包含的元素?cái)?shù)量,即消息長(zhǎng)度
XRANGE - 獲取消息列表,會(huì)自動(dòng)過(guò)濾已經(jīng)刪除的消息
XREVRANGE - 反向獲取消息列表,ID 從大到小
XREAD - 以阻塞或非阻塞方式獲取消息列表
消費(fèi)者組相關(guān)命令:
XGROUP CREATE - 創(chuàng)建消費(fèi)者組
XREADGROUP GROUP - 讀取消費(fèi)者組中的消息
XACK - 將消息標(biāo)記為"已處理"
XGROUP SETID - 為消費(fèi)者組設(shè)置新的最后遞送消息ID
XGROUP DELCONSUMER - 刪除消費(fèi)者
XGROUP DESTROY - 刪除消費(fèi)者組
XPENDING - 顯示待處理消息的相關(guān)信息
XCLAIM - 轉(zhuǎn)移消息的歸屬權(quán)
XINFO - 查看流和消費(fèi)者組的相關(guān)信息;
XINFO GROUPS - 打印消費(fèi)者組的信息;
XINFO STREAM - 打印流信息
4.代碼實(shí)現(xiàn)
stream相關(guān)配置,這里主要配置消費(fèi)組和消費(fèi)者相關(guān)信息,以及消息的監(jiān)聽(tīng)機(jī)制
@Slf4j
@Configuration
public class RedisStreamConfig {
@Autowired
private MyListener myListener;
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 實(shí)際生產(chǎn)環(huán)境中 我們應(yīng)該把消費(fèi)者組等信息 寫(xiě)入配置環(huán)境中
*/
// @Autowired
// private StreamProperty streamProperty;
/**
* 收到消息后不自動(dòng)確認(rèn),需要用戶選擇合適的時(shí)機(jī)確認(rèn)
* 當(dāng)某個(gè)消息被ACK,PEL列表就會(huì)減少
* 如果忘記確認(rèn)(ACK),則PEL列表會(huì)不斷增長(zhǎng)占用內(nèi)存
* 如果服務(wù)器發(fā)生意外,重啟連接后將再次收到PEL中的消息ID列表
*/
@Bean
public Subscription subscription(RedisConnectionFactory factory) {
initGroup("mystream", "group1");
// 創(chuàng)建Stream消息監(jiān)聽(tīng)容器配置
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
// 讀取超時(shí)時(shí)間
.pollTimeout(Duration.ofSeconds(3))
// 配置消息類型
.targetType(String.class)
// 異常處理器
.errorHandler(t -> log.info("redis listener error", t))
.build();
// 創(chuàng)建Stream消息監(jiān)聽(tīng)容器
StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer = StreamMessageListenerContainer.create(factory, options);
// 設(shè)置消費(fèi)手動(dòng)提交配置
Subscription subscription = listenerContainer.receive(
// 設(shè)置消費(fèi)者分組和名稱
Consumer.from("group1","consumer-1"),
// 設(shè)置訂閱Stream的key和獲取偏移量,以及消費(fèi)處理類
StreamOffset.create("mystream", ReadOffset.lastConsumed()),
agendaListener);
// 監(jiān)聽(tīng)容器啟動(dòng)
listenerContainer.start();
return subscription;
}
/**
* 初始化分組
*/
private void initGroup(String key, String group) {
Boolean aBoolean = redisTemplate.hasKey(key);
// 創(chuàng)建不存在的分組
if (Boolean.FALSE.equals(aBoolean)) {
redisTemplate.opsForStream().createGroup(key, group);
}
}
}
實(shí)現(xiàn)消息的監(jiān)聽(tīng)
@Slf4j
@Component
public class MyListener implements StreamListener<String, ObjectRecord<String, String>> {
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public void onMessage(ObjectRecord<String, String> record) {
try {
String value = record.getValue();
log.info("stream name :{}, body:{}", record.getStream(), value);
if (StrUtil.isBlank(value)) {
return;
}
// todo 業(yè)務(wù)邏輯
// 手動(dòng)確認(rèn)消息 如果不ack 消息就會(huì)進(jìn)入到pending隊(duì)列中 這個(gè)隊(duì)列都是維護(hù)消費(fèi)者的未確認(rèn)的消息
redisTemplate.opsForStream().acknowledge("mystream", "group1", record.getId().getValue());
} catch (Exception e) {
log.error("error message:{}", e.getMessage());
}
}
}
這里說(shuō)一下消息體類型 Record 官方解釋:流中的單個(gè)條目,由條目 ID 和實(shí)際條目值(通常是字段值對(duì)的集合)組成
我們就是可以理解為消息體類型。Record接口,常用的就是
MapRecord(鍵值對(duì)類型)
ObjectRecord(對(duì)象類型)
測(cè)試
@PostMapping("/addStream")
public ResponseResult<String> addStream(){
// 這里的消息體都是string類型
ObjectRecord<String, String> record = StreamRecords.objectBacked("1234567").withStreamKey("mystream");
// 這里是消息id,消息id在隊(duì)列里是唯一的
RecordId recordId = stringRedisTemplate.opsForStream().add(record);
// 裁剪隊(duì)列,因?yàn)殛?duì)列即使被消費(fèi)者消費(fèi)后任然不會(huì)刪除,所以我們隊(duì)列設(shè)定最大容量,也就是上面提到的 XTRIM 命令
Long count = stringRedisTemplate.opsForStream().trim("mystream", 100000);
System.out.println("trimCount" + count);
if (recordId != null) {
// 返回打印消息id
return ResponseResult.success(recordId.getValue());
}
return ResponseResult.success();
}
基于redisson實(shí)現(xiàn)
相關(guān)消息監(jiān)聽(tīng)和消費(fèi)者配置同上
測(cè)試
RStream<Object, Object> stream = redissonClient.getStream("mystream", new SerializationCodec());
StreamAddArgs<Object, Object> entry = StreamAddArgs.entry("a","1");
stream.add(entry);