快捷搜索:  汽车  科技

rocketmq订阅机制和定时消息(RocketMQmsgId与)

rocketmq订阅机制和定时消息(RocketMQmsgId与)消息发送结果执行效果如图所示:消息发送返回结果1.2 从消息消费看消息ID消息消费Demo

本篇详细介绍消息发送、消息消费、RocketMQ queryMsgById 命令以及 rocketmq-console 等使用场景中究竟是用的哪一个ID。

1、抛出问题

1.1 从消息发送看消息ID

rocketmq订阅机制和定时消息(RocketMQmsgId与)(1)

消息发送Demo

执行效果如图所示:

rocketmq订阅机制和定时消息(RocketMQmsgId与)(2)

消息发送返回结果

1.2 从消息消费看消息ID

rocketmq订阅机制和定时消息(RocketMQmsgId与)(3)

消息消费Demo

执行效果如图所示:

rocketmq订阅机制和定时消息(RocketMQmsgId与)(4)

消息发送结果

不知道大家是否有注意到,调用 msgs.get(0).getMsgId()返回的msgId 与直接输出msgs中的 msgId 不一样,那这又是为什么呢?答案在本文的第二部分有详细分析。

2、消息ID释疑

从消息发送的结果可以得知,RocketMQ 发送的返回结果会返回msgId 与 offsetMsgId,那这两个 msgId 分别是代表什么呢?

  • msgId:该ID 是消息发送者在消息发送时会首先在客户端生成,全局唯一,在 RocketMQ 中该 ID 还有另外的一个叫法:uniqId,无不体现其全局唯一性。
  • offsetMsgId:消息偏移ID,该 ID 记录了消息所在集群的物理地址,主要包含所存储 Broker 服务器的地址( IP 与端口号)以及所在commitlog 文件的物理偏移量。

2.1 msgId 即全局唯一 ID 构建规则

rocketmq订阅机制和定时消息(RocketMQmsgId与)(5)

全局唯一ID构建代码

从这张图可以看出,msgId 是由客户端生成的,接下来我们详细分析一下其生成算法。

rocketmq订阅机制和定时消息(RocketMQmsgId与)(6)

MessageClientIDSetter#createUniqID

一个 uniqID 的构建主要分成两个部分:FIX_STRING 与唯一 ID 生成算法,顾名思义,FIX_STRING 就是一个客户端固定一个前缀,那接下来先看一下固定字符串的生成规则。

2.1.1 FIX_STRING

rocketmq订阅机制和定时消息(RocketMQmsgId与)(7)

MessageClientIDSetter静态代码块

从这里可以看出 FIX_STRING 的主要由:客户端的IP、进程ID、加载 MessageClientIDSetter 的类加载器的 hashcode。

2.1.2 唯一性算法

msgId 的唯一性算法由 MessageClientIDSetter 的createUniqIDBuffer 方法实现。

rocketmq订阅机制和定时消息(RocketMQmsgId与)(8)

MessageClientIDSetter#createUniqIDBuffer

可以得出 msgId 的后半段主要由:当前时间与系统启动时间的差值,以及自增序号。

2.2 offsetMsgId 构建规则

rocketmq订阅机制和定时消息(RocketMQmsgId与)(9)

offsetMsgId 构建规则

在消息 Broker 服务端将消息追加到内存后会返回其物理偏移量,即在 commitlog 文件中的文件,然后会再次生成一个id,代码中虽然也叫 msgId,其实这里就是我们常说的 offsetMsgId,即记录了消息的物理偏移量,故我们重点来看一下其具体生成规则:

rocketmq订阅机制和定时消息(RocketMQmsgId与)(10)

MessageDecoder#createMessageId

首先结合该方法的调用上下文,先解释一下该方法三个入参的含义:

  • ByteBuffer input
    用来存放 offsetMsgId 的字节缓存区( NIO 相关的基础知识)
  • ByteBuffer addr
    当前 Broker 服务器的 IP 地址与端口号,即通过解析 offsetMsgId 从而得到消息服务器的地址信息。
  • long offset
    消息的物理偏移量。

即构成 offsetMsgId 的组成部分:Broker 服务器的 IP 与端口号、消息的物理偏移量。

温馨提示:即在 RocketMQ中,只需要提供 offsetMsgId,可用不必知道该消息所属的topic信息即可查询该条消息的内容。

2.3 消息发送与消息消费返回的消息ID信息

消息发送时会在 SendSesult中返回 msgId、offsetMsgId,在了解了这个两个 ID 的含义时则问题不大,接下来重点介绍一下消息消费时返回的 msgId 到底是哪一个。

在消息消费时,我们更加希望因为 msgId (即客户端生成的全局唯一性ID),因为该全局性 ID 非常方便实现消费端的幂等。

在本文的1.2节我们也提到一个现象,为什么如下图代码中输出的 msgId 会不一样呢?

rocketmq订阅机制和定时消息(RocketMQmsgId与)(11)

在客户端返回的 msg 信息,其最终返回的对象是 MessageClientExt ,继承自 MessageExt。

那我们接下来分别看一下其 getMsgId() 方法与 toString 方法即可。

rocketmq订阅机制和定时消息(RocketMQmsgId与)(12)

MessageClientExt#getMsgId()

原来在调用 MessageClientExt 中的 getMsgId 方法如果消息的属性中存在其唯一ID,则返回消息的全局唯一ID,否则返回消息的 offsetMsgId。

而 MessageClientExt 方法并没有重写 MessageExt 的 toString 方法,其实现如图所示:

rocketmq订阅机制和定时消息(RocketMQmsgId与)(13)

MessageClientExt#toString

故返回的是 MessageExt中 的 msgId,该 msgId 存放的是 offsetMsgId,所以才造成了困扰。

温馨提示:如果消息消费失败需要重试,RocketMQ 的做法是将消息重新发送到 Broker 服务器,此时全局 msgId 是不会发生变化的,但该消息的 offsetMsgId 会发生变化,因为其存储在服务器中的位置发生了变化。

3、实践经验

在回答了消息发送与消息消费关于 msgId 与 offsetMsgId 的困扰后,再来介绍一下如果根据 msgId 去查询消息。

想必大家对 rocketmq-console ,那在消息查找界面,展示的消息列表中返回的 msgId 又是哪一个呢?

rocketmq订阅机制和定时消息(RocketMQmsgId与)(14)

RocketMQ-Console 查找消息界面

这里的 Message ID 返回的是消息的全局唯一ID。

其实 RokcetMQ 也提供了 queryMsgById 命令来查看消息的内容,不过这里的 msgId 是 offsetMsgId,我们首先将全局唯一ID传入命令,其执行效果如下:

rocketmq订阅机制和定时消息(RocketMQmsgId与)(15)

queryMsgById 示例

发现报错,那我们将 offsetMsgId 传入其执行效果如图所示:

rocketmq订阅机制和定时消息(RocketMQmsgId与)(16)

但在 rocketmq-console 中根据消息ID去查找消息,无论传入哪个msgId,下图该功能都能返回正确的结果:

rocketmq订阅机制和定时消息(RocketMQmsgId与)(17)

这是因为 rocketmq-console 做了兼容,首先将传入的 msgId 用 queryMsgById 该命令去查,如果报错,则当成 uniqID(全局ID)去查,首先全局ID会存储在消息的属性中,并会创建 Hash 索引,即可用通过 indexfile 快速定位到该条消息。

关于 RocketMQ 消息ID的相关问题就介绍到这里了。您的三连(点赞 转发 关注)是对我最大的鼓励。

猜您喜欢: