RocketMQ从源码到跑路

源码的环境搭建和前期准备

1.源码的获取

可以在RocketMQ的官方Git仓库地址下载源码或者,也可以到RocketMQ的官方网站上下载指定版本的源码,以下为跳转地址:

image-20220425172005044

下载后就可以解压导入到IDEA中进行解读了。我是下载当时最新的版本

源码下很多的功能模块,很容易让人迷失方向,我们只关注下几个最为重要的模块:

  • broker: broker 模块(broke 启动进程)
  • client :消息客户端,包含消息生产者、消息消费者相关类
  • example: RocketMQ 例代码
  • namesrv:NameServer实现相关类(NameServer启动进程)
  • store:消息存储实现相关类

各个模块的功能大都从名字上就能看懂。我们可以在有需要的时候再进去看源码。但是这些模块有些东西还是要关注的。例如docs文件夹下的文档,以及各个模块下都有非常丰富的junit测试代码,这些都是非常有用的

2.源码的注释和流程图的获取

RocketMQ的源码中有个非常让人头疼的事情,就是他的代码注释几乎没有。本blog的注释版本已经传入代码仓库,有需要的可以自行下载。

3.源码的调试和配置文件的准备

将源码导入IDEA后,需要先对源码进行编译。编译指令 clean install -Dmaven.test.skip=true

image-20220425172449805

编译完成后就可以开始调试代码了。调试时需要按照以下步骤:调试时,先在项目目录下创建一个conf目录,并从distribution拷贝broker.conflogback_broker.xmllogback_namesrv.xml,同时要在conf/broker.conf修改好自己的环境变量信息

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

# 自动创建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=192.168.230.133:9876;192.168.230.134:9876;192.168.230.135:9876
#存储路径
storePathRootDir=/work/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/work/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/work/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/work/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/work/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/work/rocketmq/store/abort

4.启动参数的设置

展开namesrv模块,运行NamesrvStartup类即可启动NameServer启动时,会报错,提示需要配置一个ROCKETMQ_HOME环境变量。这个环境变量我们可以在机器上配置,跟配置JAVA_HOME环境变量一样。也可以在IDEA的运行环境中配置。目录指向源码目录即可。

image-20220425173004265

ROCKETMQ_HOME=F:\MQ\rocketMQsrc\rocketmq-all-4.9.3

配置完成后,再次执行,看到以下日志内容,表示NameServer启动成功The Name Server boot success. serializeType=JSON

然后Broker的启动类是broker模块下的BrokerStartup。启动Broker时,同样需要ROCETMQ_HOME环境变量,并且还需要配置一个-c 参数,指向broker.conf配置文件。

image-20220425173152795

-c F:\MQ\rocketMQsrc\rocketmq-all-4.9.3\conf\broker.conf
ROCKETMQ_HOME=F:\MQ\rocketMQsrc\rocketmq-all-4.9.3

然后重新启动,即可启动Broker。

5.案例NameSrvAddr的配置

生产者

在源码的example模块下,提供了非常详细的测试代码。例如我们启动example模块下的org.apache.rocketmq.example.quickstart.Producer类即可发送消息。但是在测试源码中,需要指定NameServer地址。这个NameServer地址有两种指定方式,一种是配置一个NAMESRV_ADDR的环境变量。另一种是在源码中指定。我们可以在源码中加一行代码指定NameServer

producer.setNamesrvAddr("127.0.0.1:9876");

消费者

我们可以使用同一模块下的org.apache.rocketmq.example.quickstart.Consumer类来消费消息。运行时同样需要指定NameServer地址

consumer.setNamesrvAddr("192.168.232.128:9876");

NameServer的源码剖析

NameServer的启动入口为NamesrvStartup类的main方法,我们可以进行逐步调试。这次看源码,我们不要太过陷入其中的细节,我们的目的是先搞清楚NameServer的大体架构。从之前的介绍中,我们已经了解到,在RocketMQ中,实际进行消息存储、推送等核心功能的是Broker。那NameServer具体做什么用呢?

