RocketMQ啟動(dòng)和消息收發(fā)源碼分析

前言

簡(jiǎn)單介紹一下RocketMQ的背景,RocketMQ是阿里開源的消息中間件,根據(jù)官網(wǎng)描述,RocketMQ其實(shí)是阿里發(fā)現(xiàn)ActiveMQ和Kafka無(wú)法滿足業(yè)務(wù)需求才開發(fā)的。其中很多思想有參考其他消息中間件,具體的幾個(gè)消息中間件之間的功能特點(diǎn)比較可以參考apache rocketMQ的官方文檔,這里就不贅述了。

RocketMQ在官網(wǎng)的整體架構(gòu)如下:


圖片來源于apache官方文檔

可以看到,RocketMQ主要包含四大組件:NameServer、Broker、Producer、Consumer。每一個(gè)組件都支持水平擴(kuò)展以預(yù)防單點(diǎn)故障。NameServer可以簡(jiǎn)單理解為注冊(cè)中心,可用于路由發(fā)現(xiàn)和集群管理;Broker則是我們通常意義上的服務(wù)端,負(fù)責(zé)消息的轉(zhuǎn)發(fā)、存儲(chǔ)、搜索等功能;Producer和Consumer屬于客戶端,負(fù)責(zé)消息的發(fā)送和接收。

大多數(shù)情況下,我們對(duì)于RocketMQ的使用停留在服務(wù)搭建和簡(jiǎn)單的消息收發(fā)上(如果服務(wù)搭建由運(yùn)維完成的話,這一步也省掉了)。那RocketMQ在NameServer和Broker啟動(dòng)的時(shí)候做了什么?Producer又是如何將消息發(fā)送到Broker?Consumer的消費(fèi)流程呢?

本文將基于RocketMQ的4.5.0版本源碼,以其中官方提供的example為例,分析一下RocketMQ服務(wù)啟動(dòng)和消息發(fā)送和接收的流程。

NameServer啟動(dòng)

nameServer的啟動(dòng)入口源碼如下,兩步走,第一步創(chuàng)建控制器,第二步啟動(dòng)控制器

public static void main(String[] args) {
    main0(args);
}

public static NamesrvController main0(String[] args) {

    try {
        NamesrvController controller = createNamesrvController(args);
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

創(chuàng)建控制器

看一下創(chuàng)建控制器的源碼,有幾個(gè)點(diǎn)需要注意一下:

  1. 默認(rèn)會(huì)讀取ROCKETMQ_HOME系統(tǒng)變量,如果未設(shè)置,需要手工指定或設(shè)置,否則無(wú)法啟動(dòng)
  2. 可以看到NameServer配置來源有三種:默認(rèn)、配置文件、啟動(dòng)項(xiàng)指定,優(yōu)先級(jí)依次上升(因?yàn)楹竺娴呐渲脮?huì)覆蓋前面的配置)
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    // 設(shè)置版本號(hào)為系統(tǒng)屬性
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();

    // 命令行參數(shù)解析的過程
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }
    // 初始化NameServer的配置,關(guān)鍵的幾個(gè)默認(rèn)配置見$1
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // 初始化NettyServer的配置,因?yàn)镽ocketMQ各個(gè)組件是使用Netty通信的,主要的默認(rèn)配置見$2
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // NameServer默認(rèn)的監(jiān)聽端口是9876
    nettyServerConfig.setListenPort(9876);
    // 如果通過 -c 的選項(xiàng)指定了配置配置,會(huì)把文件中對(duì)應(yīng)的屬性讀取到NameServer和NettyServer的配置對(duì)象中
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
    // 如果指定了 -p 選項(xiàng),會(huì)打印NameServer和NettyServer的配置,然后正常退出
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    // 如果啟動(dòng)時(shí)指定了某個(gè)配置,那放入NameServer的配置對(duì)象
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
    // 初始化logback的配置
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
    // 使用NameServer和NettyServer的配置初始化控制器,同時(shí)會(huì)初始化控制器中一些其他屬性,如KVConfigManager、RouteInfoManager、BrokerHousekeepingService等
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // remember all configs to prevent discard
    // 將所有配置信息注冊(cè)到一個(gè)大的Configuration對(duì)象中
    // 其實(shí)在NamesrvController構(gòu)造方法中,NameServer和NettyServer的配置已經(jīng)注冊(cè)到Configuration了,而properties包含其中,不知道為什么要再次注冊(cè)一遍
    controller.getConfiguration().registerConfig(properties);

    return controller;
}

