Spring Cloud Stream 進階配置——使用延遲隊列實現(xiàn)“定時關(guān)閉超時未支付訂單”

ps: 本文所有代碼可在 這里 查看。

延遲隊列

延遲隊列 操作的對象是延遲消息,所謂 “延遲消息” 是指當(dāng)消息被發(fā)送以后,并不想讓消費者立刻消費消息,而是等待特定時間后,消費者才能拿到消息進行消費。

延遲隊列比較經(jīng)典的使用場景有:

  1. 在訂單系統(tǒng)中,用戶下單后,如果未在規(guī)定時間內(nèi)(比如30分鐘)支付,那么該訂單會被關(guān)閉,即自動取消訂單。
  2. 用戶希望通過手機遠程控制家里的智能設(shè)備在指定的時間進行工作。這時候可以將用戶指令發(fā)送到延遲隊列,當(dāng)指令時間到了,再將指令推送到智能設(shè)備。

基于 RabbitMQ 的延遲隊列

使用死信隊列實現(xiàn)延遲隊列

AMQP 協(xié)議中,或者 v3.5.8 之前的 RabbitMQ 本身并沒有直接支持延遲隊列功能,要想實現(xiàn)類似延遲隊列的功能,可以通過死信隊列的配合。即定義一組 ttl 為特定時長的隊列,比如:5秒,10秒,30秒,1分鐘等,然后再對這些隊列,分別定義死信隊列,當(dāng)消息過期時,就會轉(zhuǎn)存到相應(yīng)的死信隊列(即延遲隊列)中,這樣消費者根據(jù)業(yè)務(wù)自身的情況,分別選擇不同延遲等級的延遲隊列進行消費。

使用延遲消息交換機插件實現(xiàn)延遲隊列

上面介紹的延遲隊列實現(xiàn)方式,其實是比較繁瑣的,好在,在版本 v3.5.8之后,RabbitMQ 推出了一個延遲消息交換機插件:rabbitmq_delayed_message_exchange,當(dāng)啟用該插件后,如果有一個隊列聲明為延遲交換機,那么當(dāng)有消息發(fā)送到該交換機后,會根據(jù)延遲時長來決定投遞的順序,而如果延遲時長小于零,那么會立刻投遞到相應(yīng)的隊列。

第一種實現(xiàn)方式,不在本文的討論范圍,就不細說,下面將對第二種實現(xiàn)方式進行介紹。

ps:RabbitMQ 的版本最好是 3.6.x 及以上,Erlang/OTP 的版本要在 18.0 以上。

使用延遲消息交換機插件

下載插件

因為該插件默認是沒有在 RabbitMQ 的軟件包的 plugins 目錄下,需要自己下載然后放到 plugins 目錄下,下載地址如下:

下載下來后,解壓,然后拷貝到 plugins 目錄下,如果是通過 rpm 是方式安裝,目錄應(yīng)該是:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.15/plugins;如果是 Mac 用戶,且使用 brew 安裝,目錄則在:/usr/local/Cellar/rabbitmq/3.7.7/plugins

啟用插件

# 啟用插件 rabbitmq-delayed-message-exchange
rabbitmq-plugins enable rabbitmq-delayed-message-exchange

配合 Spring Cloud Stream 使用延遲交換機

首先來看一下延遲交換機如何配置:

spring:
  cloud:
    stream:
      bindings:
        delayedQueueOutput:
          destination: delayedQueueTopic
          content-type: application/json
          binder: rabbit

        delayedQueueInput:
          destination: delayedQueueTopic
          content-type: application/json
          group: ${spring.application.name}
          binder: rabbit

      rabbit:
        bindings:
          delayedQueueOutput:
            producer:
              delayedExchange: true # 是否將目標(biāo)exchange聲明為一個延遲消息交換機,默認false。即消息productor發(fā)布消息到延遲exchange后,延遲n長時間后才將消息推送到指定的queue中。 -RabbitMQ需要安裝/啟用插件: rabbitmq-delayed-message-exchange

          delayedQueueInput:
            consumer:
              delayedExchange: true # 是否將目標(biāo)exchange聲明為一個延遲消息交換機,默認false。即消息productor發(fā)布消息到延遲exchange后,延遲n長時間后才將消息推送到指定的queue中。 -RabbitMQ需要安裝/啟用插件: rabbitmq-delayed-message-exchange

重點關(guān)注2個配置:spring.cloud.stream.rabbit.bindings.ChannelName.producer.delayedExchangespring.cloud.stream.rabbit.bindings.ChannelName.consumer.delayedExchange。