一是维护Broker的服务地址并进行及时的更新。
二是给Producer和Consumer提供服务获取Broker列表。

整体流程

image-20220425174422914

整个NameServer的核心就是一个NamesrvController对象。这个controller对象就跟java Web开发中的Controller功能类似,都是响应客户端请求的。

NamesrvController controller = createNamesrvController(args);

在创建NamesrvController对象时,有两个关键的配置文件NamesrvConfig这个是NameServer自己运行需要的配置信息,还一个NettyServerConfig包含Netty服务端的配置参数,固定的占用了9876端口。

final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);

比较有意思的是这个9876端口并没有提供覆盖的方法

NamesrvConfig类中 主要存放一些算核心配置,但又不算核心配置的配置信息,比如RocketMQ的主目录、RocketMQ核心配置的文件路径kvConfigPath、环境名称等等。

而在NettyServerConfig类中,预先配置好了有关于Netty的网络配置信息:

//端口号,默认是8888,但是启动时被默认覆盖成了9876
    private int listenPort = 8888;
    //Netty工作线程数
    private int serverWorkerThreads = 8;
    //Netty的public线程池的线程数,默认是0
    private int serverCallbackExecutorThreads = 0;
    //Netty的IO线程池线程数量。主要负责处理网络请求,解析请求包,再转发到各个业务线程池。最后返回结果
    private int serverSelectorThreads = 3;
    //Broker端的两个配置参数
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;
    //网络连接最大空闲时间
    private int serverChannelMaxIdleTimeSeconds = 120;
    //网络Socket发送缓冲区大小
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    //接收端缓存区大小
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    //写缓冲区高低水位线的设置
    private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
    private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
    //设置socket套接字备份日志的大小 默认1024
    private int serverSocketBacklog = NettySystemConfig.socketBacklog;
    //是否启用Epoll模型
    private boolean serverPooledByteBufAllocatorEnable = true;

在注入了两份默认配置config后和设置了9876的端口后,还可以接受 -c-p 的指定设置参数的逻辑功能.

//-c:指定配置文件
 if (commandLine.hasOption('c')){...}
 //-p:指定属性值
 if (commandLine.hasOption('p')){...}

再之后就是创建NamesrvController,并将上面的几个config作为参数传入。

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

为什么前面说整个NameServer的核心就是一个NamesrvController对象呢?结合NameServer的主要功能,在看看NamesrvController代码,我们可以发现在这里就完成了路由的相关注入和borker信息的注入!

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }

至此,nameServercontroller的创建的源码大致就讲完了!接下来就看下start(controller);

public static NamesrvController start(final NamesrvController controller) throws Exception {
        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }
        // init 定时任务
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        // 服务关闭钩子
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));
        //启动服务
        controller.start();
        return controller;
    }

里面到底具体做了什么吧。总的来说主要是几个步骤:

  1. controller.initialize()

    从名字自上看主要是控制器的初始化,那么具体是初始化什么呢?看看部分代码吧。

    //加载KV配置
            this.kvConfigManager.load();
            //处理netty server网络处理请求的对象
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
            //Netty服务器的工作线程池
            this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
            //注册Processor,把remotingExecutor注入到remotingServer中
            this.registerProcessor();
             //开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker
            this.scheduledExecutorService.scheduleAtFixedRate(....)
            //开启定时任务:每隔10min打印一次KV配置
            this.scheduledExecutorService.scheduleAtFixedRate(....)
    

    从代码中我们看到了 有一个步骤是创建了一个处理器,哦吼有点东西,在这个处理器里面有个一个remotingServer

    ,它的主要作用就是处理netty的网络请求。和broker有点关系

    private void registerProcessor() {
            if (namesrvConfig.isClusterTest()) {
                //测试集群,先不用管
                this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                    this.remotingExecutor);
            } else {
                //NettyServer接收到的网络请求,就会由这个组件来处理。
                this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
            }
        }
    
  2. controller.start()

    start方法里面主要的是启动remotingServer,但是这个server是什么鬼呢?我们可以发现它是一个接口,通过接口实现我们可以到一个新大陆:clientserver ,在想想nameServer的作用,既然要为生产者和消费者提供broker的信息,那么肯定要通信的对吧,通行就有两个端了呗~ 这个就是用来响应请求的。

    this.remotingServer.start();
    

