基于Redis實(shí)現(xiàn)消息隊(duì)列

基于Redis實(shí)現(xiàn)消息隊(duì)列

1.業(yè)務(wù)場(chǎng)景

假設(shè)在沒(méi)有專業(yè)消息中間件的情況下,又要通過(guò)消息隊(duì)列去解耦。redis是個(gè)更好的選擇。

2.實(shí)現(xiàn)方式

簡(jiǎn)要說(shuō)明實(shí)現(xiàn)方式,這里只做個(gè)大概的概括

  • 發(fā)布與訂閱(缺點(diǎn):典型的一對(duì)一,不支持多個(gè)消費(fèi)者公平消費(fèi)消息,消息無(wú)法持久化,如果出現(xiàn)網(wǎng)絡(luò)斷開(kāi)、Redis 宕機(jī)等,消息就會(huì)被丟棄等問(wèn)題)

  • list隊(duì)列(缺點(diǎn):沒(méi)有很好 ACK 機(jī)制,沒(méi)有 ConsumerGroup 消費(fèi)組,不支持一對(duì)多消費(fèi)等問(wèn)題)

  • stream隊(duì)列(推薦)官方:https://redis.io/docs/data-types/streams/

3.概念

Redis5.0帶來(lái)了Stream類型。其實(shí)就是Redis對(duì)消息隊(duì)列(MQ,Message Queue)的完善實(shí)現(xiàn)。

主要有幾個(gè)概念:

1.消費(fèi)者組(Consumer Group):一個(gè)消費(fèi)組有多個(gè)消費(fèi)者(Consumer), 這些消費(fèi)者之間是競(jìng)爭(zhēng)關(guān)系。也就是說(shuō)不會(huì)出現(xiàn)重復(fù)消費(fèi)的場(chǎng)景。

2.pending_ids :消費(fèi)者(Consumer)的狀態(tài)變量,作用是維護(hù)消費(fèi)者的未確認(rèn)的 id。 pending_ids 記錄了當(dāng)前已經(jīng)被客戶端讀取的消息,但是還沒(méi)有 ack (Acknowledge character:確認(rèn)字符)。

3.last_delivered_id :游標(biāo),每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo) last_delivered_id,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng)。

4.消息ID: 消息ID的形式是timestampInMillis-sequence,例如1527846880572-5

這里簡(jiǎn)要貼出Redis中Stream操作的相關(guān)指令

其實(shí)像代碼,都是基于命令的高度封裝

消息隊(duì)列相關(guān)命令:

  • XADD - 添加消息到末尾

  • XTRIM - 對(duì)流進(jìn)行修剪,限制長(zhǎng)度

  • XDEL - 刪除消息

  • XLEN - 獲取流包含的元素?cái)?shù)量,即消息長(zhǎng)度

  • XRANGE - 獲取消息列表,會(huì)自動(dòng)過(guò)濾已經(jīng)刪除的消息

  • XREVRANGE - 反向獲取消息列表,ID 從大到小

  • XREAD - 以阻塞或非阻塞方式獲取消息列表

消費(fèi)者組相關(guān)命令:

  • XGROUP CREATE - 創(chuàng)建消費(fèi)者組

  • XREADGROUP GROUP - 讀取消費(fèi)者組中的消息

  • XACK - 將消息標(biāo)記為"已處理"

  • XGROUP SETID - 為消費(fèi)者組設(shè)置新的最后遞送消息ID

  • XGROUP DELCONSUMER - 刪除消費(fèi)者

  • XGROUP DESTROY - 刪除消費(fèi)者組

  • XPENDING - 顯示待處理消息的相關(guān)信息

  • XCLAIM - 轉(zhuǎn)移消息的歸屬權(quán)

  • XINFO - 查看流和消費(fèi)者組的相關(guān)信息;

  • XINFO GROUPS - 打印消費(fèi)者組的信息;

  • XINFO STREAM - 打印流信息

4.代碼實(shí)現(xiàn)

stream相關(guān)配置,這里主要配置消費(fèi)組和消費(fèi)者相關(guān)信息,以及消息的監(jiān)聽(tīng)機(jī)制

@Slf4j
@Configuration
public class RedisStreamConfig {
    @Autowired
    private MyListener myListener;

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 實(shí)際生產(chǎn)環(huán)境中  我們應(yīng)該把消費(fèi)者組等信息  寫(xiě)入配置環(huán)境中 
     */
//    @Autowired
//    private StreamProperty streamProperty;