這2個配置分別屬于生產(chǎn)者和消費者的配置,但都是用于告訴 Spring Cloud Stream 是否將交換機聲明為一個延遲消息交換機。這2個是成對出現(xiàn),如果少配置了一個,服務(wù)啟動時會報一個警告,下文會說明。

延遲消息交換機的相關(guān)配置就這么簡單,接下來通過測試用例來看一下效果。

ScasDelayedTest

@Data
@NoArgsConstructor
@AllArgsConstructor
public class DelayModel {

    /**
     * 延遲投遞的時長. 單位: ms
     */
    private long delay;

}
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("delayed")
@EnableBinding({ScasDelayedTest.MessageSink.class, ScasDelayedTest.MessageSource.class})
public class ScasDelayedTest {

    @Autowired
    private DelayedQueueProducer delayedQueueProducer;

    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            // 隨機延遲 3-8 秒
            long delay = RandomUtil.randomLong(3, 8) * 1000;
            delayedQueueProducer.publish(new DelayModel(delay));
        }

        Thread.sleep(1000000);
    }

    @Component
    public static class DelayedQueueProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(DelayModel model) {
            long delay = model.getDelay();
            Message<DelayModel> message = MessageBuilder.withPayload(model).setHeader("x-delay", delay).build();
            messageSource.delayedQueueOutput().send(message);
            log.info("發(fā)布延遲隊列消息: {}", model);
        }

    }

    @Component
    public static class DelayedQueueHandler {

        @StreamListener("delayedQueueInput")
        public void handle(DelayModel model) throws InterruptedException {
            log.info("消費延遲隊列的消息. model: [{}].", model);
        }

    }

    public interface MessageSink {

        @Input("delayedQueueInput")
        SubscribableChannel delayedQueueInput();

    }

    public interface MessageSource {

        @Output("delayedQueueOutput")
        MessageChannel delayedQueueOutput();

    }

}

上面的代碼很簡單,重點是在構(gòu)建消息時,比平常多了一個步驟,即 .setHeader("x-delay", delay),其中變量 delay 為該消息需要延遲多久才被消費。

很好理解,通過 setHeader 方法,對 Message 添加一個名為 x-delay 的頭部,對應(yīng)的值則為延遲時長,單位為 ms。當(dāng)該消息被投遞到延遲交換機后,獲取頭部 x-delay 的值,如果小于0,那么立即將消息路由到相應(yīng)的隊列被消費,如果大于0,則延遲對應(yīng)時間。

啟動 ScasDelayedTest

啟動測試用例后,控制臺會出現(xiàn)類似如下圖的輸出:


ScasDelayedTest

查看延遲投遞的消息數(shù)量

怎么查看延遲投遞的消息數(shù)量?可以在 RabbitMQ Management 的對應(yīng)交換機頁面查看,
http://localhost:15672/#/exchanges/%2F/delayedQueueTopic

延遲投遞的消息數(shù)量

ps: 為達到查看效果,可以適當(dāng)增加延遲時長。

使用延遲隊列實現(xiàn)“定時關(guān)閉超時未支付訂單”

上面簡單介紹了延遲交換機的使用方法,現(xiàn)在回到正題,如何使用延遲隊列來實現(xiàn) “定時關(guān)閉超時未支付訂單” 呢?

針對上面的場景,一般的思路是:定義一個定時任務(wù),比如每分鐘查詢一下訂單表,找出接下來1、2鐘內(nèi)需要關(guān)閉的訂單,然后再對每一筆訂單執(zhí)行 關(guān)閉訂單 操作,當(dāng)然在關(guān)閉之前需要再次確認訂單是否 “已支付”。

為了簡單,再通過一個測試用例來模擬一下具體場景。

ScasCloseUnpaidOrderTest

@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderModel {

    /**
     * 訂單id
     */
    private Long id;

    /**
     * 訂單失效時間
     */
    private Long expireTime;

    @Override
    public String toString() {
        return "OrderModel{" +
                "id=" + id +
                ", expireTime=" + TimeUtil.format(TimeUtil.toLocalDateTime(expireTime)) +
                '}';
    }
}
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("delayed")
@EnableBinding({ScasCloseUnpaidOrderTest.MessageSink.class, ScasCloseUnpaidOrderTest.MessageSource.class})
public class ScasCloseUnpaidOrderTest {

    @Autowired
    private CloseUnpaidOrderProducer closeUnpaidOrderProducer;