image-20220425200427119

  1. addShutdownHook()controller.shutdown()

    自己看源码注释吧,具体的不知道,就是和JVM的有点关系,网络上大牛的原话是:注册钩子函数,在jvm退出的时候释放Controller常量,优雅的释放资源,在关闭服务时,关闭了四个东西remotingServer,响应请求的服务;remotingExecutor Netty服务线程池; scheduledExecutorService 定时任务;fileWatchService 这个是用来跟踪acl配置的**(acl的配置文件是实时热加载的)**。

    public void shutdown() {
            this.remotingServer.shutdown();
            this.remotingExecutor.shutdown();
            this.scheduledExecutorService.shutdown();
            if (this.fileWatchService != null) {
                this.fileWatchService.shutdown();
            }
        }
    

主要结构

image-20220425201434919

Broker心跳注册的源码剖析

整体流程

看个大概,broker的主要流程还是很多,毕竟broker的功能是很多的,看不清可以在1.2处下载高清思维导图

image-20220425211548983

Broker启动的入口在BrokerStartup这个类,可以从他的main方法开始调试。启动过程关键点:重点也是围绕一个BrokerController对象,先创建,然后再启动。在BrokerStartup.createBrokerController方法中可以看到Broker的几个核心配置:

  • BrokerConfig
  • NettyServerConfig :Netty服务端占用了10911端口。又是一个神奇的端口。
  • NettyClientConfig :网络安全TLS 相关
  • MessageStoreConfig:这个明显是Broker用来存储消息的一些配置信息。

也就是说在BrokerStartup.createBrokerController方法中最开始就是注入配置信息,然后再new BrokerController。那么集合broker功能的铺垫和在这个里面的代码我们可以看到

//四个核心组件保存起来。
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        //管理consumer消费offset
        this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
        //管理Topic配置
        this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
        //处理Consumer拉取消息的请求的
        this.pullMessageProcessor = new PullMessageProcessor(this);

然后初始化controller.initialize(),进去就看到4个load,主要是加载磁盘上的配置信息。这些配置信息就用到了MessageStoreConfig里面的信息了

boolean result = this.topicConfigManager.load();
        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        result = result && this.consumerFilterManager.load();

当这四个load加载之后就是对result进行处理了,那么具体是处理什么呢?,下面的代码可以发现,他去处理了管理磁盘上的消息的,还记有个场景就是上一次的RocketMQ非正常的关闭的时候会产生一个文件吧,第二次正常开启的时候,MQ是会去处理的对吧~ 然后值得注意的是在后就是把信息存储相关的给load了,在这里主要的作用对应的应用场景就是:commitLog,index等文件读入到内存中

this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                        this.brokerConfig);
           result = result && this.messageStore.load();

如果设置了dleger模式,就会处理一些相关的组件,还有部分插件的加载,并做出了大量的操作。

277	       if(result){...}
505        return result;
506        }

由于代码量大就不整体复制了。我们主要说下他干了什么把。首先值得我们注意的事情:**Broker本身既是服务端(接收Producer和Consumer的请求),又是客户端(要往NameServer和Producer发送请求)。**总的来说在400行中主要创建了网络组件和业务组件,最后进行了初始化。

image-20220426133910081

在broker中还是有和对外通信的事情把,也是在上面这个部分实中的brokerOuterAPI来实现

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);