    /**
     * 收到消息后不自動(dòng)確認(rèn),需要用戶選擇合適的時(shí)機(jī)確認(rèn)
     * 當(dāng)某個(gè)消息被ACK,PEL列表就會(huì)減少
     * 如果忘記確認(rèn)(ACK),則PEL列表會(huì)不斷增長(zhǎng)占用內(nèi)存
     * 如果服務(wù)器發(fā)生意外,重啟連接后將再次收到PEL中的消息ID列表
     */
    @Bean
    public Subscription subscription(RedisConnectionFactory factory) {
        initGroup("mystream", "group1");
        // 創(chuàng)建Stream消息監(jiān)聽(tīng)容器配置
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                // 讀取超時(shí)時(shí)間
                .pollTimeout(Duration.ofSeconds(3))
                // 配置消息類型
                .targetType(String.class)
                // 異常處理器
                .errorHandler(t -> log.info("redis listener error", t))
                .build();
        // 創(chuàng)建Stream消息監(jiān)聽(tīng)容器
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer = StreamMessageListenerContainer.create(factory, options);
        // 設(shè)置消費(fèi)手動(dòng)提交配置
        Subscription subscription = listenerContainer.receive(
                // 設(shè)置消費(fèi)者分組和名稱
                Consumer.from("group1","consumer-1"),
                // 設(shè)置訂閱Stream的key和獲取偏移量,以及消費(fèi)處理類
                StreamOffset.create("mystream", ReadOffset.lastConsumed()),
                agendaListener);
        // 監(jiān)聽(tīng)容器啟動(dòng)
        listenerContainer.start();
        return subscription;
    }

    /**
     * 初始化分組
     */
    private void initGroup(String key, String group) {
        Boolean aBoolean = redisTemplate.hasKey(key);
        // 創(chuàng)建不存在的分組
        if (Boolean.FALSE.equals(aBoolean)) {
            redisTemplate.opsForStream().createGroup(key, group);
        }
    }

}

實(shí)現(xiàn)消息的監(jiān)聽(tīng)

@Slf4j
@Component
public class MyListener implements StreamListener<String, ObjectRecord<String, String>> {
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public void onMessage(ObjectRecord<String, String> record) {
        try {
            String value = record.getValue();
            log.info("stream name :{}, body:{}", record.getStream(), value);
            if (StrUtil.isBlank(value)) {
                return;
            }
            // todo 業(yè)務(wù)邏輯
            // 手動(dòng)確認(rèn)消息 如果不ack 消息就會(huì)進(jìn)入到pending隊(duì)列中 這個(gè)隊(duì)列都是維護(hù)消費(fèi)者的未確認(rèn)的消息
            redisTemplate.opsForStream().acknowledge("mystream", "group1", record.getId().getValue());
        } catch (Exception e) {
            log.error("error message:{}", e.getMessage());
        }
    }

}

這里說(shuō)一下消息體類型 Record 官方解釋:流中的單個(gè)條目,由條目 ID 和實(shí)際條目值(通常是字段值對(duì)的集合)組成

我們就是可以理解為消息體類型。Record接口,常用的就是

  • MapRecord(鍵值對(duì)類型)

  • ObjectRecord(對(duì)象類型)

測(cè)試

@PostMapping("/addStream")
public ResponseResult<String> addStream(){
    // 這里的消息體都是string類型
    ObjectRecord<String, String> record = StreamRecords.objectBacked("1234567").withStreamKey("mystream");
    // 這里是消息id,消息id在隊(duì)列里是唯一的
    RecordId recordId = stringRedisTemplate.opsForStream().add(record);
    // 裁剪隊(duì)列,因?yàn)殛?duì)列即使被消費(fèi)者消費(fèi)后任然不會(huì)刪除,所以我們隊(duì)列設(shè)定最大容量,也就是上面提到的 XTRIM  命令
    Long count = stringRedisTemplate.opsForStream().trim("mystream", 100000);
    System.out.println("trimCount" + count);
    if (recordId != null) {
        // 返回打印消息id
        return ResponseResult.success(recordId.getValue());
    }
    return ResponseResult.success();
}

基于redisson實(shí)現(xiàn)

相關(guān)消息監(jiān)聽(tīng)和消費(fèi)者配置同上

測(cè)試

RStream<Object, Object> stream = redissonClient.getStream("mystream", new SerializationCodec());
StreamAddArgs<Object, Object> entry = StreamAddArgs.entry("a","1");
stream.add(entry);
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容