快捷搜索:  汽车  科技

rocketmq的队列模式(消息队列之RocketMQ)

rocketmq的队列模式(消息队列之RocketMQ)一个消息队列通常只允许被一个消费者消费 一个消费者同时可以消费多个消息队列表达式(TAG/SQL92) 类过滤同一个消费组下,同一条消息只能被其中一个消费者消费 消费进度保存在Broker监听即可消费 消费进度保存在消费端正常理解,推模式就是消息队列将消息推送给消费者。但在RocketMQ中,推模式是在pull模式上包装了一层,一个拉取任务完成后再拉取下一个。 拉模式PULL

Rocketmq

rocketmq的队列模式(消息队列之RocketMQ)(1)

常见问题autoCreateTopicEnable机制

前提条件 Broker启动时会判断是否开启主题自动创建,若开启则会构建默认主题的路由信息(队列数默认8)并发送给NameServer

流程 消费者发送消息,主题topicA --->发现本地缓存中无对应topicA的路由信息 --->消费者客户端向NameServer获取topicA路由信息 --->NameServer发现没有topicA路由信息,因此返回空 --->消费者客户端使用默认主题再次查询路由信息 --->NameServer返回默认主题路由信息(1、broker若开启了自动创建,则启动时已经构建了默认主题的路由信息并已注册到NameServer),如果未找到默认主题路由信息则返回空 --->若默认主题返回空,则报空主题异常;

不为空时,客户端选择默认主题的一个队列,并构造消息(包含默认主题、原主题、模式等) --->broker收到默认主题的消息后,会提取出原主题并构造该主题的路由信息 --->构建好的路由信息定时发送到NameServer --->客户端再次发送消息时就会拿到该主题的消息队列

特别注意:(先不考虑高并发场景下) 1、使用默认主题获取队列,此时只获取了一个队列,然后客户端向这个队列发送默认主题的消息,显然只有该队列所在的服务器能够创建原主题的消息队列,如果在第二次消息发送前被注册到了NameServer,那么以后发送消息都会发送到当前服务器,其他服务器就没有机会创建消息队列。 2、broker启动时默认主题默认创建8个消息队列,而客户端默认创建4个。通过默认主题创建消息队列时,broker默认值和客户端默认值取最小。 3、broker会定时把内存中的路由信息刷新到磁盘 ack卡进度

消费者(Consumer)消费模式集群模式(默认)

同一个消费组下,同一条消息只能被其中一个消费者消费 消费进度保存在Broker

广播模式

监听即可消费 消费进度保存在消费端

消息获取方式推模式PUSH

正常理解,推模式就是消息队列将消息推送给消费者。但在RocketMQ中,推模式是在pull模式上包装了一层,一个拉取任务完成后再拉取下一个。 拉模式PULL

消息过滤模式

表达式(TAG/SQL92) 类过滤

重新分布机制消息队列负载

一个消息队列通常只允许被一个消费者消费 一个消费者同时可以消费多个消息队列

消费方式

普通消费

顺序消费

只有当前消费成功,才消费下一条,没有RECONSUME_LATER 只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费 定时消息

重试机制默认重试16次

消息消费失败,该消息本身会被ack,保证消费不被阻塞。同时被重新发回给broker,broker将其放在重试队列,走重试流程 达到重试次数后,放入死信队列 1 10秒 9 7分钟 2 30秒 10 8分钟 3 1分钟 11 9分钟 4 2分钟 12 10分钟 5 3分钟 13 20分钟 6 4分钟 14 30分钟 7 5分钟 15 1小时 8 6分钟 16 2小时

触发条件Exception异常

一般重复16次 10s、30s、1mins、2mins、3mins等

超时

超时情况,MQ会无限制的发送消息给消费端 进度未提交 返回null

订阅重试消息

主题名:%RETRY% 消费组名 消费者启动的时候会自动订阅该主题 注意

消费确认机制

ack可能丢失,所以存在重复消费问题