最后就是三个初始化了,这三行一看就是一些初始化的东西(事务、ACL权限控制、远程调用钩子),暂时先不关注。

initialTransaction();
            initialAcl();
            initialRpcHooks();

至此BrokerController的所有初始化的源码就结束了。然后我们可以看看start的方法了,start里面废话很多,不是打印就是try兜底,终究的核心还是controller.start();BrokerController.start方法可以看到启动了一大堆Broker的核心服务,我们挑一些重要的

this.messageStore.start();启动核心的消息存储组件
 this.remotingServer.start();
 this.fastRemotingServer.start(); 启动两个Netty服务
 this.brokerOuterAPI.start();启动客户端,往外发请求
 BrokerController.this.registerBrokerAll: 向NameServer注册心跳。
 this.brokerStatsManager.start();
 this.brokerFastFailure.start();这也是一些负责具体业务的功能组件

从上面可以知道Broker中有一大堆的功能组件负责具体的业务

主要结构

image-20220425202456132
那么从图中我们可以知道broker其实最主要的功能之一:注册心跳,也就是BrokerController.this.registerBrokerAll方法会发起向NameServer注册心跳。启动时会立即注册,同时也会启动一个线程池,以10秒延迟,默认30秒的间隔 持续向NameServer发送心跳。

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

BrokerController.this.registerBrokerAll这个方法就是注册心跳的入口。那么进来就是先对Topic的配置进行相关操作然后就是判断是否需要注册,然后调用doRegisterBrokerAll方法真正去注册,判断注册的条件是needRegister这个方法

if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }

brokerOuterAPI.needRegister的方法中我们可以看到一个有意识的事情,虽然代码很多我也看不懂,但是我可以发现依据的判断条件是在nameServerAddressList中获取的

269        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {...}
那么既然判断要注册了,broker就会往所有的NameServer发送注册心跳,如果有多个NameSrv就会返回多个注册心跳,所有是一个大list。
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isCompressedRegister());

还记得前面说NameSrv是无状态的吗?从下面的代码就可以看出来,broker只要收到了来自NameSrv的返回信息,就默认进行处理了

if (registerBrokerResultList.size() > 0) {
            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);}

最后在概述一下整个注册的具体方法

//初始化一个list,用来放向每个NameServer注册的结果,并获取NameSrv的地址列表
        final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();

//构建Broker注册的网络请求 同时生成请求体
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            RegisterBrokerBody requestBody = new RegisterBrokerBody();

//untDownLatch确保全部的Nameserver都进行了注册后再往下走
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
		   RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);

image-20220425202936356

Producer的源码剖析

整体流程

Producer有两种。

  • 一个是普通发送者DefaultMQProducer。这个只需要构建一个Netty客户端。
  • 另一个是事务消息发送者: TransactionMQProducer。这个需要构建一个Netty客户端同时也要构建Netty服务端。

源码的水太深了,孩子,你把握不住的,所以还是看个大概,以普通的Producer来举例,看不清可以在1.2处下载高清思维导图

image-20220426141919917

对于整个Producer的流程,其实还是挺复杂的,大致分两个步骤, start()方法,准备一大堆信息,send发送消息。我们先抽取下主线。首先,关于Borker路由信息的管理: Producer需要拉取Broker列表,然后跟Broker建立连接等等很多核心的流程,其实都是在发送消息时建立的。因为在启动时,还不知道要拉取哪个TopicBroker列表呢。所以对于这个问题,我们关注的重点,不应该是start方法,而是send方法。Send方法中,首先需要获得Topic的路由信息。这会从本地缓存中获取,如果本地缓存中没有,就从NameServer中去申请。

路由信息大致的管理流程:

image-20220426143752570

然后 获取路由信息后,会选出一个MessageQueue去发送消息。这个选MessageQueue的方法就是一个索引自增然后取模的方式。然后 封装Netty请求发送消息。消息发从到Borker后,会由一个CommitLog类写入到CommitLog文件中。