    @Test
    public void test() throws InterruptedException {

        // 模擬每分鐘的0秒執(zhí)行定時任務(wù)
        long toSleep = 60000 - System.currentTimeMillis() % 60000;
        Thread.sleep(toSleep);

        List<OrderModel> models = buildUnpaidOrderModel();
        for (OrderModel model : models) {
            closeUnpaidOrderProducer.publish(model);
        }

        Thread.sleep(1000000);

    }

    private List<OrderModel> buildUnpaidOrderModel() {

        long now = System.currentTimeMillis();

        List<OrderModel> models = new ArrayList<>(5);
        for (int i = 0; i < 5; i++) {

            long id = RandomUtil.randomLong(10000, 100000);
            // 模擬 訂單將在小于60s內(nèi)過期
            long expireTime = now + RandomUtil.randomLong(0, 60) * 1000;

            OrderModel model = new OrderModel();
            model.setId(id);
            model.setExpireTime(expireTime);
            models.add(model);
        }
        return models;
    }

    @Component
    public static class CloseUnpaidOrderProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(OrderModel model) {
            long now = System.currentTimeMillis();
            long delay = model.getExpireTime() - now;
            Message<OrderModel> message = MessageBuilder.withPayload(model).setHeader("x-delay", delay).build();
            messageSource.closeUnpaidOrderOutput().send(message);
            log.info("發(fā)布 [關(guān)閉超時未支付訂單] 消息. delay: {}, model: {}", delay, model);
        }

    }

    @Component
    public static class CloseUnpaidOrderHandler {

        private Random random = new Random();

        @StreamListener("closeUnpaidOrderInput")
        public void handle(OrderModel model) throws InterruptedException {

            log.info("檢查訂單狀態(tài), 關(guān)閉支付超時訂單. model: {}", model);

            if (isPaySuccess()) {
                log.info("訂單 [{}] 支付超時. 關(guān)閉訂單.", model.getId());
            } else {
                log.info("訂單 [{}] 支付完成.", model.getId());
            }
        }

        private boolean isPaySuccess() {
            // 模擬從支付系統(tǒng)查詢支付狀態(tài).
            return random.nextInt(10) % 3 == 0;
        }

    }

    public interface MessageSource {

        @Output("closeUnpaidOrderOutput")
        MessageChannel closeUnpaidOrderOutput();

    }

    public interface MessageSink {

        @Input("closeUnpaidOrderInput")
        SubscribableChannel closeUnpaidOrderInput();

    }

}

配置文件跟上一個測試用例基本一樣:

spring:
  cloud:
    stream:
      bindings:
        closeUnpaidOrderOutput:
          destination: closeUnpaidOrderTopic
          content-type: application/json
          binder: rabbit

        closeUnpaidOrderInput:
          destination: closeUnpaidOrderTopic
          content-type: application/json
          group: ${spring.application.name}
          binder: rabbit

      rabbit:
        bindings:
          closeUnpaidOrderOutput:
            producer:
              delayedExchange: true # 是否將目標(biāo)exchange聲明為一個延遲消息交換機,默認false。即消息productor發(fā)布消息到延遲exchange后,延遲n長時間后才將消息推送到指定的queue中。 -RabbitMQ需要安裝/啟用插件: rabbitmq-delayed-message-exchange

          closeUnpaidOrderInput:
            consumer:
              delayedExchange: true # 是否將目標(biāo)exchange聲明為一個延遲消息交換機,默認false。即消息productor發(fā)布消息到延遲exchange后,延遲n長時間后才將消息推送到指定的queue中。 -RabbitMQ需要安裝/啟用插件: rabbitmq-delayed-message-exchange

啟動 ScasCloseUnpaidOrderTest

啟動后,可以看到控制臺有類似輸出:


ScasCloseUnpaidOrderTest

相信上面的代碼對應(yīng)各位看官來說,理解起來肯定是毫無壓力的,這里就不在贅述。

相關(guān)鏈接

https://www.rabbitmq.com/community-plugins.html
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

推薦閱讀

Spring Cloud Stream 進階配置——高可用(二)——死信隊列
Spring Cloud Stream 進階配置——高可用(一)——失敗重試
Spring Cloud Stream 進階配置——高吞吐量(三)——批量預(yù)取消息(prefetch)
Spring Cloud Stream 進階配置——高吞吐量(二)——彈性消費者數(shù)量
Spring Cloud Stream 進階配置——高吞吐量(一)——多消費者

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容