$1 默認(rèn)的NameServer配置

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// kv配置文件路徑
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
// 全局配置文件路徑
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
// 默認(rèn)不開啟順序消息
private boolean orderMessageEnable = false;

$2 默認(rèn)的NettyServer配置

// netty的監(jiān)聽端口(根據(jù)代碼,會(huì)被9876的默認(rèn)值覆蓋)
private int listenPort = 8888;
// Netty默認(rèn)事件處理線程池,處理如broker注冊(cè),topic路由信息查詢、topic刪除等
private int serverWorkerThreads = 8;
// Netty服務(wù)異步回調(diào)線程池線程數(shù)量
private int serverCallbackExecutorThreads = 0;
// Selector線程數(shù)量
private int serverSelectorThreads = 3;
// 控制單向的信號(hào)量
private int serverOnewaySemaphoreValue = 256;
// 控制異步信號(hào)量
private int serverAsyncSemaphoreValue = 64;
// 服務(wù)空閑心跳檢測(cè)時(shí)間間隔 單位秒
private int serverChannelMaxIdleTimeSeconds = 120;
// 發(fā)送緩沖區(qū)大小
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
// 接受緩沖區(qū)大小
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
// 是否使用Netty內(nèi)存池
private boolean serverPooledByteBufAllocatorEnable = true;
// 是否啟用Epoll IO模型,Linux環(huán)境建議開啟
private boolean useEpollNativeSelector = false;

接著就是控制器的啟動(dòng)了,start方法如下,==省略了一些基礎(chǔ)校驗(yàn)和簡(jiǎn)單邏輯==,其中最重要的就是controller.initialize()方法

public static NamesrvController start(final NamesrvController controller) throws Exception {
    // controller初始化
    boolean initResult = controller.initialize();
    // controller初始化失敗就退出
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }

    // 注冊(cè)鉤子函數(shù),系統(tǒng)正常退出會(huì)shutdown掉controller
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));
    // 其實(shí)就是控制器內(nèi)部remotingServer的啟動(dòng),同時(shí)異步起一個(gè)文件監(jiān)聽服務(wù),注冊(cè)監(jiān)聽器,詳見$5
    controller.start();

    return controller;
}

這里有幾個(gè)關(guān)鍵點(diǎn):

  1. 請(qǐng)求處理器有兩種,DefaultRequestProcessor和ClusterTestRequestProcessor,由NameServer中的clusterTest字段決定,ClusterTestRequestProcessor繼承自DefaultRequestProcessor,唯一的不同就是無(wú)法從本地讀取路由信息時(shí)會(huì)從集群中讀取
  2. 這里創(chuàng)建了很多的線程池,具體每個(gè)線程池的作用見$6
public boolean initialize() {
    // 如果kvConfigPath路徑下存在相關(guān)文件,會(huì)把文件加載到kvConfigManager的配置表中
    this.kvConfigManager.load();
    // 初始化remotingServer,包含了很多nettyServer的相關(guān)配置,詳見$3
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // 創(chuàng)建一個(gè)固定大小的線程池,就是Netty中的work線程池,用于處理請(qǐng)求的
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    // 這里會(huì)注冊(cè)一個(gè)默認(rèn)的處理器DefaultRequestProcessor,綁定上面創(chuàng)建的線程池,具體能處理哪些請(qǐng)求見$4
    this.registerProcessor();
    // 定時(shí)掃描Broker信息并清除失效的Broker路由信息,周期是10秒
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // 定時(shí)日志輸出kv配置信息,周期10秒
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    // 如果沒有關(guān)閉TLS,那么會(huì)開啟一個(gè)異步文件監(jiān)聽線程,監(jiān)聽TLS相關(guān)配置文件的變化,默認(rèn)可選的
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        // 略
    }

    return true;
}

$3 NettyRemotingServer構(gòu)造方法