那么我们来回顾下producer的使用代码删头去尾主要就是以下几个步骤:

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        SendResult sendResult = producer.send(msg);
        producer.shutdown();

首当其冲,start() 里面的setProducerGroup ,它其实是做了一个检查,不能为空;同时也是来判断名称是否合法;长度不能大于255;组名称不能为DEFAULT_PRODUCER

@Override
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

然后就是this.defaultMQProducerImpl.start();,看到了impl 实现类的命名也就知道了这个才是主要逻辑,我们可以看到在start里面创建了一个MQ的实例并注册,然后再是启动,同时我们还发现了Producer是有4种状态的:

  • CREATE_JUST :服务刚刚创建,未启动
  • RUNNING :服务运行
  • SHUTDOWN_ALREADY:服务关闭
  • START_FAILED :服务启动失败
//检查生产者组是否符合要求
                this.checkConfig();
                //获取MQ实例 往外发送请求
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                //注册MQClientInstance实例,方便后续调用
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                //启动实例
                if (startFactory) {
                    mQClientFactory.start();
                }

mQClientFactory.start() 里面可以发现,通过对应的状态完成对应业务实例,最后将状态改为RUNNING

case CREATE_JUST:  
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    //客户端负载均衡,生产者的负载均衡是在发送消息那里体现的哦,所以这里指向的都是consumer
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;

既然生产者已经开始RUNNING了,那么后面就是发送消息了 我们看下send方法,

send--->this.defaultMQProducerImpl.send(msg)--->send(msg, this.defaultMQProducer.getSendMsgTimeout())---> this.sendDefaultImpl

//生产者获取Topic的公开信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
.....    

//Producer计算把消息发到哪个MessageQueue中。
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

那么我们可以看到Produce 把消息发送到MessageQueue的源码方法了,我们发现了它把 topic的主题相关信息和上一次的broker名字作为参数传入了,有点意思,点进去看看具体的代码发现

//Producer选择MessageQueue的方法就是自增,然后取模。并且只有这一种方法。                
int index = tpInfo.getSendWhichQueue().incrementAndGet();

也就是说Producer 把每个MessageQueue都做了类似索引的数字,然后进行自增取模运算,来确定轮询发送到每个队列里面去。同时我们还发现了一个有趣的命名延迟容错,其实就是如果轮询发送了的消息到了某个队列没有成功,那么下次的轮询发送尽量的不再往这个队列里面发送消息了

final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();

然后我们回到this.sendDefaultImpl中往下走:既然找到了队列,那么就是发送消息了呗

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

先去寻找Broker地址。找不到就去NameServer上获取,

String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

然后就是构建了请求参数,太多了就不展示了,不过最后是用mQClientFactory.getMQClientAPIImpl()来发送具体的消息的

SendMessageContext context = null;
if (brokerAddr != null) {
    ......
    ......
 sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage()
                        }

通过Topic 去找所有的broker(对应的MessageQueue[轮询找到的队列] 所在的broker去发消息),消息发从到Borker后,会由一个CommitLog类写入到CommitLog文件中。

Broker消息存储的源码剖析

书接上回,消息的存储,存到那里?肯定是文件里面,那么记得有哪些文件类型吗?

  • commitLog:消息存储目录,所有的消息都以二进制的形式存储在这里,大小是固定的,存不下就会新建
  • config:以json的格式存储运行期间一些配置信息
  • consumerqueue:消息消费队列存储目录
  • index:消息索引文件存储目录
  • abort:如果存在改文件寿命Broker非正常关闭
  • checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerquueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。
源码入口:org/apache/rocketmq/store/DefaultMessageStore.java

1.commitLog写入

CommitLog的doAppend方法就是Broker写入消息的实际入口。这个方法最终会把消息追加到MappedFile映射的一块内存里,并没有直接写入磁盘。写入消息的过程是串行的,一次只会允许一个线程写入。

