rocketmq的队列模式(消息队列之RocketMQ)
rocketmq的队列模式(消息队列之RocketMQ)一个消息队列通常只允许被一个消费者消费 一个消费者同时可以消费多个消息队列表达式(TAG/SQL92) 类过滤同一个消费组下,同一条消息只能被其中一个消费者消费 消费进度保存在Broker监听即可消费 消费进度保存在消费端正常理解,推模式就是消息队列将消息推送给消费者。但在RocketMQ中,推模式是在pull模式上包装了一层,一个拉取任务完成后再拉取下一个。 拉模式PULL
Rocketmq 常见问题autoCreateTopicEnable机制前提条件 Broker启动时会判断是否开启主题自动创建,若开启则会构建默认主题的路由信息(队列数默认8)并发送给NameServer
流程 消费者发送消息,主题topicA --->发现本地缓存中无对应topicA的路由信息 --->消费者客户端向NameServer获取topicA路由信息 --->NameServer发现没有topicA路由信息,因此返回空 --->消费者客户端使用默认主题再次查询路由信息 --->NameServer返回默认主题路由信息(1、broker若开启了自动创建,则启动时已经构建了默认主题的路由信息并已注册到NameServer),如果未找到默认主题路由信息则返回空 --->若默认主题返回空,则报空主题异常;
不为空时,客户端选择默认主题的一个队列,并构造消息(包含默认主题、原主题、模式等) --->broker收到默认主题的消息后,会提取出原主题并构造该主题的路由信息 --->构建好的路由信息定时发送到NameServer --->客户端再次发送消息时就会拿到该主题的消息队列
特别注意:(先不考虑高并发场景下) 1、使用默认主题获取队列,此时只获取了一个队列,然后客户端向这个队列发送默认主题的消息,显然只有该队列所在的服务器能够创建原主题的消息队列,如果在第二次消息发送前被注册到了NameServer,那么以后发送消息都会发送到当前服务器,其他服务器就没有机会创建消息队列。 2、broker启动时默认主题默认创建8个消息队列,而客户端默认创建4个。通过默认主题创建消息队列时,broker默认值和客户端默认值取最小。 3、broker会定时把内存中的路由信息刷新到磁盘 ack卡进度
消费者(Consumer)消费模式集群模式(默认)同一个消费组下,同一条消息只能被其中一个消费者消费 消费进度保存在Broker
广播模式监听即可消费 消费进度保存在消费端
消息获取方式推模式PUSH正常理解,推模式就是消息队列将消息推送给消费者。但在RocketMQ中,推模式是在pull模式上包装了一层,一个拉取任务完成后再拉取下一个。 拉模式PULL
消息过滤模式表达式(TAG/SQL92) 类过滤
重新分布机制消息队列负载一个消息队列通常只允许被一个消费者消费 一个消费者同时可以消费多个消息队列
消费方式普通消费
顺序消费只有当前消费成功,才消费下一条,没有RECONSUME_LATER 只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费 定时消息
重试机制默认重试16次消息消费失败,该消息本身会被ack,保证消费不被阻塞。同时被重新发回给broker,broker将其放在重试队列,走重试流程 达到重试次数后,放入死信队列 1 10秒 9 7分钟 2 30秒 10 8分钟 3 1分钟 11 9分钟 4 2分钟 12 10分钟 5 3分钟 13 20分钟 6 4分钟 14 30分钟 7 5分钟 15 1小时 8 6分钟 16 2小时
触发条件Exception异常一般重复16次 10s、30s、1mins、2mins、3mins等
超时超时情况,MQ会无限制的发送消息给消费端 进度未提交 返回null
订阅重试消息主题名:%RETRY% 消费组名 消费者启动的时候会自动订阅该主题 注意
消费确认机制ack可能丢失,所以存在重复消费问题
特点Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接 定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定 Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知 Consumer每隔30s(由clientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接 并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费 当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉
线程模型单Consumer实例 多worker线程模型 为每个队列分配一个 PullRequest,并将其放入 pullRequestQueue,PullMessageService 线程会不断轮询从 pullRequestQueue 中取出 PullRequest 去拉取消息,接着将拉取到的消息给到 ConsumeMessageService 处理
从哪儿消费策略CONSUME_FROM_LAST_OFFSET(默认)从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_FIRST_OFFSET从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍不一定从offset=0开始,因为历史消息只保留一部分
CONSUME_FROM_TIMESTAMP从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
注意以上策略只针对新加入的组,已经存在的组不生效 已存在的组会按实际消费进度消费
进度管理机制group queue为单位一个group可能订阅多个topic、tag,但本质上是一个group对应多个queueId
问题进度如何跟tag结合?broker收到拉取请求,此时在一定范围内过滤出相应的消息(即拉取的一批消息下标不一定是连续的),消费完成后,ack最大下标即可
消费进度如何保存?本地保存 定时任务同步到broker Ack卡进度怎么办?
生产者(Producer)单个JVM有个MQClientManager实例,维护map<String,MQClientInstance>,一个clientId只会创建一个MQClientInstance
其中 clientId=客户端IP instance( unitname可选) 若instance为默认值,则会使用进程ID
同一个JVM不同的消费者、生产者在启动时获取到的MQClientInstance实例是同一个
消息发送流程验证消息主题不能为空 消息体不能为空 消息长度不能为0且最大不能超过4M
查找路由查找第一次发送消息时,本地没有路由缓存,消息发送之前需要先通过主题名称从NameServer获取该主题的路由信息,若未找到,则尝试用默认主题查询,如果autoCreateTopicEnable为true,则NameServer返回路由信息,否则抛出未找到路由的异常。
1、通过默认主题找的路由信息 替换路由信息中读写队列个数为消息生产者默认的队列个数(可配置) 2、通过topic找到路由信息 与本地缓存的路由信息对比并更新
注意: 1、从NameServer获取路由信息以后,会做本地缓存,第二次直接取缓存 2、客户端会产生一个定时任务,默认30秒从NameServer获取一次路由信息并更新本地缓存
选择一次消息发送,可能会执行多次队列选择
第一次选择队列1,发送消息失败,此时会把该失败队列放到失败队列中,并重新选择队列,第二次选择时会规避第一次失败的队列 故障延迟机制
发送1、为消息体生成全局唯一ID 2、消息体超过4K则启用zip压缩 从nameServer获取broker服务地址列表 消息发送
发送方式同步指:消息发送后,必须等待服务器响应,只有结果允许的情况下才可执行下一步
引入重试机制
异步指:消息发送后无需等待服务器响应,只需提供一个回调方法,待服务器响应或者超时后的结果处理。
出现网络异常、超时等时不进行重试
单向指:调用消息发送API后无需等待服务器返回发送结果,也无提供回调方法
无重试机制 批量发送
重试机制异步发送根据设定的次数进行重试
同步发送不重试 如果发送的过程中超时,则直接抛出异常,不重试(有可能是重试的过程中)
特点Producer与Name Server集群中的其中一个节点(随机选择)建立长连接如何切换 定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳 Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败 Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接 Group
消息存储所有主题的消息均存储在一个文件上,以保证消息发送时顺序写文件。然而消息消费是基于主题,为了提高消费效率,引入了ConsumeQueue消息队列文件,每个消息队列都有自己的消息队列文件
存储文件主要消息文件Comitlog消息存储文件,将
位置: 每个文件默认1G(大小可修改),
所有主题的消息都存储在该文件中默认是1GB位置消息队列文件ConsumeQueue消息消费队列,消息到达comitlog后,将异步发送到该文件,供消费者消费,可以将该文件看做commitlog文件的索引
位置:${ROCKETMQ_HOME}/store/consumeQueue 每个主题一个子目录,每个主题有多个消息队列,所以该主题目录下又根据消息队列的ID依次创建了子目录,如topic-test/0 topic-test/1 存储结构:commitlog-offset:message-size:tag-hashcode ConsumeQueue并不全量存储消息,只存储对应消息在commitlog文件中的偏移量 消息长度 tag-hashcode
问题 1、整个消息全量异步发送吗?
Hash索引文件IndexFileIndexFile(索引文件)则只是为了消息查询提供了一种通过key或时间区间来查询消息的方法 为了加速消息的检索性能
检测点文件checkPoint
关闭异常文件abort
其他checkPoint
消息文件过期机制commitLog和ConsumeQueue公用一套过期删除机制
判断如果非当前写文件在一定时间内没有再被更新,则被当作过期文件默认72小时 mq默认10s判断一次
删除(满足任何一个)到达时间点默认凌晨4点 磁盘空间不足 预留的手工触发
特别注意删除时不会判断文件中的消息是否已经被消费 消息文件存储空间报警机制 同步/异步双写 同步/异步复制
刷盘同步在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入
优点数据可靠性很高
缺点写入慢,并发低
异步在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态
优点写入很快,并发高
缺点如果内存刷入磁盘前发生故障,数据容易丢失
消息过滤FilterServer表达式TAG基于Topic tag,比较好理解
SQL92对消息的属性运用SQL过滤表达式进行条件匹配 类
主从同步(HA)机制RocketMQ主从复制原理 RocketMQ读写分离机制
实战消息批量发送 消息发送队列自选择 消息过滤 事务消息 Spring整合RocketMQ Spring Cloud整合RocketMQ RocketMQ监控与运维命令 应用场景分析 常见问题 面试
服务组成与理论NameServer负责broker的注册,主题路由信息的收集等
broker每30秒向NameServer发送心跳包,包中包含broker所拥有的的路由信息,NameServer收集所有broker的路由信息并整理
消息生产者客户端每30秒向NameServer获取路由信息并更新本地缓存的路由,在发送消息前如果不存在该主题的路由则会主动从NameServer获取该主题路由信息
路由中心NameServer默认端口 9876 Broker的注册,Broker的路由,Broker生命周期的管理
作用: 1、为生产者、消费者提供关于主题Topic的路由信息 2、Nameserver相当于broker的注册中心,在集群结构中,NameServer之间并不通信。 3、NameServer与所有Broker保持长链接,每隔10秒检测一次本地的Broker信息,如果连续120S没有收到Broker的心跳包,则将移除该broker的路由信息并关闭socket连接(Broker每30秒向NameServer发送心跳)
部署Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
部署时,可以启动多个,相互之间完全独立 配置
数据类型路由信息 broker服务器信息
功能路由管理broker每30秒向NameServer发送心跳,包含路由信息 服务注册与发现
特点nameserver之间不通信 与broker保持长链接,30S心跳 Name Server定时(每隔10s)扫描所有存活broker的连接 Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接
BrokerBroker正常关闭时会告诉NameServer
属性brokerAddr可以手动指定broker的ip地址,在向NameServer注册的时候如果配置了brokerAddr则注册该地址
brokerId0:master 大于0:slave
brokerNamebroker名称
clusterNamebroker所在的集群名称,通过该名称区分broker所在的集群
每个broker在
haServerAddrmaster主节点地址,初次请求该值为空,slave向NameServer注册后返回并赋值
启动流程1、在Broker启动流程中,会构建TopicConfigManager对象,其构造方法中首先会判断是否开启了允许自动创建主题,如果启用了自动创建主题,则向topicConfigTable中添加默认主题的路由信息 注意:此时默认主题队列数为8 2、Broker启动时向所有配置的NameServer发送心跳信息,每隔30秒向集群中所有的NameServer发送心跳包 3、心跳包包含 brokerId、名称、网络地址、集群名称、关联的FilterServer地址列表,broker所拥有的的路由信息(包括默认路由)
部署masterBrokerId为0
slaveBrokerId非0
概要通过指定相同的Broker Name关联 配置
客户端路由发现1、当NameServer的路由信息发生变化时,不会主动推给客户端,而是由客户端定期根据主题获取路由信息 扩容
主从同步机制(HA)主服务器启动,并监听从服务器的连接 从服务器主动链接主服务器,并携带待拉取的偏移量Master与slave的HA心跳发送间隔是5s
同步主从消息发送者将消息刷到磁盘以后,需要继续等待数据传输到从服务器
异步主从数据到达主broker后并完成持久化后理解返回,然后异步同步到从broker
读写分离机制消费者先向主服务器发起拉取消息请求,然后主服务器返回一批消息,同时会根据主服务器负载压力与主从同步情况,然后建议客户端下次拉取消息是从主服务器还是从服务器拉取
回溯消息已消费消息可再次消费,支持按时间回溯消费
消息类型延时消息延迟级别大于0,则将消息的原主题和原消息队列ID存入消息属性中,用延迟消息主题SCHEDULE_TOPIC存储,到时间则将消息发送出去
事务消息事务消息实现思想两阶段提交 定时事务状态会查 决定提交还是回滚 事务消息发送流程 提交或回滚事务 事务消息回查事务状态
组成msgId