RocketMQ Code Example - Batch Sending

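Batch sending can improve send throughput, but it comes with some restrictions:

  • all messages in a batch must have the same topic
  • all messages in a batch must have the same waitStoreMsgOK setting
  • delayed delivery is not supported
  • the total size of one batch must not exceed 1 MB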
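
The first three restrictions can be checked before a batch is handed to the producer. The following is only a sketch: `BatchPrecheck` and its nested `Candidate` class are hypothetical stand-ins invented for illustration, not RocketMQ types, and treating a delay level of 0 as "no delay" is an assumption of this sketch. With the real client, the corresponding values would presumably come from `getTopic()`, `isWaitStoreMsgOK()` and `getDelayTimeLevel()` on `Message`.

```java
import java.util.Arrays;
import java.util.List;

final class BatchPrecheck {
    // Hypothetical stand-in for a message: only the fields the batching rules look at.
    static final class Candidate {
        final String topic;
        final boolean waitStoreMsgOK;
        final int delayLevel; // assumption of this sketch: 0 means "no delay"

        Candidate(String topic, boolean waitStoreMsgOK, int delayLevel) {
            this.topic = topic;
            this.waitStoreMsgOK = waitStoreMsgOK;
            this.delayLevel = delayLevel;
        }
    }

    // Returns null when the list satisfies the three rules, otherwise a short reason.
    static String violation(List<Candidate> batch) {
        if (batch.isEmpty()) {
            return "batch is empty";
        }
        Candidate first = batch.get(0);
        for (Candidate c : batch) {
            if (c.delayLevel > 0) {
                return "delayed messages cannot be batched";
            }
            if (!c.topic.equals(first.topic)) {
                return "topics differ";
            }
            if (c.waitStoreMsgOK != first.waitStoreMsgOK) {
                return "waitStoreMsgOK differs";
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Candidate> mixed = Arrays.asList(
                new Candidate("BatchTest", true, 0),
                new Candidate("OtherTopic", true, 0));
        System.out.println(violation(mixed)); // prints "topics differ"
    }
}
```

The size restriction is not covered here because it depends on the message content.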

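The size limit needs particular care: message content is dynamic, so a batch can easily grow past the limit unnoticed, and the send then fails with an error like this: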

org.apache.rocketmq.client.exception.MQClientException: CODE: 13  DESC: the message body size over max value, MAX: 4194304
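
To see why the size is hard to predict, it helps to estimate what a single message contributes. The sketch below follows the same rough formula as the helper class in this article (topic + body + property keys and values + 20 bytes of overhead), with one deliberate difference: it counts UTF-8 bytes instead of `String.length()`. The class name `MessageSizeEstimate` and the plain-parameter signature are hypothetical, chosen so the example runs without RocketMQ on the classpath; the result is an approximation, not the exact encoded size.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class MessageSizeEstimate {
    // Same fixed allowance the article's helper adds per message; an approximation.
    static final int PER_MESSAGE_OVERHEAD = 20;

    // Estimates the size in bytes of one message from its parts.
    static int estimate(String topic, byte[] body, Map<String, String> properties) {
        int size = topic.getBytes(StandardCharsets.UTF_8).length + body.length;
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            size += entry.getKey().getBytes(StandardCharsets.UTF_8).length;
            size += entry.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        return size + PER_MESSAGE_OVERHEAD;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("TAGS", "TagA");
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        // 9 (topic) + 5 (body) + 4 + 4 (property) + 20 (overhead)
        System.out.println(estimate("BatchTest", body, properties)); // prints 42
    }
}
```

Because the body and the properties vary from message to message, the number of messages that fit into one batch varies too.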

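The fix is fairly simple: if a batch exceeds 1 MB, split it into several chunks and send the chunks one at a time. Note that the MAX reported in the error above is 4194304 bytes (4 MiB), so the limit actually enforced depends on the client configuration; keeping each chunk under about 1 MB, as the helper below does, stays on the safe side.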

Splitting helper class:

package com.rocketmq.test;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.rocketmq.common.message.Message;

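public class ListSplitter implements Iterator<List<Message>> {
    // Conservative per-chunk budget (1,000,000, just under 1 MB).
    private static final int SIZE_LIMIT = 1000 * 1000;
    private final List<Message> messages;
    // Index of the first message that has not yet been returned in a chunk.
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            // Approximate size: topic + body + all property keys and values.
            // String.length() counts chars, not bytes, so non-ASCII text is underestimated.
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // allowance for log overhead
            if (tmpSize > SIZE_LIMIT) {
                // A single message over the limit cannot be split any further.
                // If the chunk is still empty, return the message on its own so that
                // iteration keeps moving; otherwise close the current chunk first.
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                // This message would overflow the chunk; leave it for the next one.
                break;
            }
            totalSize += tmpSize;
        }
        // subList returns a view backed by the original list, not a copy.
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}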
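
The chunking rule in the helper can be tried out without a broker by running it over plain sizes. The sketch below is a hypothetical, dependency-free restatement of that rule (`SizeChunker` is not part of RocketMQ or of the class above): sizes are packed greedily in order, and a single size above the limit ends up in a chunk of its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SizeChunker {
    // Splits sizes into consecutive chunks whose sums stay within limit.
    // A single size larger than limit is placed alone in its own chunk.
    static List<List<Integer>> chunk(List<Integer> sizes, int limit) {
        List<List<Integer>> chunks = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int total = 0;
        for (int size : sizes) {
            // Close the current chunk if adding this size would overflow it.
            if (!current.isEmpty() && total + size > limit) {
                chunks.add(current);
                current = new ArrayList<>();
                total = 0;
            }
            current.add(size);
            total += size;
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> sizes = Arrays.asList(400, 400, 400, 1200, 100);
        // prints [[400, 400], [400], [1200], [100]]
        System.out.println(chunk(sizes, 1000));
    }
}
```

The oversized entry (1200) is not dropped; it is isolated, so one bad message affects only its own chunk.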

Usage in the producer:

package com.rocketmq.test;

import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class BatchSplitProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 100000; i++) {
            messages.add(new Message(topic, "TagA", "OrderID001", ("[Hello world big list] item:" + i).getBytes()));
        }
        
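        // Split the list into chunks that fit the size limit and send them one by one.
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            try {
                List<Message> listItem = splitter.next();
                producer.send(listItem);
            } catch (Exception e) {
                // A failed chunk is only logged here; real code should decide whether to retry.
                e.printStackTrace();
            }
        }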
        producer.shutdown();
    }
}

The consumer is the same as usual:

package com.rocketmq.test;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class BatchConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("BatchTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
                    final ConsumeConcurrentlyContext context) {
                // Nothing batch-specific is needed here: print each message body.
                for (MessageExt msg : msgs) {
                    String str = new String(msg.getBody());
                    System.out.println("===" + str);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

Run:

$ mvn exec:java -Dexec.mainClass="com.rocketmq.test.BatchSplitProducer"
$ mvn exec:java -Dexec.mainClass="com.rocketmq.test.BatchConsumer"