最新版本的方法入口asyncPutMessages,已经和老版本的PutMessages不一样了,但是基本大同小异

public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch)
     
     
      @Override
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        return waitForPutResult(asyncPutMessage(msg));
    }

那么在这个方法里面 先是设置了一些属性 并处理了处理事务消息 和 非事务消息判断入口

CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

然后就会去处理延迟消息, 写入SCHEDULE_TOPIC_XXXX这个Topic中

625         if (msg.getDelayTimeLevel() > 0) {....}

做了一些设置处理后,最后就把消息topic 和 队列改了,改成了SCHEDULE_TOPIC_XXXX

630				  topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
638                msg.setTopic(topic);
639                msg.setQueueId(queueId);
public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

接着完成了部分实例属性的获取并完成对应的赋值后,执行mappedFile 有关于零拷贝的实现,一个mappedFile完成一个零拷贝的实现,在用户态读取信息的时候去读映射了,在操作的时候时候,其实是在操作这些映射,不够MappedFile 不能太大,一般都是1.5G左右

663        MappedFile unlockMappedFile = null;
	    .....
667	    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

直接以Append的方式写入文件

result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);

不过我们还是要看看追加commitLog文件的具体写入方法mappedFile.appendMessage ,他的核心实现其实是doAppend,看名字就知道了。在这个方法里面做了大量的数据计算,但是有关于零拷贝的就是和下面这个有关:你可以理解把那些映射文件写到了内存当中

byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);

写入文件之后就开始获取状态,其中有一下几种

  • PUT_OK 成功
  • END_OF_FILE 文件写满了 就创建一个新文件,重写消息
  • MESSAGE_SIZE_EXCEEDED 消息过大的错误
  • PROPERTIES_SIZE_EXCEEDED 属性大小超出
  • UNKNOWN_ERROR

最后在写入成功之前的一点点,开始统计成功的信息并记录,然后完成文件刷盘主从同步两个过程,所有东西都执行成功就返回成功的结果

storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);

我们可以看下文件的刷盘方式,源码中有点小注释了,

public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    // 同步刷盘
	if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {...}
	// 异步刷盘
	else {...}

在同步刷盘的代码中,只是直接构建了一个GroupCommitRequest 请求,然后提交了一个任务去完成真正的文件刷盘,也就是说,当消息写入到了内存中,就立即生成一个请求 ,然后完成文件刷盘的操作

GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),

在异步刷盘的代码中,对应了里面的一个配置isTransientStorePoolEnable,如果我们配置了True,那么就会去申请一块堆外内存,以便于用来异步刷盘。=

if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else  {
                commitLogService.wakeup();
            }

2.分发ConsumeQueue和IndexFile

当CommitLog写入一条消息后,会有一个后台线程reputMessageService每隔1毫秒就会去拉取CommitLog中最新更新的一批消息,然后分别转发到ComsumeQueue和IndexFile里去,这就是他底层的实现逻辑。

并且,如果服务异常宕机,会造成CommitLog和ConsumeQueue、IndexFile文件不一致,有消息写入CommitLog后,没有分发到索引文件,这样消息就丢失了。DefaultMappedStore的load方法提供了恢复索引文件的方法,入口在load方法。

整个文件存储的核心入口入口在DefaultMessageStore的start方法中。

源码入口:org/apache/rocketmq/store/DefaultMessageStore.java  -->  this.reputMessageService.start();

Broker 启动的时候会启动一个线程来更新ConsumerQueue索引文件,那么既然是线程那么就去看看他的run方法

this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();

每间隔1s去做一次doReput的操作,那么doReput的作用其实就是处理消息的分发和转发

@Override
        public void run() {
				.....
                try {
                    Thread.sleep(1);
                    this.doReput();
				.....}

doReput的先是从commitLog的那个 DispatchRequest请求中获取需要分发的消息:

DispatchRequest dispatchRequest =                              DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);