特点

Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接 定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定 Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知 Consumer每隔30s(由clientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接 并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费 当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉

线程模型

单Consumer实例 多worker线程模型 为每个队列分配一个 PullRequest,并将其放入 pullRequestQueue,PullMessageService 线程会不断轮询从 pullRequestQueue 中取出 PullRequest 去拉取消息,接着将拉取到的消息给到 ConsumeMessageService 处理

从哪儿消费策略CONSUME_FROM_LAST_OFFSET(默认)

从该队列最尾开始消费,即跳过历史消息

CONSUME_FROM_FIRST_OFFSET从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍

不一定从offset=0开始,因为历史消息只保留一部分

CONSUME_FROM_TIMESTAMP

从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前

注意

以上策略只针对新加入的组,已经存在的组不生效 已存在的组会按实际消费进度消费

进度管理机制group queue为单位

一个group可能订阅多个topic、tag,但本质上是一个group对应多个queueId

问题进度如何跟tag结合?

broker收到拉取请求,此时在一定范围内过滤出相应的消息(即拉取的一批消息下标不一定是连续的),消费完成后,ack最大下标即可

消费进度如何保存?

本地保存 定时任务同步到broker Ack卡进度怎么办?

生产者(Producer)

单个JVM有个MQClientManager实例,维护map<String,MQClientInstance>,一个clientId只会创建一个MQClientInstance

其中 clientId=客户端IP instance( unitname可选) 若instance为默认值,则会使用进程ID

同一个JVM不同的消费者、生产者在启动时获取到的MQClientInstance实例是同一个

消息发送流程验证消息

主题不能为空 消息体不能为空 消息长度不能为0且最大不能超过4M

查找路由查找

第一次发送消息时,本地没有路由缓存,消息发送之前需要先通过主题名称从NameServer获取该主题的路由信息,若未找到,则尝试用默认主题查询,如果autoCreateTopicEnable为true,则NameServer返回路由信息,否则抛出未找到路由的异常。

1、通过默认主题找的路由信息 替换路由信息中读写队列个数为消息生产者默认的队列个数(可配置) 2、通过topic找到路由信息 与本地缓存的路由信息对比并更新

注意: 1、从NameServer获取路由信息以后,会做本地缓存,第二次直接取缓存 2、客户端会产生一个定时任务,默认30秒从NameServer获取一次路由信息并更新本地缓存

选择

一次消息发送,可能会执行多次队列选择

第一次选择队列1,发送消息失败,此时会把该失败队列放到失败队列中,并重新选择队列,第二次选择时会规避第一次失败的队列 故障延迟机制

发送

1、为消息体生成全局唯一ID 2、消息体超过4K则启用zip压缩 从nameServer获取broker服务地址列表 消息发送

发送方式同步

指:消息发送后,必须等待服务器响应,只有结果允许的情况下才可执行下一步

引入重试机制

异步

指:消息发送后无需等待服务器响应,只需提供一个回调方法,待服务器响应或者超时后的结果处理。

出现网络异常、超时等时不进行重试

单向

指:调用消息发送API后无需等待服务器返回发送结果,也无提供回调方法

无重试机制 批量发送

重试机制异步发送

根据设定的次数进行重试

同步发送

不重试 如果发送的过程中超时,则直接抛出异常,不重试(有可能是重试的过程中)

特点Producer与Name Server集群中的其中一个节点(随机选择)建立长连接

如何切换 定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳 Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败 Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接 Group

消息存储

所有主题的消息均存储在一个文件上,以保证消息发送时顺序写文件。然而消息消费是基于主题,为了提高消费效率,引入了ConsumeQueue消息队列文件,每个消息队列都有自己的消息队列文件

存储文件主要消息文件Comitlog

消息存储文件,将

位置: 每个文件默认1G(大小可修改),

所有主题的消息都存储在该文件中默认是1GB位置消息队列文件ConsumeQueue

消息消费队列,消息到达comitlog后,将异步发送到该文件,供消费者消费,可以将该文件看做commitlog文件的索引

位置:${ROCKETMQ_HOME}/store/consumeQueue 每个主题一个子目录,每个主题有多个消息队列,所以该主题目录下又根据消息队列的ID依次创建了子目录,如topic-test/0 topic-test/1 存储结构:commitlog-offset:message-size:tag-hashcode ConsumeQueue并不全量存储消息,只存储对应消息在commitlog文件中的偏移量 消息长度 tag-hashcode

问题 1、整个消息全量异步发送吗?

Hash索引文件IndexFile

IndexFile(索引文件)则只是为了消息查询提供了一种通过key或时间区间来查询消息的方法 为了加速消息的检索性能

检测点文件

checkPoint

关闭异常文件

abort

其他

checkPoint

消息文件过期机制

commitLog和ConsumeQueue公用一套过期删除机制

判断如果非当前写文件在一定时间内没有再被更新,则被当作过期文件

默认72小时 mq默认10s判断一次

删除(满足任何一个)到达时间点

默认凌晨4点 磁盘空间不足 预留的手工触发

特别注意

删除时不会判断文件中的消息是否已经被消费 消息文件存储空间报警机制 同步/异步双写 同步/异步复制

刷盘同步

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入

优点

数据可靠性很高

缺点

写入慢,并发低

异步

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态

优点

写入很快,并发高

缺点

如果内存刷入磁盘前发生故障,数据容易丢失

消息过滤FilterServer表达式TAG

基于Topic tag,比较好理解

SQL92

对消息的属性运用SQL过滤表达式进行条件匹配 类

主从同步(HA)机制

RocketMQ主从复制原理 RocketMQ读写分离机制

实战

消息批量发送 消息发送队列自选择 消息过滤 事务消息 Spring整合RocketMQ Spring Cloud整合RocketMQ RocketMQ监控与运维命令 应用场景分析 常见问题 面试

服务组成与理论

NameServer负责broker的注册,主题路由信息的收集等

broker每30秒向NameServer发送心跳包,包中包含broker所拥有的的路由信息,NameServer收集所有broker的路由信息并整理

消息生产者客户端每30秒向NameServer获取路由信息并更新本地缓存的路由,在发送消息前如果不存在该主题的路由则会主动从NameServer获取该主题路由信息

路由中心NameServer

默认端口 9876 Broker的注册,Broker的路由,Broker生命周期的管理

作用: 1、为生产者、消费者提供关于主题Topic的路由信息 2、Nameserver相当于broker的注册中心,在集群结构中,NameServer之间并不通信。 3、NameServer与所有Broker保持长链接,每隔10秒检测一次本地的Broker信息,如果连续120S没有收到Broker的心跳包,则将移除该broker的路由信息并关闭socket连接(Broker每30秒向NameServer发送心跳)

部署

Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

部署时,可以启动多个,相互之间完全独立 配置

数据类型

路由信息 broker服务器信息

功能路由管理

broker每30秒向NameServer发送心跳,包含路由信息 服务注册与发现

特点

nameserver之间不通信 与broker保持长链接,30S心跳 Name Server定时(每隔10s)扫描所有存活broker的连接 Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接

Broker

Broker正常关闭时会告诉NameServer

属性brokerAddr

可以手动指定broker的ip地址,在向NameServer注册的时候如果配置了brokerAddr则注册该地址

brokerId

0:master 大于0:slave

brokerName

broker名称

clusterName

broker所在的集群名称,通过该名称区分broker所在的集群

每个broker在

haServerAddr

master主节点地址,初次请求该值为空,slave向NameServer注册后返回并赋值

启动流程

1、在Broker启动流程中,会构建TopicConfigManager对象,其构造方法中首先会判断是否开启了允许自动创建主题,如果启用了自动创建主题,则向topicConfigTable中添加默认主题的路由信息 注意:此时默认主题队列数为8 2、Broker启动时向所有配置的NameServer发送心跳信息,每隔30秒向集群中所有的NameServer发送心跳包 3、心跳包包含 brokerId、名称、网络地址、集群名称、关联的FilterServer地址列表,broker所拥有的的路由信息(包括默认路由)

部署master

BrokerId为0

slave

BrokerId非0

概要

通过指定相同的Broker Name关联 配置

客户端路由发现

1、当NameServer的路由信息发生变化时,不会主动推给客户端,而是由客户端定期根据主题获取路由信息 扩容

主从同步机制(HA)

主服务器启动,并监听从服务器的连接 从服务器主动链接主服务器,并携带待拉取的偏移量Master与slave的HA心跳发送间隔是5s

同步主从

消息发送者将消息刷到磁盘以后,需要继续等待数据传输到从服务器

异步主从

数据到达主broker后并完成持久化后理解返回,然后异步同步到从broker

读写分离机制

消费者先向主服务器发起拉取消息请求,然后主服务器返回一批消息,同时会根据主服务器负载压力与主从同步情况,然后建议客户端下次拉取消息是从主服务器还是从服务器拉取

回溯消息

已消费消息可再次消费,支持按时间回溯消费

消息类型延时消息

延迟级别大于0,则将消息的原主题和原消息队列ID存入消息属性中,用延迟消息主题SCHEDULE_TOPIC存储,到时间则将消息发送出去

事务消息事务消息实现思想

两阶段提交 定时事务状态会查 决定提交还是回滚 事务消息发送流程 提交或回滚事务 事务消息回查事务状态

组成

msgId

猜您喜欢: