redis消息队列订阅与发布(Redis中的消息队列)
redis消息队列订阅与发布(Redis中的消息队列)持久性是指消息发送到系统后是否会消失,也分为三种,精确一次是要求较高的机制,它确保消息被处理仅处理一次。共有三种常见的保证,最多一次相对容易实现。可以说,所有的Mq都有这个保证。消费者可以接收到发送的消息或什么也没有。比如,网络问题导致的消息丢失。其次,消费者虽然收到,但处理异常。至少一次 ,如 RabbitMQ、Kafka 等。与 至多一次 相比,至少一次 具有更强的保证。它可以确保必须处理消息,但是有可能存在重复消费。例如,消费者消费后不确认的情况下。
在本文中,将介绍消息队列的一些特性,并浅谈如何使用 Redis 构建消息队列。
消息队列选择消息队列时需要考虑很多方面,例如消息传播、消息消费、持久性和消费组。
消息传播消息传播有2种方式,
- 1对1
- 一对多
一对一非常简单。生产者向队列发送一条消息,这条消息只被一个消费者接收。另一方面,一对多是可以传递给多个消费者的消息。
消息消费共有三种常见的保证,
- 最多一次
- 至少一次
- 精确一次
最多一次相对容易实现。可以说,所有的Mq都有这个保证。消费者可以接收到发送的消息或什么也没有。比如,网络问题导致的消息丢失。其次,消费者虽然收到,但处理异常。
至少一次 ,如 RabbitMQ、Kafka 等。与 至多一次 相比,至少一次 具有更强的保证。它可以确保必须处理消息,但是有可能存在重复消费。例如,消费者消费后不确认的情况下。
精确一次是要求较高的机制,它确保消息被处理仅处理一次。
持久性持久性是指消息发送到系统后是否会消失,也分为三种,
- 在内存中
- 磁盘内
- 混合的方式
提一个有趣的问题,将消息保存在磁盘中会更慢吗?如果想不明白,可以看看各个Mq的持久化原理,答案是不一定,具体取决于持久性是如何实现的。Kafka 使用 LSM-tree 实现了很大的吞吐量;此外,它比使用内存的 RabbitMQ 更好。在 Cassandra 中还有一个例子,Cassandra 的写入速度非常快,并且也使用了 LSM-tree。
混合是内存和磁盘结合的一种特殊情况。消息队列为了提高写入性能,先写入内存,再刷入磁盘。RabbitMQ 是混合中的一个典型中间件。但,RabbitMQ 支持持久化到磁盘。
消费组在我看来,消费者组是消息队列中最重要的特性。处理消息通常需要一定的时间,因此我们需要使用更多的消费者来处理消息,也就是做横向扩展。在消费组场景中,一对一和一对多的目标都变成了一个消费组。
Redis 队列说完消息队列中的特性,再来说说Redis是如何实现消息队列的。有3种方法可以支持,
- 发布/订阅
- 列表
- stream
Pub/Sub 是常见的解决方案。消费者SUBSCRIBE一个主题,也就是一个键,然后在客户端PUBLISH向同一主题发送消息后接收数据。作为传统的 Pub/Sub 功能,支持将消息传播多个消费者。而且,也可以实现一定程度的消息路由。
但是 Redis 中的 Pub/Sub 在大多数场景中并不流行。最大的问题是消息最多只能传递一次。当消息发布时,如果消费者现在没有收到,消息就会消失。此外,Redis 不会持久化消息。如果 Redis 关闭,所有消息都会消失。
让我们总结一下 Pub/Sub:
- 一对一和一对多都可以
- 最多一次
- 无持久化保障机制
- 没有消费组的概念
List 是 Redis 中常用的数据结构,可以通过它轻松地完成一个 FIFO 队列,核心操作是我们可以通过BLPOP在阻塞模式下等待消息,这里建议添加超时。
从图中可以看出,如果有多个消费者在等待同一个列表,那他们相当于一个消费者组。另一方面,列表不支持消息广播。如果消息通过BLPOP消费,其他消费者无法在获取该条消息。
Redis 支持List消息持久化,此外,如果您启用AOF或RDB,则可以将消息备份到磁盘中。
总结一下,
- 1对1可以,但不能1对多
- 最多一次
- 保存在内存中,并备份在磁盘中
- 变向支持消费组
介绍了 Pub/Sub 和 List 之后,我们注意到这两种方法都不是很好。他们有自己的缺点。因此,从 Redis 5.0 开始,Stream 推出,以解决如上问题。
因为Stream要复杂的多,我们先来看看Stream带来了什么好处。
- 一对一和一对多都可以
- 至少一次
- 保存在内存中,并备份在磁盘中
- 支持消费组
因此,Stream 解决了 Pub/Sub 和 List 中的所有问题并做了增强。
该图类似于 Pub/Sub,但工作流程更接近 List。生产者可以随时生成消息,然后XADD到 Redis Stream。您可以将 Stream 视为维护所有传入消息的列表。消费者也可以随时通过XREAD读取消息。
- $: 不管之前Stream中有什么消息,从现在开始只检索。
- 0-0: 总是从头读。
- <id>: 从特定的消息 id 开始。
除了支持一对一映射之外,Stream 还支持如下消费组:
为了实现至少一次保证,像大多数消息队列一样,消费者必须在使用XACK.
看看Stream的类型结构
Stream消费者故障转移在消费者组中,消费者通过客户端的名称(区分大小写的字符串)进行区分,当断开连接重新连通后,消费者客户端还是提供相同的名字,会被当做同一个消费者。这意味着在消费者组中由客户端提供唯一标识符
在分布式系统中,我们不能草率命名消费者。例如,消费者在 K8s 中的容器中运行:我如何维护每个 pod 的名称,如何面对横向扩展的场景?
Redis Stream 维护一个针对最后位置的名称表。所以,如果我们每次生成一个随机名称,映射表就会越来越大。对于那些已收到但未确认的消息将永远不会被处理,这样PEL就会膨胀,出现一定的风险。
幸运的是,Redis Stream 提供了一种方法来声明这些待处理的消息。工作流程是这样的:
- 找出所有待处理的消息 ID。
- 声明这些 ID ,并且转移所有权。
工作流程是:
- XPENDING StreamName GroupName
- XCLAIM StreamName GroupName <ConsumerName in uuid> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
这min-idle-time是一个非常有用的方法。通过使用min-idle-time,我们可以避免多个消费者同时声明相同的消息。
Redis 流持久化即使开启了最严谨的策略,Redis 也不保证数据完全不会丢失。如果我们使用 Redis 作为消息队列,我们必须采取额外的措施来确保持久性。在发布消息之前,把消息写入 MySQL 等持久存储中,如果发生错误,仍然可以利用 MySQL 中的持久消息来恢复我们的工作。
此外,如果 Stream 持久化的消息越来越多,Redis 的内存使用也会是瓶颈。具体不做深入分析。
结论让我总结一下这三种方法。
更多细节,可以查阅官方资料深入了解。