然后通过doDispatch中看到dispatch的实现接口方法里面具有两个具体转发consumeQueueindex的方法

DefaultMessageStore.this.doDispatch(dispatchRequest);

image-20220427124206733

随机查看一个,那consumeQueue来说,把请求的参数(比如说偏移量offset)存入

@Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }

我们可以做定制也就在这里入手了,比如定制dispatch 把具体的业务消息分发到出去到业务定制

3.文件同步刷盘与异步刷盘

老版本(4.7.X)入口:CommitLog.putMessage -> CommitLog.handleDiskFlush

其中主要涉及到是否开启了对外内存。TransientStorePoolEnable。如果开启了堆外内存,会在启动时申请一个跟CommitLog文件大小一致的堆外内存,这部分内存就可以确保不会被交换到虚拟内存中。具体代码可以跳回commitLog写入的尾部分

4.过期文件删除

入口: DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()

默认情况下, Broker会启动后台线程,每60秒,检查CommitLog、ConsumeQueue文件。然后对超过72小时的数据进行删除。也就是说,默认情况下, RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配置。注意他删除时,并不会检查消息是否被消费了。

// 定时删除过期的消息的任务
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

cleanFilesPeriodically删除已经过去的文件,不过你的消息是否被消费了

private void cleanFilesPeriodically() {
        this.cleanCommitLogService.run();
        this.cleanConsumeQueueService.run();
    }

5.文件存储部分的总结

image-20220426171017235

RocketMQ的存储文件包括消息文件(Commitlog)、消息消费队列文件(ConsumerQueue)、Hash索引文件(IndexFile)、监测点文件(checkPoint)、abort(关闭异常文件)。单个消息存储文件、消息消费队列文件、Hash索引文件长度固定以便使用内存映射机制进行文件的读写操作。RocketMQ组织文件以文件的起始偏移量来命令文件,这样根据偏移量能快速定位到真实的物理文件。RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。

CommitLog,消息存储文件,RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为此RocketMQ为了方便消息消费构建了消息消费队列文件,基于主题与队列进行组织,同时RocketMQ为消息实现了Hash索引,可以为消息设置索引键,根据所以能够快速从CommitLog文件中检索消息。

当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息转发给消息消费队列文件与索引文件。为了安全起见,RocketMQ引入abort文件,记录Broker的停机是否是正常关闭还是异常关闭,在重启Broker时为了保证CommitLog文件,消息消费队列文件与Hash索引文件的正确性,分别采用不同策略来恢复文件。

RocketMQ不会永久存储消息文件、消息消费队列文件,而是启动文件过期机制并在磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费。

Consume的源码剖析

整体流程

image-20220427130145714

消费者以消费者组的模式开展。消费者组之间有集群模式广播模式两种消费模式。然后消费模式有推模式和拉模式。推模式是由拉模式封装组成。

集群模式下,消费队列负载均衡的通用原理:一个消费队列同一时间只能被一个消费者消费,而一个消费者可以同时消费多个队列。

消息顺序:RocketMQ只支持一个队列上的局部消息顺序,不保证全局消息顺序。 要实现顺序消息,可以把有序的消息指定为一个queue,或者给Topic只指定一个Queue,这个不推荐。

源码入口:DefaultMQPushConsumer.start

@Override
    public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        // 主要看实现类
        this.defaultMQPushConsumerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

首先我们会发现关闭是消费者也好,还是生产者也好,都有对应具体的启动状态,对应的检查,以及工作工厂

case CREATE_JUST:
....
this.checkConfig();
....
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

消息负载均衡

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());          this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

注册本地消费者组的缓存

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
//消费者的核心启动过程
                mQClientFactory.start();

客户端的负载均衡this.rebalanceService.start(),又是一个线程,那么找他的run方法,看到this.mqClientFactory.doRebalance(),那么就看默认的PUll,真正的负载都是根据主题来进行的

