RocketMq 的介绍说明和集群搭建
RocketMQ
是阿里巴巴开源的一个消息中间件,在阿里内部历经了双十一等很多高并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的一个顶级项目。目前RocketMQ
在阿里云上有一个购买即可用的商业版本,商业版本集成了阿里内部一些更深层次的功能及运维定制。我们这里学习的是Apache的开源版本。开源版本相对于阿里云上的商业版本,功能上略有缺失,但是大体上功能是一样的。
RocketMq 集群架构说明
RocketMQ架构上主要分为四部分,如上图所示:
-
Producer
:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。负责生产消息生产者还可以是生产者组,拥有多个生产者,用于事务消息的定期回查
-
Consumer
:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。 负责消费消息 -
NameServer
:NameServer
是一个非常简单的Topic
路由注册中心,其角色类似Dubbo
中的zookeeper
,支持Broker
的动态注册与发现。主要包括两个功能:- Broker管理,
NameServer
接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活; - 路由信息管理,每个
NameServer
将保存关于Broker
集群的整个路由信息和用于客户端查询的队列信息。然后Producer
和Conumser
通过NameServer
就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer
通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer
注册自己的路由信息,所以每一个NameServer
实例上面都保存一份完整的路由信息。当某个NameServer
因某种原因下线了,Broker仍然可以向其它NameServer
同步其路由信息,Producer
和Consumer
仍然可以动态感知Broker
的路由的信息。
- Broker管理,
-
BrokerServer
:Broker
主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker
包含了以下几个重要子模块。
Remoting Module
:整个Broker
的实体,负责处理来自Client
端的请求。Client Manager
:负责管理客户端(Producer/Consume
r)和维护Consumer
的Topic
订阅信息。Store Service
:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。HA Service
:高可用服务,提供Master Broker
和Slave Broker
之间的数据同步功能。Index Service
:根据特定的Message key
对投递到Broker
的消息进行索引服务,以提供消息的快速查询。
主题Topic
表示一类消息的集合,是一个逻辑概念,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。同一个Topic下的数据,会分片保存到不同的Broker上,而每一个分片单位,就叫做MessageQueue
。MessageQueue
是生产者发送消息与消费者消费消息的最小单位。
RocketMQ 网络部署特点
NameServer
是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。Broker
部署相对复杂,Broker
分为Master
与Slave
,一个Master
可以对应多个Slave
,但是一个Slave
只能对应一个Master
,Master
与Slave
的对应关系通过指定相同的BrokerName
,不同的BrokerId
来定义,BrokerId
为0表示Master,非0表示Slave
。Master也
可以部署多个。每个Broker
与NameServer
集群中的所有节点建立长连接,定时注册Topi
c信息到所有NameServer
。 注意:当前RocketMQ
版本在部署架构上支持一Master多Slave,但只有BrokerId=1
的从服务器才会参与消息的读负载。Producer
与NameServer
集群中的其中一个节点(随机选择)建立长连接,定期从NameServer
获取Topic
路由信息,并向提供Topic
服务的Maste
r建立长连接,且定时向Master
发送心跳。Producer
完全无状态,可集群部署。Consumer
与NameServer
集群中的其中一个节点(随机选择)建立长连接,定期从NameServer
获取Topic路由信息,并向提供Topic
服务的Master
、Slave
建立长连接,且定时向Master
、Slave
发送心跳。Consumer
既可以从Master
订阅消息,也可以从Slave
订阅消息,消费者在向Master
拉取消息时,Master
服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master
还是Slave
拉取。
结合部署架构图,描述集群工作流程:
- 启动
NameServer
,NameServer
起来后监听端口,等待Broker
、Producer
、Consumer
连上来,相当于一个路由控制中心。 Broker
启动,跟所有的NameServer
保持长连接,定时发送心跳包。心跳包中包含当前Broker
信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer
集群中就有Topic
跟Broker
的映射关系。- 收发消息前,先创建
Topic
,创建Topic
时需要指定该Topic
要存储在哪些Broker
上,也可以在发送消息时自动创建Topic
。 Producer
发送消息,启动时先跟NameServer
集群中的其中一台建立长连接,并从NameServer
中获取当前发送的Topic
存在哪些Broker
上,轮询从队列列表中选择一个队列,然后与队列所在的Broker
建立长连接从而向Broker
发消息。Consumer
跟Producer
类似,跟其中一台NameServer
建立长连接,获取当前订阅Topic
存在哪些Broker
上,然后直接跟Broker
建立连接通道,开始消费消息。
RocketMq Quick Start
搭建RcoketMq集群
1.环境和机器
IP | broker节点部署 | |
---|---|---|
192.168.230.133 | 4C8G | |
192.168.230.134 | 4C8G | broker-a, broker-b-s |
192.168.230.135 | 4C8G | broker-b,broker-a-s |
134的broker-a 和135的broker-a-s 为一组主从,135的broker-b 和134的broker-b-s为一组主从。
2.前期准备
免密登录
ssh-copy-id
关闭防火墙
systemctl stop firewalld.service
firewall-cmd --state
安装java和mvn
由于rocketmq
本身就是java来实现的,maven 是用于可视化界面
3.安装和配置RocketMQ集群
安装见3️⃣RocketMq Quick Start
环境变量
#setting java jdk
export JAVA_HOME=/work/java/jdk1.8.0_171
export ROCKETMQ_HOME=/work/rocketmq/rocketmq-4.9.3
PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME/bin:$PATH:$HOME/.local/bin:$HOME/bin:$MAVEN_HOME/bin
export PATH
我们为了便于观察,这次搭建一个2主2从异步刷盘的集群,所以我们会使用conf/2m-2s-async
下的配置文件,实际项目中,为了达到高可用,一般会使用dleger
。所以修改的配置文件是进入rocketmq
的config
目录下修改2m-2s-async
的配置文件。--只需要配置broker.conf
。
在rocketmq的config目录下可以看到rocketmq建议的各种配置方式:
- 2m-2s-async: 2主2从异步刷盘(吞吐量较大,但是消息可能丢失),
- 2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全),
- 2m-noslave:2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置。
- 而
dleger
就是用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。
配置第一组broker-a
在192.168.230.134上先配置borker-a
的master
节点。先配置2m-2s-async/broker-a.properties
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=192.168.230.133:9876;192.168.230.134:9876;192.168.230.135:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/work/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/work/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/work/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/work/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/work/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/work/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
该节点对应的从节点在192.168.230.135上。修改2m-2s-async/broker-a-s.properties
只需要修改brokerId
和brokerRole
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=192.168.230.133:9876;192.168.230.134:9876;192.168.230.135:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/work/rocketmq/storeSlave
#commitLog 存储路径
storePathCommitLog=/work/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/work/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/work/rocketmq/storeSlave/index
#checkpoint 文件存储路径
storeCheckpoint=/work/rocketmq/storeSlave/checkpoint
#abort 文件存储路径
abortFile=/work/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
配置第二组broker-b
这一组broker的主节点在192.168.230.135上,所以需要配置192.168.230.135上的config/2m-2s-async/broker-b.properties
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=192.168.230.133:9876;192.168.230.134:9876;192.168.230.135:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/work/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/work/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/work/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/work/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/work/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/work/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
然后它对应的slave在192.168.230.134
上,修改192.168.230.134
上的 conf/2m-2s-async/broker-b-s.properties
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=192.168.230.133:9876;192.168.230.134:9876;192.168.230.135:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/work/rocketmq/storeSlave
#commitLog 存储路径
storePathCommitLog=/work/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/work/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/work/rocketmq/storeSlave/index
#checkpoint 文件存储路径
storeCheckpoint=/work/rocketmq/storeSlave/checkpoint
#abort 文件存储路径
abortFile=/work/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
需要注意的配置项:
- 同一机器上两个实例的
store
目录不能相同,否则会报错Lock failed,MQ already started
- 同一机器上两个实例的
listenPort
也不能相同。否则会报端口占用的错nameserver
不需要进行配置,直接启动就行。这也看出nameserver
是无状态的。
配置参数说明-broker相关配置信息
参数名 | 默认值 | 说明 |
---|---|---|
listenPort | 10911 | 接受客户端连接的监听端口 |
namesrvAddr | null | nameServer 地址 |
brokerIP1 | 网卡的 InetAddress | 当前 broker 监听的 IP |
brokerIP2 | 跟 brokerIP1 一样 | 存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步 |
brokerName | null | broker 的名称 |
brokerClusterName | DefaultCluster | 本 broker 所属的 Cluser 名称 |
brokerId | 0 | broker id, 0 表示 master, 其他的正整数表示 slave |
storePathCommitLog | $HOME/store/commitlog/ | 存储 commit log 的路径 |
storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 |
mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |
deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |
fileReservedTime | 72 | 以小时计算的文件保留时间 |
brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。 |
配置参数说明-客户端相关配置信息
参数名 | 默认值 | 说明 |
---|---|---|
namesrvAddr | Name Server地址列表,多个NameServer地址用分号隔开 | |
clientIP | 本机IP | 客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 |
instanceName | DEFAULT | 客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等) |
clientCallbackExecutorThreads | 4 | 通信层异步回调线程数 |
pollNameServerInteval | 30000 | 轮询Name Server间隔时间,单位毫秒 |
heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 |
persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 |
配置参数说明-生产者相关配置信息
参数名 | 默认值 | 说明 |
---|---|---|
producerGroup | DEFAULT_PRODUCER | Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组 |
createTopicKey | TBW102 | 在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。 |
defaultTopicQueueNums | 4 | 在发送消息,自动创建服务器不存在的topic时,默认创建的队 列数 |
sendMsgTimeout | 10000 | 发送消息超时时间,单位毫秒 |
compressMsgBodyOverHowmuch | 4096 | 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 |
retryAnotherBrokerWhenNotStoreOK | FALSE | 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 |
retryTimesWhenSendFailed | 2 | 如果消息发送失败,最大重试次数,该参数只对同步发送模式 起作用 |
maxMessageSize | 4MB | 客户端限制的消息大小,超过报错,同时服务端也会限制,所 以需要跟服务端配合使用。 |
transactionCheckListener | 事务消息回查监听器,如果发送事务消息,必须设置 | |
checkThreadPoolMinSize | 1 | Broker回查Producer事务状态时,线程池最小线程数 |
checkThreadPoolMaxSize | 1 | Broker回查Producer事务状态时,线程池最大线程数 |
checkRequestHoldMax | 2000 | Broker回查Producer事务状态时,Producer本地缓冲请求队 列大小 |
RPCHook | null | 该参数是在Producer创建时传入的,包含消息发送前的预处理和消息响应后的处理两个接口,用户可以在第一个接口中做一 些安全控制或者其他操作。 |
配置参数说明-PushCousumer相关配置信息
参数名 | 默认值 | 说明 |
---|---|---|
consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消 费逻辑一致,则应该将它们归为同一组 |
messageModel | CLUSTERING | 消费模型支持集群消费和广播消费两种 |
consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer启动后,默认从上次消费的位置开始消费,这包含两种情况:一种是上次消费的位置未过期,则消费从上次中止的位置进行;一种是上次消 费位置已经过期,则从当前队列第一条消息开始消费 |
consumeTimestamp | 半个小时前 | 只有当consumeFromWhere值为CONSUME_FROM_TIMESTAMP时才起作 用。 |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
subscription | 订阅关系 | |
messageListener | 消息监听器 | |
offsetStore | 消费进度存储 | |
consumeThreadMin | 10 | 消费线程池最小线程数 |
consumeThreadMax | 20 | 消费线程池最大线程数 |
consumeConcurrentlyMaxSpan | 2000 | 单队列并行消费允许的最大跨度 |
pullThresholdForQueue | 1000 | 拉消息本地队列缓存消息最大数 |
pullInterval | 0 | 拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置 大于0的值,单位毫秒 |
consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 |
pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 |
配置参数说明-PullCousumer相关配置信息
参数名 | 默认值 | 说明 |
---|---|---|
consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属 于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
brokerSuspendMaxTimeMillis | 20000 | 长轮询,Consumer拉消息请求在Broker 挂起最长时间,单位毫秒 |
consumerTimeoutMillisWhenSuspend | 30000 | 长轮询,Consumer拉消息请求在Broker 挂起超过指定时间,客户端认为超时,单位毫秒 |
consumerPullTimeoutMillis | 10000 | 非长轮询,拉消息超时时间,单位毫秒 |
messageModel | BROADCASTING | 消息支持两种模式:集群消费和广播消费 |
messageQueueListener | 监听队列变化 | |
offsetStore | 消费进度存储 | |
registerTopics | 注册的topic集合 | |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
配置参数说明-Message数据结构
字段名 | 默认值 | 说明 |
---|---|---|
Topic | null | 必填,消息所属topic的名称 |
Body | null | 必填,消息体 |
Tags | null | 选填,消息标签,方便服务器过滤使用。目前只支持每个消息设置一个tag |
Keys | null | 选填,代表这条消息的业务关键词,服务器会根据keys创建哈希 索引,设置后,可以在Console系统根据Topic、Keys来查询消 息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品 Id等。 |
Flag | 0 | 选填,完全由应用来设置,RocketMQ不做干预 |
DelayTimeLevel | 0 | 选填,消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费 |
WaitStoreMsgOK | TRUE | 选填,表示消息是否在服务器落盘后才返回应答。 |
4.启动RocketMQ集群
启动就比较简单了,直接调用bin目录下的脚本就行。只是启动之前要注意看下他们的JVM内存配置,默认的配置都比较高。默认是server:4G broker:8G,如果需要调整可以通过bin/runserver.sh
文件,调整里面的jvm
内存配置。
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
也可以通过bin/runbroker.sh
文件,调整broker里面的jvm
内存配置
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
直接在三个节点上启动nameServer
。
nohup bin/mqnamesrv &
启动完成后,在nohup.out
里看到这一条关键日志就是启动成功了。
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
这里也看到,
RocketMQ
在runserver.sh中是使用的CMS垃圾回收期,而在runbroker.sh
中使用的是G1垃圾回收期。
启动broker
是使用的mqbroker
指令,只是注意启动broker时需要通过-c
指定对应的配置文件。
在134上启动broker-a的master节点和broker-b的slave节点
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &
在135上启动broker-b的master节点和broker-a的slave节点
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &
启动slave时,如果遇到报错
Lock failed,MQ already started
,那是有可能是因为有多个实例共用了同一个storePath
造成的,这时就需要调整store的路径。
通过对应的nuhut.out
日志可以看到
The broker[broker-b, 192.168.230.135:10911] boot success. serializeType=JSON and name server is 192.168.230.133:9876;192.168.230.134:9876;192.168.230.135:9876
The broker[broker-a, 192.168.230.135:11011] boot success. serializeType=JSON and name server is 192.168.230.133:9876;192.168.230.134:9876;192.168.230.135:9876
5.启动状态检查
使用jps
指令,能看到一个NameSrvStartup
进程和两个BrokerStartup
进程。
# jps
18896 BrokerStartup
18631 NamesrvStartup
19017 BrokerStartup
71230 Jps
nohup.out
中也有启动成功的日志。对应的日志文件:
# 查看nameServer日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log
测试mqadmin管理工具
RocketMQ
的源代码中并没有为我们提供类似于Nacos
或者RabbitMQ
那样的控制台,只提供了一个mqadmin
指令来管理RocketMQ
,命令在bin目录下。使用方式是 ./mqadmin {command} {args}
1.Topic相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
updateTopic | 创建更新Topic配置 | -b | Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port |
-c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询) | ||
-h- | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
-p | 指定新topic的读写权限( W=2|R=4|WR=6 ) | ||
-r | 可读队列数(默认为 8) | ||
-w | 可写队列数(默认为 8) | ||
-t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) | ||
deleteTopic | 删除Topic | -c | cluster 名称,表示删除某集群下的某个 topic (集群 可通过 clusterList 查询) |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) | ||
topicList | 查看 Topic 列表信息 | -h | 打印帮助 |
-c | 不配置-c只返回topic列表,增加-c返回clusterName, topic, consumerGroup信息,即topic的所属集群和订阅关系,没有参数 | ||
-n | NameServer 服务地址,格式 ip:port | ||
topicRoute | 查看 Topic 路由信息 | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
topicStatus | 查看 Topic 消息队列offset | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
topicClusterList | 查看 Topic 所在集群列表 | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
updateTopicPerm | 更新 Topic 读写权限 | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-b | Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port | ||
-p | 指定新 topic 的读写权限( W=2|R=4|WR=6 ) | ||
-c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询),-b优先,如果没有-b,则对集群中所有Broker执行命令 | ||
updateOrderConf | 从NameServer上创建、删除、获取特定命名空间的kv配置,目前还未启用 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic,键 | ||
-v | orderConf,值 | ||
-m | method,可选get、put、delete | ||
allocateMQ | 以平均负载算法计算消费者列表负载消息队列的负载结果 | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-i | ipList,用逗号分隔,计算这些ip去负载Topic的消息队列 | ||
statsAll | 打印Topic订阅关系、TPS、积累量、24h读写总量等信息 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-a | 是否只打印活跃topic | ||
-t | 指定topic |
2.集群相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
clusterList | 查看集群信息,集群、BrokerName、BrokerId、TPS等信息 | -m | 打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday) |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-i | 打印间隔,单位秒 | ||
clusterRT | 发送消息检测集群各Broker RT。消息发往${BrokerName} Topic。 | -a | amount,每次探测的总数,RT = 总时间 / amount |
-s | 消息大小,单位B | ||
-c | 探测哪个集群 | ||
-p | 是否打印格式化日志,以|分割,默认不打印 | ||
-h | 打印帮助 | ||
-m | 所属机房,打印使用 | ||
-i | 发送间隔,单位秒 | ||
-n | NameServer 服务地址,格式 ip:port |
3.Broker相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
updateBrokerConfig | 更新 Broker 配置文件,会修改Broker.conf | -b | Broker 地址,格式为ip:port |
-c | cluster 名称 | ||
-k | key 值 | ||
-v | value 值 | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
brokerStatus | 查看 Broker 统计信息、运行状态(你想要的信息几乎都在里面) | -b | Broker 地址,地址为ip:port |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
brokerConsumeStats | Broker中各个消费者的消费情况,按Message Queue维度返回Consume Offset,Broker Offset,Diff,TImestamp等信息 | -b | Broker 地址,地址为ip:port |
-t | 请求超时时间 | ||
-l | diff阈值,超过阈值才打印 | ||
-o | 是否为顺序topic,一般为false | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
getBrokerConfig | 获取Broker配置 | -b | Broker 地址,地址为ip:port |
-n | NameServer 服务地址,格式 ip:port | ||
wipeWritePerm | 从NameServer上清除 Broker写权限 | -b | Broker 地址,地址为ip:port |
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
cleanExpiredCQ | 清理Broker上过期的Consume Queue,如果手动减少对列数可能产生过期队列 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | Broker 地址,地址为ip:port | ||
-c | 集群名称 | ||
cleanUnusedTopic | 清理Broker上不使用的Topic,从内存中释放Topic的Consume Queue,如果手动删除Topic会产生不使用的Topic | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | Broker 地址,地址为ip:port | ||
-c | 集群名称 | ||
sendMsgStatus | 向Broker发消息,返回发送状态和RT | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | BrokerName,注意不同于Broker地址 | ||
-s | 消息大小,单位B | ||
-c | 发送次数 |
4.消息相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
queryMsgById | 根据offsetMsgId查询msg,如果使用开源控制台,应使用offsetMsgId,此命令还有其他参数,具体作用请阅读QueryMsgByIdSubCommand。 | -i | msgId |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
queryMsgByKey | 根据消息 Key 查询消息 | -k | msgKey |
-t | Topic 名称 | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
queryMsgByOffset | 根据 Offset 查询消息 | -b | Broker 名称,(这里需要注意 填写的是 Broker 的名称,不是 Broker 的地址,Broker 名称可以在 clusterList 查到) |
-i | query 队列 id | ||
-o | offset 值 | ||
-t | topic 名称 | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
queryMsgByUniqueKey | 根据msgId查询,msgId不同于offsetMsgId,区别详见常见运维问题。-g,-d配合使用,查到消息后尝试让特定的消费者消费消息并返回消费结果 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-i | uniqe msg id | ||
-g | consumerGroup | ||
-d | clientId | ||
-t | topic名称 | ||
checkMsgSendRT | 检测向topic发消息的RT,功能类似clusterRT | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-a | 探测次数 | ||
-s | 消息大小 | ||
sendMessage | 发送一条消息,可以根据配置发往特定Message Queue,或普通发送。 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-p | body,消息体 | ||
-k | keys | ||
-c | tags | ||
-b | BrokerName | ||
-i | queueId | ||
consumeMessage | 消费消息。可以根据offset、开始&结束时间戳、消息队列消费消息,配置不同执行不同消费逻辑,详见ConsumeMessageCommand。 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-b | BrokerName | ||
-o | 从offset开始消费 | ||
-i | queueId | ||
-g | 消费者分组 | ||
-s | 开始时间戳,格式详见-h | ||
-d | 结束时间戳 | ||
-c | 消费多少条消息 | ||
printMsg | 从Broker消费消息并打印,可选时间段 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-c | 字符集,例如UTF-8 | ||
-s | subExpress,过滤表达式 | ||
-b | 开始时间戳,格式参见-h | ||
-e | 结束时间戳 | ||
-d | 是否打印消息体 | ||
printMsgByQueue | 类似printMsg,但指定Message Queue | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-i | queueId | ||
-a | BrokerName | ||
-c | 字符集,例如UTF-8 | ||
-s | subExpress,过滤表达式 | ||
-b | 开始时间戳,格式参见-h | ||
-e | 结束时间戳 | ||
-p | 是否打印消息 | ||
-d | 是否打印消息体 | ||
-f | 是否统计tag数量并打印 | ||
resetOffsetByTime | 按时间戳重置offset,Broker和consumer都会重置 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-g | 消费者分组 | ||
-t | topic名称 | ||
-s | 重置为此时间戳对应的offset | ||
-f | 是否强制重置,如果false,只支持回溯offset,如果true,不管时间戳对应offset与consumeOffset关系 | ||
-c | 是否重置c++客户端offset |
5.消费者和消费者组相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
consumerProgress | 查看订阅组消费状态,可以查看具体的client IP的消息积累量 | -g | 消费者所属组名 |
-s | 是否打印client IP | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
consumerStatus | 查看消费者状态,包括同一个分组中是否都是相同的订阅,分析Process Queue是否堆积,返回消费者jstack结果,内容较多,使用者参见ConsumerStatusSubCommand | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-g | consumer group | ||
-i | clientId | ||
-s | 是否执行jstack | ||
getConsumerStatus | 获取 Consumer 消费进度 | -g | 消费者所属组名 |
-t | 查询主题 | ||
-i | Consumer 客户端 ip | ||
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
updateSubGroup | 更新或创建订阅关系 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | Broker地址 | ||
-c | 集群名称 | ||
-g | 消费者分组名称 | ||
-s | 分组是否允许消费 | ||
-m | 是否从最小offset开始消费 | ||
-d | 是否是广播模式 | ||
-q | 重试队列数量 | ||
-r | 最大重试次数 | ||
-i | 当slaveReadEnable开启时有效,且还未达到从slave消费时建议从哪个BrokerId消费,可以配置备机id,主动从备机消费 | ||
-w | 如果Broker建议从slave消费,配置决定从哪个slave消费,配置BrokerId,例如1 | ||
-a | 当消费者数量变化时是否通知其他消费者负载均衡 | ||
deleteSubGroup | 从Broker删除订阅关系 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | Broker地址 | ||
-c | 集群名称 | ||
-g | 消费者分组名称 | ||
cloneGroupOffset | 在目标群组中使用源群组的offset | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-s | 源消费者组 | ||
-d | 目标消费者组 | ||
-t | topic名称 | ||
-o | 暂未使用 |
6.连接相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
consumerConnec tion | 查询 Consumer 的网络连接 | -g | 消费者所属组名 |
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
producerConnec tion | 查询 Producer 的网络连接 | -g | 生产者所属组名 |
-t | 主题名称 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 |
7.NameServer相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
updateKvConfig | 更新NameServer的kv配置,目前还未使用 | -s | 命名空间 |
-k | key | ||
-v | value | ||
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
deleteKvConfig | 删除NameServer的kv配置 | -s | 命名空间 |
-k | key | ||
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
getNamesrvConfig | 获取NameServer配置 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
updateNamesrvConfig | 修改NameServer配置 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-k | key | ||
-v | value |
8.其他
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
startMonitoring | 开启监控进程,监控消息误删、重试队列消息数等 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 |
注意:
1、几乎所有指令都需要通过-n参数配置nameServer地址,格式为ip:port
2、几乎所有执行都可以通过-h参数获得帮助
3、当既有Broker地址(-b)又有集群名称clustername(-c)配合项,则优先以Broker地址执行指令。如果不配置Broker地址,则对集群中所有主机执行指令。
命令行快速验证
在RocketMQ
的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ
服务。我们在134上进入RocketMQ
的安装目录:
发送消息:默认会发1000条消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息:
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
注意,这是官方提供的Demo,但是官方的源码中,这两个类都是没有指定nameServer的,所以运行会有点问题。要指定NameServer地址,可以配置一个环境变量NAMESRV_ADDR,这样默认会读取这个NameServer地址。可以配到.bash_profile里或者直接临时指定。
export NAMESRV_ADDR='192.168.230.133:9876;192.168.230.134:9876;192.168.230.135:9876'
然后就可以正常执行了。
这个NameServer
地址的读取方式见源码中org.apache.rocketmq.common.utils.NameServerAddressUtils
public static String getNameServerAddresses() {
return System.getProperty("rocketmq.namesrv.addr", System.getenv("NAMESRV_ADDR"));
}
这个方法就是在DefaultMQProducer中默认的设置NameServer地址的方式,这个rokcetmq.namesrv.addr属性可以在java中使用System.setproperties指定,也可以在SpringBoot中配到配置文件里。
这个tools.sh就封装了一个简单的运行
RocketMQ
的环境,可以运行源码中的其他示例,然后自己的例子也可以放到RocketMQ
的lib目录下去执行。
命令行快速验证
RocketMQ
源代码中并没有提供控制台,但是有一个Rocket的社区扩展项目中提供了一个控制台,GITHUB地址
下载下来后,进入其中的rocket-console
目录,使用maven
进行编译
mvn clean package -Dmaven.test.skip=true
编译完成后,获取target下的jar包,就可以直接执行。但是这个时候要注意,在这个项目的application.properties中需要指定nameserver的地址。默认这个属性是空的。
那我们可以在jar包的当前目录下增加一个application.properties
文件,覆盖jar包中默认的一个属性:
rocketmq.config.namesrvAddr=192.168.230.133:9876;192.168.230.134:9876;192.168.230.135:9876
然后执行:
java -jar rocketmq-console-ng-1.0.1.jar
启动完成后,可以访问 http://192.168.230.133:8080看到管理页面
在管理页面的右上角可以选择语言。
Dleger高可用集群搭建
1.Dleger介绍
当brokerServer中,的三个2m-2s-async
、2m-2s-sync
、2m-noslave
,都不具备真实的高可用,在4.5版本以后引入了dledger
,它是一个三方技术,不是rocketMQ自己的,类似kakfa和zk的关系。Dledger技术做的事情:
- 接管Broker的CommitLog消息存储
- 从集群中选举出master节点
- 完成master节点往slave节点的消息同步。
2.Dleger的选举机制
首先:每个节点有三个状态,Leader
,follower
和candidate
(候选人)。正常运行的情况下,集群中会有一个leader
,其他都是follower
,follower
只响应Leader
和Candidate
的请求,而客户端的请求全部由Leader
处理,即使有客户端请求到了一个follower
,也会将请求转发到leader
。
集群刚启动时,每个节点都是follower
状态,之后集群内部会发送一个timeout
信号,所有follower
就转成candidate
去拉取选票,获得大多数选票的节点选为leader,其他候选人转为follower。如果一个timeout信号发出时,没有选出leader,将会重新开始一次新的选举。而Leader节点会往其他节点发送心跳信号,确认他的leader状态。
然后会启动定时器,如果在指定时间内没有收到Leader的心跳,就会转为Candidate状态,然后向其他成员发起投票请求,如果收到半数以上成员的投票,则Candidate会晋升为Leader。然后leader也有可能会退化成follower。
在Raft协议中,会将时间分为一些任意时间长度的时间片段,叫做term
。term
会使用一个全局唯一,连续递增的编号作为标识,也就是起到了一个逻辑时钟的作用。
在每一个term时间片里,都会进行新的选举,每一个Candidate
都会努力争取成为leader
。获得票数最多的节点就会被选举为Leader
。被选为Leader
的这个节点,在一个term
时间片里就会保持leader
状态。这样,就会保证在同一时间段内,集群中只会有一个Leader。在某些情况下,选票可能会被各个节点瓜分,形成不了多数派,那这个term可能直到结束都没有leader,直到下一个term再重新发起选举,这也就没有了Zookeeper中的脑裂问题。而在每次重新选举的过程中, leader也有可能会退化成为follower。也就是说,在这个集群中, leader节点是会不断变化的。
然后,每次选举的过程中,每个节点都会存储当前`term编号`,并在节点之间进行交流时,都会带上自己的term编号。如果一个节点发现他的编号比另外一个小,那么他就会将自己的编号更新为较大的那一个。而如果leader或者candidate发现自己的编号不是最新的,他就会自动转成follower。如果接收到的请求term编号小于自己的编号,term将会拒绝执行。
在选举过程中,Raft协议会通过心跳机制发起leader选举。节点都是从follower状态开始的,如果收到了来自leader或者candidate的心跳RPC请求,那他就会保持follower状态,避免争抢成为candidate。而leader会往其他节点发送心跳信号,来确认自己的地位。如果follower一段时间(两个timeout信号)内没有收到Leader的心跳信号,他就会认为leader挂了,发起新一轮选举。
选举开始后,每个`follower`会增加自己当前的`term`,并将自己转为`candidate`。然后向其他节点发起投票请求,请求时会带上自己的编号和term,也就是说都会默认投自己一票。之后`candidate`状态可能会发生以下三种变化:
- 赢得选举,成为leader: 如果它在一个term内收到了大多数的选票,将会在接下的剩余term时间内称为leader,然后就可以通过发送心跳确立自己的地位。(每一个server在一个term内只能投一张选票,并且按照先到先得的原则投出)
- 其他节点成为leader: 在等待投票时,可能会收到其他server发出心跳信号,说明其他leader已经产生了。这时通过比较自己的term编号和RPC过来的term编号,如果比对方大,说明leader的term过期了,就会拒绝该RPC,并继续保持候选人身份; 如果对方编号不比自己小,则承认对方的地位,转为follower。
- 选票被瓜分,选举失败: 如果没有candidate获取大多数选票, 则没有leader产生, candidate们等待超时后发起另一轮选举. 为了防止下一次选票还被瓜分,必须采取一些额外的措施, raft采用随机election timeout(随机休眠时间)的机制防止选票被持续瓜分。通过将timeout随机设为一段区间上的某个值, 因此很大概率会有某个candidate率先超时然后赢得大部分选票。
所以以三个节点的集群为例,选举过程会是这样的:
- 集群启动时,三个节点都是follower,然后都变成竞选者Candidate发起投票后,三个节点都会给自己投票。这样一轮投票下来,三个节点的term都是1,是一样的,这样是选举不出Leader的。
- 当一轮投票选举不出Leader后,三个节点会进入随机休眠,例如10秒,50秒,100秒。
- 一秒后,第一个节点醒来,会把自己的term加一票,投为2。然后2秒时,第二个节点醒来,发现第一个的term已经是2,比自己的1大,就会承认第一个是Leader,把自己的term也更新为2。实际上这个时候,第一个已经获得了集群中的多数票,2票,第一个就会被选举成Leader。这样,一般经过很短的几轮选举,就会选举出一个Leader来。
- 到100秒时,最后一个节点会醒来,他也同样会承认第一个的term最大,他是Leader,自己的term也会更新为2。这样集群中的所有Candidate就都确定成了leader和follower.
- 然后在一个任期内,第一个会不断发心跳给另外两个节点。当leader挂了后,follow没有收到leader的心跳,就会都转化成Candidate状态,重新发起选举。
Dledger还会采用Raft协议进行多副本的消息同步:
简单来说,数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。
Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。
接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。
再接下来, Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步。
3.Dleger的配置
要搭建高可用的Broker集群,我们只需要配置conf/dleger
下的配置文件就行。
这种模式是基于Raft协议的,是一个类似于Zookeeper的
paxos
协议的选举协议,也是会在集群中随机选举出一个leader,其他的就是follower。只是他选举的过程跟paxos
有点不同。Raft协议基于随机休眠机制的,选举过程会比paxos
相对慢一点。
首先:我们同样是需要修改runserver.sh和runbroker.sh,对JVM内存进行定制。 然后:我们需要修改conf/dleger
下的配置文件。 跟dleger
相关的几个配置项如下:
name | 含义 | 举例 |
---|---|---|
enableDLegerCommitLog | 是否启动 DLedger | true |
dLegerGroup | DLedger Raft Group的名字,建议和 brokerName 保持一致 | RaftNode00 |
dLegerPeers | DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致 | n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 |
dLegerSelfId | 节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 | n0 |
sendMessageThreadPoolNums | 发送线程个数,建议配置成 Cpu 核数 | 16 |
配置完后,同样是使用 nohup bin/mqbroker -c $conf_name & 的方式指定实例文件。
在bin/dleger下有个fast-try.sh,这个脚本是在本地启动三个RocketMQ实例,搭建一个高可用的集群,读取的就是conf/dleger下的broker-no.conf,broker-n1.conf和broker-n2.conf。使用这个脚本同样要注意定制下JVM内存,他给每个实例默认定制的是1G内存,虚拟机肯定是不够的。
这种单机三实例的集群搭建完成后,可以使用 bin/mqadmin clusterList -n worker1.conf的方式查看集群状态。
单机状态下一般一次主从切换需要大概10S的时间。
调整系统参数
到这里,我们的整个RocketMQ的服务就搭建完成了。但是在实际使用时,我们说RocketMQ的吞吐量、性能都很高,那要发挥RocketMQ的高性能,还需要对RocketMQ以及服务器的性能进行定制
1、配置RocketMQ的JVM内存大小
之前提到过,在runserver.sh
中需要定制nameserver
的内存大小,在runbroker.sh
中需要定制broker
的内存大小。这些默认的配置可以认为都是经过检验的最优化配置,但是在实际情况中都还需要根据服务器的实际情况进行调整。这里以runbroker.sh
中对G1GC的配置举例,在runbroker.sh
中的关键配置:
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_broker_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
-XX:+UseG1GC
: 使用G1垃圾回收器, -XX:G1HeapRegionSize=16m
将G1的region块大小设为16M,
-XX:G1ReservePercent
:在G1的老年代中预留25%空闲内存,这个默认值是10%,RocketMQ把这个参数调大了。
-XX:InitiatingHeapOccupancyPercent=30
:当堆内存的使用率达到30%之后就会启动G1垃圾回收器尝试回收垃圾,默认值是45%,RocketMQ把这个参数调小了,也就是提高了GC的频率,但是避免了垃圾对象过多,一次垃圾回收时间太长的问题。
然后,后面定制了GC的日志文件,确定GC日志文件的地址、打印的内容以及控制每个日志文件的大小为30M并且只保留5个文件。这些在进行性能检验时,是相当重要的参考内容。
2、RocketMQ的其他一些核心参数
例如在conf/dleger/broker-n0.conf
中有一个参数:sendMessageThreadPoolNums=16
。这一个参数是表明RocketMQ内部用来发送消息的线程池的线程数量是16个,其实这个参数可以根据机器的CPU核心数进行适当调整,例如如果你的机器核心数超过16个,就可以把这个参数适当调大。
3、Linux内核参数定制
我们在部署RocketMQ的时候,还需要对Linux内核参数进行一定的定制。例如
- ulimit,需要进行大量的网络通信和磁盘IO。
- vm.extra_free_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)
- vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。
- vm.max_map_count,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。
- vm.swappiness,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。
- File descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350。
这些参数在CentOS7中的配置文件都在
/proc/sys/vm
目录下。
另外,RocketMQ的bin目录下有个os.sh里面设置了RocketMQ建议的系统内核参数,可以根据情况进行调整。
评论区