這里的NettyRemotingServer可以認(rèn)為就是Netty中的Server,內(nèi)部其實(shí)初始化的基本都是Netty的配置。

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;
    // 前面的默認(rèn)配置是0,如果未曾手動(dòng)設(shè)置,這里會(huì)設(shè)置為4
    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }
    // 新建一個(gè)公用的線程池,如果某種消息處理沒有注冊(cè)其他線程則會(huì)使用這個(gè)
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        // 略
        }
    });
    // 根據(jù)是否啟用epoll來初始化Netty中的EventLoopGroup
    if (useEpoll()) {
        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
            // 略
        });

        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            // 略
        });
    } else {
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            // 略
        });

        this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            // 略
        });
    }
    // 加載TLS的上下文信息
    loadSslContext();
}

$4 DefaultRequestProcessor對(duì)請(qǐng)求的處理

RemotingCommand有一個(gè)code字段用來區(qū)分命令類型,可以看到,DefaultRequestProcessor針對(duì)不同的RemotingCommand有不同的處理(這里只是一部分命令類型,還有其他命令由其他處理器處理)。常量的名稱比較直觀,就不一一細(xì)說了。==注意,這里用的線程池就是NamesrvController.remotingExecutor==

public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {

    if (ctx != null) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}

$5 remotingServer的啟動(dòng)(start方法)

public void start() {
    // 創(chuàng)建線程池處理特定的channel事件(編解碼、空閑檢測(cè)、連接管理等,查看下面具體的Handler)
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });
    // 設(shè)置Netty啟動(dòng)類的一些參數(shù)
    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                            new HandshakeHandler(TlsSystemConfig.tlsMode))
                        .addLast(defaultEventExecutorGroup,
                            new NettyEncoder(),
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            new NettyConnectManageHandler(),
                            new NettyServerHandler()
                        );
                }
            });
    // 內(nèi)存池的配置
    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }
    // 這里nettyEventExecutor可以認(rèn)為是一個(gè)線程
    // 會(huì)有專門的監(jiān)聽器處理channel事件,這里的監(jiān)聽器就是BrokerHousekeepingService,用來處理Broker的上下線
    // 這里處理的隊(duì)列任務(wù)來源就是NettyConnectManageHandler,NettyConnectManageHandler會(huì)針對(duì)不同的channel事件將不同任務(wù)放入nettyEventExecutor的隊(duì)列
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                // 定時(shí)掃描和釋放無(wú)效的responseFuture
        // responseFuture是一個(gè)響應(yīng)的占位符,如果超時(shí)未收到response則清除這個(gè)占位符
        // 但是依然會(huì)執(zhí)行響應(yīng)綁定的回調(diào)函數(shù)(可能含有超時(shí)的處理邏輯)
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

$6 每個(gè)線程池的作用(包含Netty線程池)

// 線程數(shù)NettyServerConfig.serverWorkerThreads,處理Broker注冊(cè)、kv配置增刪等事件
NamesrvController.remotingExecutor
// 線程數(shù)NettyServerConfig.serverCallbackExecutorThreads,從代碼來看是Broker處理的時(shí)候會(huì)用到,如果某個(gè)業(yè)務(wù)處理器未關(guān)聯(lián)線程池就會(huì)用這個(gè)
NettyRemotingServer.publicExecutor
// 線程數(shù)1,Netty中的Boss線程池
NettyRemotingServer.eventLoopGroupBoss
// 線程數(shù)NettyServerConfig.serverSelectorThreads,處理channel IO事件
NettyRemotingServer.eventLoopGroupSelector
// 線程數(shù)NettyServerConfig.serverWorkerThreads,業(yè)務(wù)處理線程,很多ChannelHandler綁定的線程池,具體命令處理還會(huì)往后提交給其他線程池
NettyRemotingServer.defaultEventExecutorGroup

整個(gè)NameServer的啟動(dòng)流程如下:

image

未完待續(xù)。。。

Broker啟動(dòng)

消費(fèi)者啟動(dòng)

生產(chǎn)者啟動(dòng)

生產(chǎn)者發(fā)送消息

Broker處理消息

消費(fèi)者消費(fèi)消息

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容