如何在adc中制定航线:消息轨迹ACL与多副本搭建
如何在adc中制定航线:消息轨迹ACL与多副本搭建默认参数属性消息轨迹使用1. Broker 设置RocketMQ 的默认消息轨迹功能默认是关闭的,可以将 Broker 属性 traceTopicEnable 设置为 true 开启。消息轨迹默认存储在 RMQ_SYS_TRACE_TOPIC 的主题中,可以通过 msgTraceTopicName 修改。
消息轨迹
消息轨迹含义
一条消息什么时候由哪台机器产生的、发送的耗时、消息大小、发送状态、存储在哪个 Broker 上、什么时候存储的以及存储在哪台 Broker 上、什么时候消费的、消费状态等信息,这些信息即消息轨迹,用于追踪消息从诞生到被消费的整个生命周期。
这些信息对于业务同学排查定位有着重要的意义,发送和消费往往在不同的业务部门。有了消息轨迹后一条消息有没有发送,发送成功了没,有没有消费一目了然,降低了彼此的沟通成本。
消息轨迹使用
1. Broker 设置
RocketMQ 的默认消息轨迹功能默认是关闭的,可以将 Broker 属性 traceTopicEnable 设置为 true 开启。消息轨迹默认存储在 RMQ_SYS_TRACE_TOPIC 的主题中,可以通过 msgTraceTopicName 修改。
属性 |
默认参数 |
traceTopicEnable |
false |
msgTraceTopicName |
RMQ_SYS_TRACE_TOPIC |
2. 发送端使用
发送轨迹 API
public DefaultMQProducer(final String producerGroup boolean enableMsgTrace final String customizedTraceTopic){
this(null producerGroup null enableMsgTrace customizedTraceTopic);
}
说明:enableMsgTrace 是否开启发送轨迹,默认 false;customizedTraceTopic 设置收集消息轨迹的自定义主题,默认为 RMQ_SYS_TRACE_TOPIC。
发送代码示例
public static void main(String[] args) throws MQClientEXCEPTION InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName" true);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i )
try {
Message msg = new Message("TopicTest"
"TagA"
"OrderID111"
"Hello world".getBytes(RemotingHelper.DEFAULT_charSET));
SendResult SendResult = producer.send(msg);
System.out.printf("%s%n" sendResult);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
说明:创建 DefaultMQProducer 时将 enableMsgTrace 设置为 true 开启发送消息轨迹。
3. 消费端使用
消费轨迹 API
public DefaultMQPushConsumer(final String consumerGroup boolean enableMsgTrace final String customizedTraceTopic) {
this(null consumerGroup null new AllocateMessageQueueAveragely() enableMsgTrace customizedTraceTopic);
}
说明:enableMsgTrace 是否开启消费轨迹,默认 false;customizedTraceTopic 设置收集消息轨迹的自定义主题,默认为 RMQ_SYS_TRACE_TOPIC。
消费代码示例
public static void main(String[] args) throws InterruptedException MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1" true);
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest" "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n" Thread.currentThread().getName() msgs);
RETURN ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
说明:创建 DefaultMQPushConsumer 将 enableMsgTrace 设置为 true 开启消费消息轨迹。
4. 消息轨迹效果
通过发送和消费一条消息,在 RocketMQ-Console 中看下消息轨迹的效果截图。
发送消息内容
SendResult [sendStatus=SEND_OK msgId=A9FE1075810A18B4AAC24A40738B0000 offsetMsgId=A9FE107500002A9F0000000000002147 messageQueue=MessageQueue [topic=TopicTest brokerName=liangyong queueId=1] queueOffset=2]
消费消息内容
Receive New Messages: [MessageExt [brokerName=liangyong queueId=1 storeSize=189 queueOffset=2 sysFlag=0 bornTimestamp=1600135337872 bornHost=/169.254.16.117:65532 storeTimestamp=1600135337883 storeHost=/169.254.16.117:10911 msgId=A9FE107500002A9F0000000000002147 commitLogOffset=8519 bodyCRC=198614610 reconsumeTimes=0 preparedtransactionOffset=0 toString()=Message{topic='TopicTest' flag=0 properties={MIN_OFFSET=0 MAX_OFFSET=3 KEYS=OrderID111 CONSUME_START_TIME=1600135337915 UNIQ_KEY=A9FE1075810A18B4AAC24A40738B0000 WAIT=true TAGS=TagA} body=[72 101 108 108 111 32 119 111 114 108 100] transactionId='null'}]]
消息轨迹展现
在 RocketMQ 控制台,可以通过 message key 或者 message id 检索消息内容,如下图:
点击 MESSAGE TRACE DETAIL 可以查看消息轨迹,如下图:
消息轨迹原理
发送轨迹原理:在消息发送前与发送后收集指标信息,并将指标信息异步发送到轨迹主题。
消费轨迹原理:消费的消息轨迹有两部分,一部分在拉取消息后处理消息前收集指标异步发送都轨迹主题;另一部分处理消息后收集消息指标异步发送到轨迹主题。
轨迹格式说明
消息轨迹类型有三种,Pub 指发送轨迹,SubBefore 指消费前轨迹,SubAfter 指消费后轨迹。
发送轨迹 Pub
名称 |
说明 |
TraceType |
Pub 表示发送轨迹 |
timeStamp |
存储时间 |
regionId |
机房可用区,默认为 DefaultRegion(目前没有使用) |
groupName |
生产者组 producerGroup |
topic |
主题名称 |
msgId |
消息 ID,由客户端生成 |
tags |
消息 tag |
keys |
消息 key |
storeHost |
消息存储 Broker IP |
bodyLength |
消息体大小 |
costTime |
发送消息耗时 |
msgType |
消息类型:普通消息(Normal_Msg)、事物半消息(Trans_Msg_Half)、 事物提交消息(Trans_msg_Commit)、延迟消息(Delay_Msg) |
offsetMsgId |
消息 Id,由 Broker 生成 |
isSuccess |
发送是否成功,true 表示成功、false 表示失败 |
消费前轨迹 SubBefore
名称 |
说明 |
traceType |
SubBefore 表示消费前轨迹 |
timeStamp |
消息存储时间 |
regionId |
机房可用区(目前未使用) |
groupName |
消费组名称 |
requestId |
请求标识 |
msgId |
消息 Id |
retryTimes |
重试次数 |
keys |
消息 key |
消费前轨迹 SubAfter
名称 |
说明 |
traceType |
SubAfter 表示消费后轨迹 |
requestId |
请求标识 |
msgId |
消息 Id |
costTime |
消费耗时 |
isSuccess |
消费结果,true 消费成功、false 消费失败 |
keys |
消息 key |
contextCode |
Broker 返回的消费状态,0:SUCCESS,1:TIME_OUT,2:EXCEPTION,3:RETURNNULL,4:FAILED |
消息轨迹结语
- 在生产环境中使用消息轨迹,可以将 MQ 集群的一台 Broker 用于收集消息轨迹,避免轨迹消息对集群性能造成影响。
- 开源版本的消息轨迹中缺少消费的 IP 信息,即:我们不能查询到消息被哪个机器消费掉了。
- 开源版本中的消息轨迹组织格式用 char 字符拼接,解析使用数组,扩展性和兼容性不太友好。
- 基于此两位作者负责的 RocketMQ 集群中均未开启轨迹功能。
ACL
ACL 含义
访问控制表(Access Control List,ACL)描述用户或角色对资源的访问控制权限,RocketMQ 中的 ACL 见下表说明。
RocketMQ 中的 ACL 含义说明:
含义 |
说明 |
用户 |
在 plain_acl.yml 配置文件用 accessKey 表示 |
角色 |
admin 和其他角色 |
资源 |
包括主题和消费组 |
权限 |
DENY 表示无权限 ANY 表示拥有 PUB 或者 SUB 权限 PUB 表示拥有主题发送权限 SUB 表示拥有消费组订阅权限 |
ACL 使用示例
将 aclEnable = true 添加到 Broker 配置文件中,另外添加 ${ROCKETMQ_HOME}/conf/plain_acl.yml 文件,用于 ACL 控制。
1. Broker 配置
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
traceTopicEnable = true
aclEnable = true
说明
参数 |
含义 |
aclEnable |
默认 false,开启 ACL 需要设置为 true |
ROCKETMQ_HOME |
可以通过 -Drocketmq.home.dir 指定 MQ 根目录 |
ACL fileName |
默认 /conf/plain_acl.yml,可以通过 -Drocketmq.acl.plain.file 指定 ACL 文件名称 |
2. plain_acl.yml 配置
ACL 配置文件由全局白名单配置(globalWhiteRemoteAddresses)和账户配置(accounts)两部分构成。
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- TopicTes1=DENY
- TopicTest2=PUB|SUB
groupPerms:
- consumerTest=DENY
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
admin: true
说明
参数 |
含义 |
globalWhiteRemoteAddresses |
全局白名单配置,策略如下: 空:忽略白名单,继续执行下面校验 全匹配模式:全部放行不会执行后面校验 例如:* 或 ... 或 ::::::: 多 IP 模式:表示白名单 IP 在设置区间段的放行 例如:192.168.0.{1 2} 或 192.168.1.1 192.168.1.2 或 192.168.*. 或 192.168.1-10.5-50 |
accessKey |
唯一用户标识 |
secretKey |
访问密码 |
whiteRemoteAddress |
用户级白名单,格式同 globalWhiteRemoteAddresses |
admin |
是否为管理员,管理员拥有所有资源访问权限 |
defaultTopicPerm |
默认主题权限,默认值 DENY |
defaultGroupPerm |
默认消费组权限,默认值 DENY |
topicPerms |
详细的主题权限 |
groupPerms |
详细的消费组权限 |
3. ACL 发送示例
在上面的配置文件中,将 TopicTes1 设置了 DENY 权限,即禁止发送和消费;将 TopicTest2 设置成了 PUB|SUB 权限,即允许发送和订阅权限。下面例子尝试向主题 TopicTes1 发送消息,观察其是否可以成功。
禁止发送示例
public class AclSendTest {
private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "12345678";
public static void main(String[] args) throws MQClientException InterruptedException {
producer();
}
public static void producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName" getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i )
try {
{
Message msg = new Message("TopicTest1"
"TagA"
"OrderID188"
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n" sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY ACL_SECRET_KEY));
}
}
禁止发送截图
禁止发送说明
用户 RocketMQ 向主题 TopicTes1 发送消息时抛出 AclException,拒绝访问,如果将代码中主题换成 TopicTes2,则可以发送成功,接着看下文。
4. ACL 消费示例
在上面的配置文件中,将 consumerTest 设置了 DENY 权限,即禁止消费消息。由于 TopicTes2 设置为允许发送,我们下面尝试向 TopicTes2 发送一条消息,consumerTest 订阅了 TopicTes2 观察其是否可以消费。
允许发送示例
public class AclSendTest {
private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "12345678";
public static void main(String[] args) throws MQClientException InterruptedException {
producer();
}
public static void producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName" getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i )
try {
{
Message msg = new Message("TopicTest2"
"TagA"
"OrderID188"
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n" sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY ACL_SECRET_KEY));
}
}
允许发送结果
SendResult [sendStatus=SEND_OK msgId=C0A800667FB218B4AAC2663AB66F0000 offsetMsgId=C0A8006600002A9F00000000000085EA messageQueue=MessageQueue [topic=TopicTest2 brokerName=broker-a queueId=0] queueOffset=2]
禁止消费示例
public class AclConsumeTest {
private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "12345678";
public static void main(String[] args) throws MQClientException InterruptedException {
pushConsumer();
}
public static void pushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerTest" getAclRPCHook() new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest2" "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20180422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n" Thread.currentThread().getName() msgs);
printBody(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY ACL_SECRET_KEY));
}
private static void printBody(List<MessageExt> msg) {
if (msg == null || msg.size() == 0)
return;
for (MessageExt m : msg) {
if (m != null) {
System.out.printf("msgId : %s body : %s \n\r" m.getMsgId() new String(m.getBody()));
}
}
}
}
禁止消费截图
禁止消费说明
我们向 TopicTest2 成功发送了一条消息,但由于消费组 consumerTest 被设置成禁止消费,所有未能收到该消息。
ACL 命令汇总
RocketMQ 提供了一系列命令动态更新 Acl 配置文件,使设置的权限及时生效。
1. 获取 ACL 配置版本
使用 clusterAclConfigVersion 命令查看版本信息。
参数说明
参数 |
说明 |
-b |
Broker 地址,更新特定的 Broker |
-c |
集群名称,更新集群中的所有 Broker |
-n |
namesrv 地址 |
命令示例
$ bin/mqadmin clusterAclConfigVersion -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name #Broker Name #Broker Addr #AclConfigVersionNum #AclLastUpdateTime
DefaultCluster broker-a x.x.x.x:10911 0 2020-09-20 22:42:59
get cluster's plain access config version success.
2. 获取 Acl 权限配置
使用 getAccessConfigSubCommand 获取 ACL 的配置信息。
参数说明
参数 |
说明 |
-b |
Broker 地址,更新特定的 Broker |
-c |
集群名称,更新集群中的所有 Broker |
-n |
namesrv 地址 |
命令示例
$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
globalWhiteRemoteAddresses: [10.10.103.* 192.168.0.*]
accounts:
accessKey : RocketMQ
secretKey : 12345678
whiteRemoteAddress:
admin : false
defaultTopicPerm : DENY
defaultGroupPerm : SUB
topicPerms : [topicA=DENY topicB=PUB|SUB topicC=SUB]
groupPerms : [groupA=DENY groupB=PUB|SUB groupC=SUB]
accessKey : rocketmq2
secretKey : 12345678
whiteRemoteAddress: 192.168.1.*
admin : true
defaultTopicPerm :
defaultGroupPerm :
topicPerms :
groupPerms :
3. 更新全局白名单
使用 updateGlobalWhiteAddr 对 ACL 的全局白名单 globalWhiteRemoteAddresses 进行变更。
参数说明
参数 |
说明 |
-b |
Broker 地址,更新特定的 Broker |
-c |
集群名称,更新集群中的所有 Broker |
-n |
namesrv 地址 |
-g |
全局白名单值,例如:10.10.103. 192.168.0. |
命令示例
$ bin/mqadmin updateGlobalWhiteAddr -n x.x.x.x:9876 -c DefaultCluster -g 10.10.113.* 192.168.20.*
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
update global white remote addresses to x.x.x.x:10911 success.
查看生效
$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
globalWhiteRemoteAddresses: [10.10.113.* 192.168.20.*]
accounts:
accessKey : RocketMQ
secretKey : 12345678
whiteRemoteAddress:
admin : false
defaultTopicPerm : DENY
defaultGroupPerm : SUB
topicPerms : [topicA=DENY topicB=PUB|SUB topicC=SUB]
groupPerms : [groupA=DENY groupB=PUB|SUB groupC=SUB]
accessKey : rocketmq2
secretKey : 12345678
whiteRemoteAddress: 192.168.1.*
admin : true
defaultTopicPerm :
defaultGroupPerm :
topicPerms :
groupPerms :
说明:全局白名单已经被更新。
4. 更新用户配置
对于用户账户的配置的变更通过 updateAclConfig 来实现。
参数说明
参数 |
说明 |
-a |
指定 accessKey,变更哪个用户的配置 |
-b |
Broker 地址,更新特定的 Broker |
-c |
集群名称,更新集群中的所有 Broker |
-n |
namesrv 地址 |
-g |
设置 groupPerms 消费组权限,格式为:groupD=DENY groupD=SUB |
-i |
设置 Acl 文件中 defaultTopicPerm 权限 |
-m |
设置 Acl 文件中 admin 权限 |
-s |
设置 Acl 文件中 secretKey 密钥值 |
-t |
设置 topicPerms 主题权限,格式为:topicA=DENY topicD=SUB |
-u |
设置 Acl 文件中的默认消费组 defaultGroupPerm 权限 |
-w |
设置 Acl 文件中该用户下的白名单权限 whiteRemoteAddress |
命令示例
$ bin/mqadmin updateAclConfig -n x.x.x.x:9876 -c DefaultCluster -a RocketMQ -s 87654321 -t testTopicA=DENY testTopicb=SUB
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
create or update plain access config to x.x.x.x:10911 success.
查看生效
$ bin/mqadmin getAccessConfigSubCommand -n uat-mq2.ttbike.com.cn:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
globalWhiteRemoteAddresses: [10.10.113.* 192.168.20.*]
accounts:
accessKey : rocketmq2
secretKey : 12345678
whiteRemoteAddress: 192.168.1.*
admin : true
defaultTopicPerm :
defaultGroupPerm :
topicPerms :
groupPerms :
accessKey : RocketMQ
secretKey : 87654321
whiteRemoteAddress:
admin : false
defaultTopicPerm : DENY
defaultGroupPerm : SUB
topicPerms : [testTopicA=DENY testTopicb=SUB]
groupPerms : [groupA=DENY groupB=PUB|SUB groupC=SUB]
说明:用户 RocketMQ 的密钥 secretKey 和主题权限 topicPerms 已变更生效。
5. 删除用户配置
通过 deleteAccessConfig 删除指定用户的 ACL 配置信息。
参数说明
参数 |
说明 |
-b |
Broker 地址,更新特定的 Broker |
-c |
集群名称,更新集群中的所有 Broker |
-n |
namesrv 地址 |
-a |
指定特定用户 accessKey |
命令示例
$ bin/mqadmin deleteAccessConfig -n x.x.x.x:9876 -c DefaultCluster -a RocketMQ
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
delete plain access config account to x.x.x.x:10911 success.
查看生效
$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
globalWhiteRemoteAddresses: [10.10.113.* 192.168.20.*]
accounts:
accessKey : rocketmq2
secretKey : 12345678
whiteRemoteAddress: 192.168.1.*
admin : true
defaultTopicPerm :
defaultGroupPerm :
topicPerms :
groupPerms :
说明:用户 RocketMQ 的相关 Acl 配置已被全部删除。
ACL 原理简述
1. 规则加载
规则配置在 plain_acl.yml 文件中,需要加载到 Broker 缓存中使其生效。启动时会加载 acl 文件的内容,当其变更了也需要动态加载规则内容,详见如下流程。
2. 权限校验
注册的钩子程序通过 NettyServerHandler 实现,在 Broker 当前的 channel 接收到客户端消息时执行校验逻辑,入口为 NettyServerHandler#channelRead0() 以及 processRequestCommand#doBeforeRpcHooks。规则校验的入口位于 PlainAccessValidator#validate 方法,下面是校验简图。
ACL 结语
- 评估是否需要 ACL 通常大部分场景是不需要的,如果对数据敏感可以通过对内容加密实现。
- 如果几千个资源(主题和消费组)都配置了 ACL 那么配置文件是庞大的。
- 如果要用建议局部使用,比如:资金往来的主题、消费组等。
- 基于此两位作者负责的 RocketMQ 集群均未开启 ACL 功能。
多副本搭建
多副本意义
RocketMQ 开源版本在 4.5.0 版本开始支持多副本(DLedger),在以前的版本中只支持主从模式。
主从模式存在的问题:
- 如果主节点挂掉了不能动态切换到从节点,这一组 Broker 节点不能提供写入服务;
- 设置主从异步复制模式时,如果主节点意外挂掉,数据可能没有全部复制到从节点,存在数据丢失风险。
多副本使用 Raft 协议在节点意外掉线后能够完成自动选主,提高集群的高可用和保证数据的一致性。
多副本搭建
由于 DLedger 基于 Raft 协议开发的功能,需要过半数选举,最少 3 个节点组成一个 Raft 组。
broker-n0.conf
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
broker-n1.conf
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30921
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node01
storePathCommitLog=/tmp/rmqstore/node01/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16
broker-n2.conf
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30931
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node02
storePathCommitLog=/tmp/rmqstore/node02/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16
启动三个节点:
nohup bin/mqbroker -c conf/dledger/broker-n0.conf &
nohup bin/mqbroker -c conf/dledger/broker-n1.conf &
nohup bin/mqbroker -c conf/dledger/broker-n2.conf &
查看是否启动成功:
$ bin/mqadmin clusterList -n localhost:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
RaftCluster RaftNode00 0 x.x.x.x:30921 V4_7_0 0.00(0 0ms) 0.00(0 0ms) 0 444663.49 -1.0000
RaftCluster RaftNode00 1 x.x.x.x:30911 V4_7_0 0.00(0 0ms) 0.00(0 0ms) 0 444663.49 -1.0000
RaftCluster RaftNode00 3 x.x.x.x:30931 V4_7_0 0.00(0 0ms) 0.00(0 0ms) 0 444663.49 -1.0000
说明:BID 为 0 表示表示 Master,其他两个均为 Follower。
控制台截图:
查看发送消息:
说明:通过以上步骤,我们完成多副本的搭建过程。
重新选主
我们通过 kill 掉 Master 的方式来验证 DLedger 选主情况,上面的 clusterList 截图中我们看到 Master 为 x.x.x.x:30921,将该进程 kill 掉后观察一下。
$ bin/mqadmin clusterList -n localhost:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
RaftCluster RaftNode00 0 x.x.x.x:30931 V4_7_0 0.00(0 0ms) 0.00(0 0ms) 0 444664.03 -1.0000
RaftCluster RaftNode00 1 x.x.x.x:30911 V4_7_0 0.00(0 0ms) 0.00(0 0ms) 0 444664.03 -1.0000
说明:kill 掉原 Master 后,完成自动选主,新的 Master 为 x.x.x.x:30931。
参数说明
配置文件中多副本的参数说明见下面表格。
参数 |
说明 |
enableDLegerCommitLog |
是否启用 DLedger,默认 false |
dLegerGroup |
节点所属的 Raft 组,建议与 broker 一致 |
dLegerPeers |
集群节点信息,示例:n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 |
dLegerSelfId |
当前节点 id。取自 legerPeers 中条目的开头,即上述示例中的 n0,并且特别需要强调,只能第一个字符为英文,其他字符需要配置成数字 |
参考资料
Raft 的学习资料见下面链接,供学习使用。DLedger 的源码解读,见《RocketMQ 技术内幕》第二版。
- Raft 论文译文
- Raft 论文原文
- Raft 动画
- Raft 可视化操作
多副本结语
使用多副本时,请做好压测,压测的 TPS 是否满足业务的需求,作者曾做过多副本压测与主从异步的 TPS 有相当大的差距。
在 TPS 满足的情况下,建议使用多副本架构,尤其是支付类可以优先使用。
如果线上已经存在了主从默认的架构如何升级到 DLedger 模式呢?
- 可以参考前面平滑扩所容的方式,将多副本组成的 Raft 组加入到原集群中
- 关闭原主从架构节点的写入权限
- 在过了日志存储时间后,将主从架构节点下线