rabbitmq kafka 区别(四种消息中间件分析介绍)
rabbitmq kafka 区别(四种消息中间件分析介绍)如果我们使用消息中间件,我们只需要将消息推送至消息中间件中,BCD系统对积压的消息进行相应的处理。还是上述我们的应用场景,假设某一时间段内,每秒都有一条消息推送,如果我们使用接口进行推送,BCD三个系统处理完就需要三秒。这样会导致用户前端体验较差,而且系统后台一直处于阻塞状态,后续的消息推送接口一直在等待。总结:通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。继续我们上述的消息推送业务,如果我们现在需要同时推送消息到BCD三个系统中,而BCD系统接收到消息后需要进行复杂的逻辑处理,以及读库写库处理。如果一个三方系统进行消息处理需要1s,那我们遍历推送完一次消息,就需要三秒。而如果我们使用消息中间件,我们只需要推送到消息中间件,然后进行接口返回,可能只需要20ms,大大提升了用户体验。消息推送后BCD系统各自进行消息消费即可,不需要我们等待。
我们从四种消息中间件的介绍到基本使用,以及高可用,消息重复性,消息丢失,消息顺序性能方面进行分析介绍!
一、消息中间件的使用场景消息中间件的使用场景总结就是六个字:解耦、异步、削峰
1.解耦如果我方系统A要与三方B系统进行数据对接,推送系统人员信息,通常我们会使用接口开发来进行。但是如果运维期间B系统进行了调整,或者推送过程中B系统网络进行了调整,又或者后续过程中我们需要推送信息到三方C系统中,这样的话就需要我们进行频繁的接口开发调整,还需要考虑接口推送消息失败的场景。
如果我们使用消息中间件进行消息推送,我们只需要按照一种约定的数据结构进行数据推送,其他三方系统从消息中间件取值消费就可以,即便是三方系统出现宕机或者其他调整,我们都可以正常进行数据推送。
总结:通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。
2.异步继续我们上述的消息推送业务,如果我们现在需要同时推送消息到BCD三个系统中,而BCD系统接收到消息后需要进行复杂的逻辑处理,以及读库写库处理。如果一个三方系统进行消息处理需要1s,那我们遍历推送完一次消息,就需要三秒。
而如果我们使用消息中间件,我们只需要推送到消息中间件,然后进行接口返回,可能只需要20ms,大大提升了用户体验。消息推送后BCD系统各自进行消息消费即可,不需要我们等待。
3.削峰还是上述我们的应用场景,假设某一时间段内,每秒都有一条消息推送,如果我们使用接口进行推送,BCD三个系统处理完就需要三秒。这样会导致用户前端体验较差,而且系统后台一直处于阻塞状态,后续的消息推送接口一直在等待。
如果我们使用消息中间件,我们只需要将消息推送至消息中间件中,BCD系统对积压的消息进行相应的处理。
在上述高频发的消息时间段内,会在消息中间中产生消息积压,BCD系统在上述时间段外对积压消息进行相应的处理即可。
二、消息中间件的优缺点消息中间件的优点其实就是他的使用场景。
消息中间件的缺点与优点也是相辅相成的,主要有以下三个
1.系统可用性降低系统关联的中间件越多,越容易引发宕机问题。
如上述案例中的问题,原本进行消息推送我们只需要开发接口进行推送即可,引入消息中间件后就需要考虑消息中间件的高可用问题,如果消息中间件出现宕机问题,我们所有的消息推送都会失败。
2.系统复杂度提高上述案例中,如果我们使用接口进行消息推送,我们只需要考虑接口超时以及接口推送消息失败的问题。但我们引入消息中间件后,就需要考虑消息中间件的维护,以及消息重复消费,消息丢失的问题。
3.一致性问题上述案例中,如果我们使用接口进行消息推送,推送消息我们可以放在事务中处理,如果推送过程中出现异常,我们可以进行数据回滚,但我们引入消息中间件后,就需要考虑消息推送后,消费失败的问题,以及如果我们同时推送消息到BCD系统中,如何保证他们的事务一致性。
三、四种消息中间件的基本介绍
特性 |
ActiveMQ |
RabbitMQ |
RocketMQ |
Kafka |
单机吞吐量 |
万级,比 RocketMQ、Kafka 低一个数量级 |
同 ActiveMQ |
10 万级,支撑高吞吐 |
10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic 数量对吞吐量的影响 |
topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic |
topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | ||
时效性 |
ms 级 |
微秒级,这是 RabbitMQ 的一大特点,延迟最低 |
ms 级 |
延迟在 ms 级以内 |
可用性 |
高,基于主从架构实现高可用 |
同 ActiveMQ |
非常高,分布式架构 |
非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 |
有较低的概率丢失数据 |
基本不丢 |
经过参数优化配置,可以做到 0 丢失 |
同 RocketMQ |
功能支持 |
MQ 领域的功能极其完备 |
基于 erlang 开发,并发能力很强,性能极好,延时很低 |
MQ 功能较为完善,还是分布式的,扩展性好 |
功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
其他 |
Apache软件基金会开发、起步较早,但没有经过大量吞吐场景验证,目前社区不是很活跃 |
开源,稳定,社区活跃度高 |
阿里出品,目前已交给Apache,但社区活跃度较低 |
Apache软件基金会开发、开源、高通吐量,社区活跃度高 |
Activemq 是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。
1.2:Activemq 的作用及原理Activemq 的作用就是系统之间进行通信,原理就是生产者生产消息, 把消息发送给activemq, Activemq 接收到消息, 然后查看有多少个消费者,
然后把消息转发给消费者, 此过程中生产者无需参与。 消费者接收到消息后做相应的处理和生产者没有任何关系。
1.3:Activemq 的通信方式publish(发布)-subscribe(订阅)(发布-订阅方式)发布/订阅方式用于多接收客户端的方式,作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。
p2p(point-to-point)(点对点)p2p的过程则理解起来比较简单。它好比是两个人打电话,这两个人是独享这一条通信链路的。一方发送消息,另外一方接收,就这么简单。在实际应用中因为有多个用户对使用p2p的链路,相互通信的双方是通过一个类似于队列的方式来进行交流。和前面pub-sub的区别在于一个topic有一个发送者和多个接收者,而在p2p里一个queue只有一个发送者和一个接收者。
1.4:Activemq 的消息持久化机制JDBC: 持久化到数据库
AMQ :日志文件(已基本不用)
KahaDB : AMQ基础上改进,默认选择
LevelDB :谷歌K/V数据库
在activemq.xml中查看默认的broker持久化机制。
1.5:Activemq 的消息确认机制(1)AUTO_ACKNOWLEDGE = 1 自动确认
(2)CLIENT_ACKNOWLEDGE = 2 客户端手动确认
(3)DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
(4)SESSION_TRANSACTED = 0 事务提交并确认
(5)INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认
前四种是JMS API中提供的客户端ACK_MODE。第五种是InforSuiteMQ自定义补充的一种ACK_MODE。
2.RabbitMQ2.1:RabbitMQ是什么RabbitMQ是一个由erlang语言编写的、开源的、在AMQP基础上完整的、可复用的企业消息系统。
2.2:RabbitMQ的作用及原理基本概念
关键名称 |
说明 |
Channel(信道) |
消息推送使用的通道 |
Producer(消息的生产者) |
向消息队列发布消息的客户端应用程序 |
Consumer(消息的消费者) |
从消息队列取得消息的客户端应用程序 |
Message(消息) |
消息由消息头和消息体组成 |
Routing Key(路由键) |
消息头的一个属性,用于标记消息的路由规则,决定了交换机的转发路径 |
Queue(消息队列) |
用于存储生产者的消息 |
Exchange(交换器路由器) |
提供Producer到Queue之间的匹配 |
Binding(绑定) |
用于建立Exchange和Queue之间的关联 |
Binding Key(绑定键) |
Exchange与Queue的绑定关系,用于匹配Routing Key |
Broker(服务主体) |
RabbitMQ Server,服务器实体 |
最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为点对点模式
2.3.2:工作队列模式一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式
2.3.3:发布订阅模式Pulish/Subscribe,无选择接收消息,一个消息生产者,一个交换机(交换机类型为fanout),多个消息队列,多个消费者
生产者只需把消息发送到交换机,绑定这个交换机的队列都会获得一份一样的数据。
2.3.4:路由模式在发布/订阅模式的基础上,有选择的接收消息,也就是通过 routing 路由进行匹配条件是否满足接收消息。
2.3.5:主体模式topics(主题)模式跟routing路由模式类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由键 routingKey,类似于SQL中 = 和 like 的关系。
2.3.6:RPC模式与上面其他5种所不同之处,该模式是拥有请求/回复的。也就是有响应的,上面5种都没有。
RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的处理业务,处理完后然后在A服务器继续执行下去,把异步的消息以同步的方式执行。
2.4:RabbitMQ的消息持久化机制Queue(消息队列)的持久化是通过durable=true来实现的。
Message(消息)的持久化 ,通过设置消息是持久化的标识。
Exchange(交换机)的持久化 。
2.5:RabbitMQ的消息确认机制confirm机制:确认消息是否成功发送到Exchange
ack机制:确认消息是否被消费者成功消费
- AcknowledgeMode.NONE:自动确认
- AcknowledgeMode.AUTO:根据情况确认
- AcknowledgeMode.MANUAL:手动确认
RocketMQ是阿里开发的一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
3.2:RocketMQ的作用及原理基本概念
关键名称 |
说明 |
Producer |
消息生产者 |
Producer Group |
生产者组 |
Consumer |
消息消费者 |
Consumer Group |
消费者组 |
Topic |
Topic用于将消息按主题做划分,Producer将消息发往指定的Topic,Consumer订阅该Topic就可以收到这条消息 |
Message |
代表一条消息 |
Tag |
标签可以被认为是对 Topic 进一步细化 |
Broker |
负责接收并存储消息 |
Queue |
Topic和Queue是1对多的关系,一个Topic下可以包含多个Queue,主要用于负载均衡 |
Offset |
RocketMQ在存储消息时会为每个Topic下的每个Queue生成一个消息的索引文件,每个Queue都对应一个Offset记录当前Queue中消息条数。 |
NameServer |
NameServer可以看作是RocketMQ的注册中心 |
RocketMQ消息订阅有两种模式
一种是Push模式(MQPushConsumer),即MQServer主动向消费端推送
另外一种是Pull模式(MQPullConsumer),即消费端在需要时,主动到MQ Server拉取
但在具体实现时,Push和Pull模式本质都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息
集群模式和广播模式
集群模式:默认情况下我们都是使用的集群模式,也就是说消费者组收到消息后,只有其中的一台机器会接收到消息。
广播模式:消费者组内的每台机器都会收到这条消息。
3.4:RocketMQ的消息持久化机制exchange持久化、queue持久化、message持久化
CommitLog:日志数据文件,存储消息内容,所有 queue 共享,不区分 topic ,顺序读写 ,1G 一个文件
ConsumeQueue:逻辑 Queue,基于 topic 的 CommitLog 的索引文件,消息先到达 commitLog,然后异步转发到 consumeQueue,包含 queue 在 commitLog 中的物理偏移量 offset,消息实体内容大小和 Message Tag 的 hash 值,大于 600W 个字节,写满之后重新生成,顺序写
IndexFile:基于 Key 或 时间区间的 CommitLog 的索引文件,文件名以创建的时间戳命名,固定的单个 indexFile 大小为 400M,可以保存 2000W 个索引
3.5:RocketMQ的消息确认机制confirm机制:确认消息是否成功发送到Exchange
ack机制:确认消息是否被消费者成功消费
4.Kafka4.1:Kafka是什么Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统,可作为消息中间件
4.2:Kafka的作用及原理基本概念
关键名称 |
说明 |
producer |
生产者 |
consumer |
消费者 |
consumer group |
消费者组 |
broker |
一台kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic |
topic |
一个消息队列,生产者和消费者都是面对一个Topic |
partition |
每个partition时一个有序队列,partition是topic中存储数据和消费数据所使用的队列所在 |
replica |
副本,为了保证当前某个节点发生故障时,当前节点上的数据不会发生丢失 |
leader |
每个分区多个副本的“主”,生产者生产数据的对象,以及消费组消费者消费的对象 |
follower |
每个分区多个副本的“从”,实时从leader数据的同步 |
生产者发送模式
1.发后即忘(fire-and-forget):只管往Kafka中发送消息而并不关心消息是否正确到达
2.同步(sync):一般是在send()方法里指定一个Callback的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。
3.异步(async):send()方法会返回Futrue对象,通过调用Futrue对象的get()方法,等待直到结果返回
消费者消费模式
1.At-most-once(最多一次):在每一条消息commit成功之后,再进行消费处理;设置自动提交为false,接收到消息之后,首先commit,然后再进行消费。
2.At-least-once(最少一次):在每一条消息处理成功之后,再进行commit;设置自动提交为false;消息处理成功之后,手动进行commit。
3.Exactly-once(正好一次):将offset作为唯一id与消息同时处理,并且保证处理的原子性;设置自动提交为false;消息处理成功之后再提交。
4.4:Kafka的消息持久化机制Kafka直接将数据写入到日志文件中,以追加的形式写入
4.5:Kafka的消息确认机制confirm机制:确认消息是否成功发送
ack机制:确认消息是否被消费者成功消费
四、消息队列高可用引言:系统应用MQ作为消息中间件后,会导致系统可用性降低。所以只要你用了 MQ,高可用肯定是要考虑到的
1.ActiveMQ高可用ActiveMQ的部署方式有三种,分别为:单节点部署(不支持高可用),Master-Slave部署方式(主从模式),Broker-Cluster部署方式(负载均衡)
1.1.单节点部署(不支持高可用)单节点部署方式因为不支持高可用,只会在开发或者测试环境下用到,且单节点部署方式较简单,不进行详述。
1.2.Master-Slave部署方式(支持高可用)1.2.1.shared filesystem Master-Slave部署方式主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master。
多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave。
1.2.2.shared database Master-Slave方式与shared filesystem方式类似,只是共享的存储介质由文件系统改成了数据库而已。
1.2.3.Replicated LevelDB Store方式这种主备方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper协调选择一个node作为master。被选择的master broker node开启并接受客户端连接。
其他node转入slave模式,连接master并同步他们的存储状态。slave不接受客户端连接。所有的存储操作都将被复制到连接至Master的slaves。
如果master死了,得到了最新更新的slave被允许成为master。fialed node能够重新加入到网络中并连接master进入slave mode。所有需要同步的disk的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。
当一个新的master被选中,你需要至少保障一个法定node在线以能够找到拥有最新状态的node。这个node将会成为新的master。因此,推荐运行至少3个replica nodes,以防止一个node失败了,服务中断。
1.3.Broker-Cluster部署方式(不支持高可用)前面的Master-Slave的方式虽然能解决多服务热备的高可用问题,但无法解决负载均衡和分布式的问题。Broker-Cluster的部署方式就可以解决负载均衡的问题。
Broker-Cluster部署方式中,各个broker通过网络互相连接,并共享queue。当broker-A上面指定的queue-A中接收到一个message处于pending状态,而此时没有consumer连接broker-A时。如果cluster中的broker-B上面由一个consumer在消费queue-A的消息,那么broker-B会先通过内部网络获取到broker-A上面的message,并通知自己的consumer来消费。
1.3.1.static Broker-Cluster部署在activemq.xml文件中静态指定Broker需要建立桥连接的其他Broker
1.3.2.Dynamic Broker-Cluster部署在activemq.xml文件中不直接指定Broker需要建立桥连接的其他Broker,由activemq在启动后动态查找
1.4.Master-Slave与Broker-Cluster相结合的部署方式可以看到Master-Slave的部署方式虽然解决了高可用的问题,但不支持负载均衡,
Broker-Cluster解决了负载均衡,但当其中一个Broker突然宕掉的话,那么存在于该Broker上处于Pending状态的message将会丢失,无法达到高可用的目的。
Master-Slave与Broker-Cluster相结合的部署方式是目前ActiveMQ比较推荐的部署方案。
2.RabbitMQ高可用RabbitMQ的部署方式有三种,分别为:单机模式(不支持高可用),普通集群模式(不支持高可用),镜像集群模式(支持高可用)
2.1单机模式(不支持高可用)单节点部署方式因为不支持高可用,只会在开发或者测试环境下用到,且单节点部署方式较简单,不进行详述。
2.2普通集群模式(不支持高可用)普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。
这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。
而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。
2.3镜像集群模式(支持高可用)这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
3.RocketMQ高可用RocketMQ的部署方式有两种,分别为:单节点模式(不支持高可用),多节点模式
3.1.单节点模式(不支持高可用)单节点部署方式因为不支持高可用,只会在开发或者测试环境下用到,且单节点部署方式较简单,不进行详述。
3.2.多节点模式3.2.1.多Master模式(不支持高可用)一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
配置简单,单个Master 宕机或重启维护对应用无影响。
单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。
3.2.2.多Master多Slave模式(异步复制)(支持高可用)每个 Master 配置一个 Slave,有多对Master-Slave, HA,采用异步复制方式,主备有短暂消息延迟,毫秒级。
即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master 宕机后,消费者仍然可以从 Slave消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
Master 宕机,磁盘损坏情况,会丢失少量消息。
3.2.3.多Master多Slave模式(同步双写)(支持高可用)每个 Master 配置一个 Slave,有多对Master-Slave, HA采用同步双写方式,主备都写成功,向应用返回成功。
数据与服务都无单点, Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT会略高。
4.Kafka高可用Kafka的部署方式有三种,分别为:单broke节点(不支持高可用),单机多broker模式(支持高可用),多机多broker模式(支持高可用)
4.1.单broke节点(不支持高可用)单节点部署方式因为不支持高可用,只会在开发或者测试环境下用到,且单节点部署方式较简单,不进行详述。
4.2.单机多broker模式(支持高可用)这种部署方式其实是一种伪集群模式,单机部署多节点如果出现服务器宕机,那么所有节点都不能正常提供服务。
4.3.多机多broker模式(支持高可用)Kafka 0.8 以前,是没有 HA 机制的,就是任何一个 broker 宕机了,那个 broker 上的 partition 就废了,没法写也没法读,没有什么高可用性可言。
Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。只能读写 leader?很简单,要是你可以随意读写每个 follower,那么就要 care 数据一致性的问题,系统复杂度太高,很容易出问题。Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。
五、消息重复消费问题引言:为什么要考虑重复消费的问题?比如我们消费后通过消费中间件来调用,扣费10元,但是消费者消费消息后还没来得及进行确认,消息中间件进行了重启,那么消息者就会进行再次扣费处理,这样就会出问题!
ActiveMQ、RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。
我们以Kafka为例说明一下重复消费的问题:
Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。
但是,如果在这期间重启系统或者直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset。重启之后,少数消息会再次消费一次。
如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,你可能就把数据在数据库里插入了 2 次,那么数据就错啦。
重复消费问题引发后,我们就需要考虑怎么保证幂等性。
幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
保证幂等性的具体实现方式需要结合对应的业务去实现,这里提供几个思路:
- 如果是数据插入操作,插入前我们根据唯一键先进行查询,如果已有数据那我们只进行更新就行。
- 如果是写 Redis,则我们无需考虑幂等性,反正每次都是 set,天然幂等性。
- 如果是基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。
- 如果不是以上集中通用的场景,那需要我们发送消息的时候携带唯一ID,消费者在消费前进行相应的查重处理,处理后在进行相应的业务操作。
引言:MQ 有个基本原则,就是数据不能多一条,也不能少一条。
不能多,就是上面说的重复消费和幂等性问题。
不能少,就是说这数据别搞丢了。那这个问题你必须得考虑一下。
消息丢失的问题需要从生产者、MQ、消费者三个方面来进行考虑,相应的解决方案也需要从这三方面出发(生产者确认机制,MQ消息持久化、消费者确认机制)。
1.ActiveMQ1.生产者丢失消息生产者丢失消息的问题可以通过消息重投、重试机制来解决
2.ActiveMQ丢失消息ActiveMQ丢失消息的问题需要通过ActiveMQ消息持久化机制 高可用(见ActiveMQ章节)来解决,ActiveMQ的消息持久化机制有以下几种
JDBC: 持久化到数据库
AMQ :日志文件(已基本不用)
KahaDB : AMQ基础上改进,默认选择
LevelDB :谷歌K/V数据库
在activemq. xml 中查看默认的broker持久化机制。
3.消息者丢失消息消费者丢失消息通过ack机制来解决,消息者进行业务处理后,再进行ack确认,避免消息丢失。
2.RabbitMQ1.生产者丢失消息生产者消息丢失,通过confirm机制来确认消息发送,然后进行相应的消息重投、重试机制
2.RabbitMQ丢失消息RabbitMQ丢失消息的问题需要通过RabbitMQ消息持久化机制 高可用(见RabbitMQ章节)来解决,
RabbitMQ持久化包含:
Queue(消息队列)的持久化是通过durable=true来实现的。
Message(消息)的持久化 ,通过设置消息是持久化的标识。
Exchange(交换机)的持久化 。
3.消息者丢失消息消费者丢失消息通过ack机制来解决,消息者进行业务处理后,再进行ack确认,避免消息丢失。
3.RocketMQ1.生产者丢失消息生产者消息丢失,通过confirm机制来确认消息发送,然后进行相应的消息重投、重试机制
2.RocketMQ丢失消息RocketMQ丢失消息的问题需要通过RocketMQ消息持久化机制 高可用(见RocketMQ章节)来解决,
RocketMQ持久化包含:exchange持久化、queue持久化、message持久化
3.消息者丢失消息消费者丢失消息通过ack机制来解决,消息者进行业务处理后,再进行ack确认,避免消息丢失。
4.Kafka1.生产者丢失消息生产者消息丢失,通过confirm机制来确认消息发送,然后进行相应的消息重投、重试机制
2.Kafka丢失消息Kafka直接将数据写入到日志文件中,以追加的形式写入
3.消息者丢失消息消费者丢失消息通过ack机制来解决,消息者进行业务处理后,再进行ack确认,避免消息丢失。
总结:其实MQ消息丢失,无非就是生产者发送时丢失,MQ传递时丢失,消费者消费时丢失几种问题,我们相应的从以上三方面解决就可以,但是上述三种方式使用后,其实也不能保证100%消息不丢失,所以往往在业务场景还会使用数据库辅助记录的方式,来保证消息不丢失。但数据库辅助记录方式对相关性能以及使用用较大的影响,所以一般数据只需要进行上面三种方式处理,就能保证消息基本不丢失。发生消息丢失时我们配合日志进行相应的消息恢复就可以。数据库辅助记录:生产者发送消息时同步发送一条消息到数据库中,消费者拿到消息并完成业务处理后,从数据库删除对应的记录。
七、消息顺序性问题引言:为什么要保证消息的顺序性?
比如现在我们有个账号余额为5,我们充值50元,购买一件20元的商品,但因消息不能保证顺序,导致先进行扣费处理,这样就会导致我们购买失败。
消息顺序性消费情况,尤其在高可用(集群方式)下一定要考虑。
1.ActiveMQActiveMQ因为默认是单queue 队列,所以它模式就是保证消息顺序性消费的。
2.RabbitMQ- 将RabbitMQ拆分多个 queue,每个 queue 一个 consumer,保证消息的顺序性。
- 一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
RocketMQ保证消息顺序性方法与Kafka大致相同。
- 一个 topic,一个 queue,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
- 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。
- 一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
- 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。
引言:如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,怎么处理?
其实消息积压的问题,一般都是由消费端出了问题导致的,在实际业务场景中一般不会出现,但是出现问题一般都是大问题。
模拟场景:一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。由于消费者宕机导致现在MQ中积压几百万数据
解决思路:- 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉(避免重复消费)。
- 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
- 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
- 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
- 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。
假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
mq 都快写满了如果消息积压在 mq 里,长时间都没有处理掉,此时导致 mq 都快写满了,咋办?
这种情况下只能是通过增加临时Consumer将数据进行快速消费,等MQ恢复正常后再补充数据。
RocketMQ方案对于 RocketMQ,官方针对消息积压问题,提供了解决方案。
- 提高消费并行度
- 绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:
同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者 在已有机器启动多个进程的方式。 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax 实现。
- 批量方式消费
- 某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer 的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。
- 跳过非重要消息
- 发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到 100000 条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:
- 优化每条消息消费过程
- 举例如下,某条消息的消费过程如下:
引言:如果让你写一个消息队列,该如何进行架构设计?
比如说消息队列系统,我们从以下几个角度来考虑一下:
- 可扩展性:就是需要的时候快速扩容,就可以增加吞吐量和容量。可以参考afka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。
- 持久化:为了保证MQ的消息不丢失,设计时一定要考虑消息的持久化机制,且持久化要顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。
- 高可用:保证MQ的可靠性,可以参考kafka 。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。
- 能不能支持数据 0 丢失啊?可以的,参考我们之前说的那个 kafka 数据零丢失方案。
其实MQ的使用,无非就是从原理,高可用,重复消息,顺序读写,数据丢失几个方面开展。
上述的介绍是偏重思路方面来进行展开的,至于具体的MQ使用细节,我想你有了对应的思路去查会有一大堆。这也是我学习技术的一个思路,先掌握一个大的方向,然后沿着一个大的方向再进行相应的详细学习。
最后,上述MQ介绍中,大部分都是有我平时开发积累所得,也有一部分是借助网络现场学习。
如有不足或错误,欢迎大家指出,我们共同学习进步!