image-20220427134512961

this.rebalanceByTopic(topic, isOrder);

广播模式是不需要处理负载均衡的,对于每个消费者而言都是要消费的,源码里面基本都是更新一些信息我们就不做过多的说明了

switch (messageModel) {
	case BROADCASTING:{...}
	case CLUSTERING:{...}
    default:
      break;
        }

那么集群模式就有点意思了,他先是根据topic获取消费者Id然后在获取所有的MessageQueue

List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();

然后在排序,然后获取AllocateMessageQueueStrategy类型,也就是用户set的策略,具体的流程就不看了,可以看下我的高级特性里面的5消息负载均衡

Collections.sort(mqAll);
Collections.sort(cidAll);

AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

最后获取了结果信息就做保存处理,至此负载均衡就差不多了!

消息的拉取

image-20220427141544011

this.pullMessageService.start();

又是线程,那么找实现的run方法,这里有两个做事情的代码 一个是拉群消息的队列,还有一个就是处理拉取消息的请求,那么在拉群消息的请求中,看到了一个实现类

PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);

我们可以发现不管是那个消费者类型,最终都会变成拉模式去拉取消息

DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);

impl.pullMessage(pullRequest),中就是消息具体的拉取了,不过我们能看到对于消息的流控也是在这里进行的,从消息的数量、大小、跨度上面进行了流量控制。关于流量的控制可以不看源码,在docs/msg_trace/features.md中看官方说明

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }

在处理玩流量控制之后,定义了一个庞大的回调函数,在拉取到消息后就会进入到这个方法里面,有一个onSuccess方法

PullCallback pullCallback = new PullCallback(){...}

image-20220427141427454

里面主要处理一些并发的请求啊、消息的处理等 可以自己关注下

然后就开始拉取消息了

this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );

进入pullKernelImpl后,发现它先是去找broker

FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);

然后看到少量注释说明// check version做了一些broker的校验,在构建netty的拉取请求

PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(....)

延迟消息

延迟消息的处理入口在scheduleMessageService这个组件中。 他会在broker启动时也一起加载。整个延迟消息的实现方式是这样的:

image-20220427130428014

消息写入时,会将延迟消息转为写入到SCHEDULE_TOPIC_XXXX这个Topic中。这个系统内置的Topic有18个队列,对应18个延迟级别。代码见CommitLog.putMessage方法。然后ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务,将消息从延迟队列中写入正常Topic中。 代码见ScheduleMessageService中的DeliverDelayedMessageTimerTask.executeOnTimeup方法。

消费者的小结

RocketMQ消息消费方式分别为集群模式、广播模式。

消息队列负载由RebalanceService线程默认每隔20s进行一次消息队列负载,根据当前消费者组内消费者个数与主题队列数量按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消费队列,同一个消息消费队列同一个时间只会分配给一个消费者。

消息拉取由PullMessageService线程根据RebalanceService线程创建的拉取任务进行拉取,默认每次拉取32条消息,提交给消费者消费线程后继续下一次消息拉取。如果消息消费过慢产生消息堆积会触发消息消费拉取流控。

并发消息消费指消费线程池中的线程可以并发对同一个消息队列的消息进行消费,消费成功后,取出消息队列中最小的消息偏移量作为消息消费进度偏移量存储在于消息消费进度存储文件中,集群模式消息消费进度存储在Broker(消息服务器),广播模式消息消费进度存储在消费者端。

RocketMQ不支持任意精度的定时调度消息,只支持自定义的消息延迟级别,例如1s、2s、5s等,可通过在broker配置文件中设置messageDelayLevel

顺序消息一般使用集群模式,是指对消息消费者内的线程池中的线程对消息消费队列只能串行消费。并并发消息消费最本质的区别是消息消费时必须成功锁定消息消费队列,在Broker端会存储消息消费队列的锁占用情况。