队列定时处理(延迟队列-及存在的问题和解决方案)
队列定时处理(延迟队列-及存在的问题和解决方案)但上述方法,设置每一个消息不同的过期时间,依然存在以下问题:prop = new AMQP.BasicProperties().builder().expiration("3000").build();channel.basicPublish("daly_exchange" "daly_routing" prop "B过期时间为3秒".getBytes());正常声明队列:Map<String Object> map =Map.of("x-dead-letter-exchange" "dead_exchange" "x-dead-letter-routing-key" "dead_routing");channel.queueDeclare("
延迟队列的实现1:
- 将消息发送到指定了过期时间的队列。即在声明某个队列时,声明这个队列中所有消息的过期时间,这样,只要进入此队列的消息,它们的过期时间都是相同的,如以下代码,指定了过期时间为60秒,及过期的消息被路由的目标,如果不设置的话,默认过期时间为30分钟。
Map<String Object> map =
Map.of("x-dead-letter-exchange" "dead_exchange"
"x-dead-letter-routing-key" "dead_routing"
"x-message-ttl" 60000);
channel.queueDeclare("daly_queue" true false false map);
截图:
在队列上,统一设置一个过期时间的缺点是,不能根据业务需求,设置某一个消息的过期时间。
- 或声明一个没有过期时间的队列,但在发布消息时,指定消息的过期时间,如:
正常声明队列:
Map<String Object> map =
Map.of("x-dead-letter-exchange" "dead_exchange"
"x-dead-letter-routing-key" "dead_routing");
channel.queueDeclare("daly_queue" true false false map);
发布时,设置过期时间:
prop = new AMQP.BasicProperties().builder()
.expiration("3000").build();
channel.basicPublish("daly_exchange"
"daly_routing"
prop
"B过期时间为3秒".getBytes());
但上述方法,设置每一个消息不同的过期时间,依然存在以下问题:
延迟队列的问题:
1:A消息到达队列Q1并设置A消息的过期时间为10秒。
2:B消息也到达队列Q1,并设置B消息的过期时间为1秒。
3:这种情况下,因为队列中的数据是FIFO,排队执行的,所以,虽然B消息已经过期,但因A消息没有过期无法先将B消息发送给消费者,这就是延迟队列的最大问题。
4:解决方案:使用插件。
问题2示例图-演示以下演示,用于说明后发的信息虽然已经过期,但因为前一个消息并没有过期,所以,即使后面的消息过期,也不会被消费。
开发生产者- 声明列信交换机,死信队列及绑定关系。
- 声明正常交换机,正常队列,队列中消息过期时路由目的地,及绑定关系。
- 发布消息时,设置消息的过期时间。
package wj.rabbitmq.daly;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import wj.mq.utils.ConnUtils;
import Java.util.Map;
/**
* 延迟队列生产者
*/
@Slf4j
public class DalySender {
public static void main(String[] args) throws Exception {
Connection con = ConnUtils.newConnection();
Channel channel = con.createChannel();
//声明接收死信的的交换机和队列
channel.exchangeDeclare("dead_exchange" "direct" true);
//声明接收列信交换机数据的队列
channel.queueDeclare("dead_queue" true false
false null);
//声明死信交换机与死信队列的绑定关系
channel.queueBind("dead_queue" "dead_exchange"
"dead_routing");
//声明正常接收信息的交换机
channel.exchangeDeclare("daly_exchange" "direct" true);
//声明正常接收数据的队列 并添加列信路由到哪儿去
Map<String Object> map =
Map.of("x-dead-letter-exchange" "dead_exchange"
"x-dead-letter-routing-key" "dead_routing");
channel.queueDeclare("daly_queue" true false
false map);
//声明正常的绑定关系
channel.queueBind("daly_queue" "daly_exchange" "daly_routing");
//先发送一个过期时间为30秒
AMQP.BasicProperties prop =
new AMQP.BasicProperties().builder()
.expiration("30000")
.build();
channel.basicPublish("daly_exchange"
"daly_routing"
prop
"A过期时间为30秒".getBytes());
log.info("信息A过期30秒,发送完成。");
//再发送一个过期时间为3秒
prop = new AMQP.BasicProperties().builder()
.expiration("3000").build();
channel.basicPublish("daly_exchange"
"daly_routing"
prop
"B过期时间为3秒".getBytes());
log.info("信息B过期3秒发送完成。");
con.close();
}
}
开发一个消费者,只需要从死信队列中,读取过期的信息即可。
package wj.rabbitmq.daly;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import wj.mq.utils.ConnUtils;
/**
* 只接收死信队列里面的数据即可,不要消费正常的队列
* 以便于让正常的队列里面消息过期
*/
@Slf4j
public class DalyReceiver {
public static void main(String[] args) throws Exception{
Connection con = ConnUtils.newConnection();
Channel channel = con.createChannel();
System.err.println("准备消费死信队列里面,即过期的信息");
channel.basicConsume("dead_queue"
true
(consumerTag message) -> {
log.info("死信,即过期消息:" new String(message.getBody()));
} consumerTag -> {
//ignore
});
}
}
运行生产者,输出以下日志:
14:07:53.138 信息A过期30秒,发送完成。
14:07:53.142 信息B过期3秒发送完成。
运行消费者,输出以下日志,可见,并没有因为B先过期,而先收到B。
14:08:23.140 死信,即过期消息:A过期时间为30秒
14:08:23.140 死信,即过期消息:B过期时间为3秒
延迟队列问题的解决方案(使用插件)插件下载地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.2/rabbitmq_delayed_message_exchange-3.10.2.ez
rabbitmq所有可用插件列表:
https://www.rabbitmq.com/community-plugins.html
查看已经启用的插件通过命令rabbitmq-plugins list可以查看rabbitmq的插件列表,及已经启用的插件 前面添加了*号的,为已经启用的插件。可见,启用的插件为management和web,其他插件,都没有启动。但也没有我们要使用的dalyed_message_exchange插件,所以,还需要额外的安装这个插件。
插件相关命令官方参考地址:https://www.rabbitmq.com/plugins.html
rabbitmq-plugins list 用于列示所有插件
rabbitmq-plugins enable <plugin-name> 启用插件
rabbitmq-plugins disable <plugin-name> 禁用插件
rabbitmq-plugins directories -s 用于查看plugins可安装的目录 (以下示例,要进入容器执行)
安装插件下载rabbitmq_delayed_message_exchange-<version>.ez文件,进入docker容器,将文件放到plugins目录下,此目录为:/opt/bitnami/rabbitmq/plugins。
然后执行安装插件的命令:
如果不是用docker容器运行的,则可以直接将*.ez文件放到rabbitmq的插件目录下为:/usr/lib/rabbitmq/rabbitmq-server-<version>/plugins。
然后就可以执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange。
验证插件安装插件后,再次查看ui端口的交换机创建处,可以见到新的交换机类型:x-delayed-message .
测试发送延迟消息(Java项目)开发生产者- 声明延迟交换机,x-delay-message 。
- 声明普通队列。
- 声明延迟交换机与普通队列的绑定关系。
- 发送延迟消息,通过设置x-delay 单位毫秒。先发送一个延迟长的消息如1分钟。再发送一个延迟少的消息,如6秒。测试必须要先收到延迟少的消息,就算是延迟交换机运行成功。
package wj.rabbitmq.delayexchange;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import wj.mq.utils.ConnUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* <pre>
* 使用延迟交换机发送延迟消息
* 发送的消息会被阻塞到延迟交换机中,先到期的消息会先被发送到队列中,并且不再设置过期时间
* 发送的队列的消息的,立刻会被消息
* </pre>
*/
@Slf4j
public class DelayExchangeSender {
public static void main(String[] args) throws Exception {
Connection con = ConnUtils.newConnection();
Channel channel = con.createChannel();
//声明延迟交换机,即指定个x-delayed-message的交换机的基础类型
Map<String Object> map = new HashMap<>();
map.put("x-delayed-type" "direct");//必须写,固定值
channel.exchangeDeclare(
"my-delayed-exchange" //指定交换机的名称,任意
"x-delayed-message" //指定交换机类型,必须是这个类型
true //是否持久化
false //是否自动删除
map);//必须传入这个参数
//声明普通队列
channel.queueDeclare("my-queue"
true false
false null);
//声明绑定关系
channel.queueBind("my-queue"
"my-delayed-exchange"
"delay-routing");
//先发送延迟60秒的消息 其中x-delay为固定key
AMQP.BasicProperties prop = new AMQP.BasicProperties()
.builder().headers(Map.of("x-delay" 1000 * 60))//设置延迟时间为60秒
.build();
channel.basicPublish("my-delayed-exchange"
"delay-routing"
prop
"Delay for 1 minutes".getBytes(StandardCharsets.UTF_8));
log.info("发送消息:[Delay for 1 minutes] 完成");
//再发送一个延迟时间少的信息,如6秒
prop = new AMQP.BasicProperties()
.builder().headers(Map.of("x-delay" 1000*6))
.build();
channel.basicPublish("my-delayed-exchange"
"delay-routing"
prop
"Delay for 6 seconds".getBytes(StandardCharsets.UTF_8));
log.info("发送消息: [Delay for 6 seconds] 完成");
}
}
如果先启动生产者,并已经声明了交换机与队列的绑定关系,则消费者可以不再次声明绑定关系。所以,消费者的代码就变的比较简单了,消费者仅是消费消息,并输入消费的时间:
开发消息者package wj.rabbitmq.delayexchange;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import wj.mq.utils.ConnUtils;
/**
* 延迟消息消费者
*/
@Slf4j
public class DelayExchangeReceiver {
public static void main(String[] args) throws Exception {
Connection con = ConnUtils.newConnection();
Channel channel = con.createChannel();
//直接消费这个队列即可
channel.basicConsume("my-queue" true
(consumerTag message) -> {
log.info("消费消息:{}" new String(message.getBody()));
} consumerTag -> {
//ignore
});
}
}
先启动生产者,发送信息,输出一下时间:
12:35:03.772 发送消息:[Delay for 1 minutes] 完成
12:35:03.774 发送消息: [Delay for 6 seconds] 完成
快速启动消费者,等待,并查看收到消息:
12:35:09.782 消费消息:Delay for 6 seconds
12:36:03.776 消费消息:Delay for 1 minutes
通过上面的代码,可以看出,消费者在6秒后,先收到了先延迟到期的消息,一分钟以后,再收到延迟一分钟的消息。虽然,先发送的是延迟一分钟的消息,但此消息,并没有阻塞后面延迟时间短的消息。
查看延迟交换机可以看到rate out 此时间将会是一个倒计时,到时间以后再给发送给队列。
在没有收以消息之前,消息会被阻塞到交换机中,所以,没有到延迟时间的消息,不会被发送的队列中。
测试发送延迟消息(springBoot项目)使用springboot项目,声明延迟交换机,需要使用CustumExchange自定义交换机类型。
开发配置类- 定义延迟交换机。
- 定义普通队列。
- 定义绑定关系。
package wj.mq.config.delay;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 使用rabbitmq_delayed_message_exchange发送延迟消息
*/
@Configuration
public class DelayMessageExchangeConfig {
@Bean
public CustomExchange delayExchange() {
CustomExchange exchange = //
new CustomExchange("my-delayed-exchange" // 交换机名称
"x-delayed-message" // 交换机的类型,必须是此值
true // durable
false // autoDelete
Map.of("x-delayed-type" "direct")// 传递延迟类型
);
return exchange;
}
@Bean
public Queue myQueue() {
Queue queue = new Queue("my-queue" //队列名称
true //durable
false //exclusive
false);//auto delete
return queue;
}
@Bean
public Binding myBinding(CustomExchange customExchange Queue myQueue) {
Binding binding = BindingBuilder.bind(myQueue)//Queue
.to(customExchange)//exchange
.with("my-routing")//路由routing
.noargs();
return binding;
}
}
开发生产者代码- 注入RabbitTemplate
- 发送,并设置过期时间。
- 发送信息,使用ApplicationRunner启动后即发送,也可以开发一个Conntroller,动态调用。
package wj.mq.rabbitmq.delay;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class DelaySender implements ApplicationRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send() {
// 先发送一个延迟1分钟的消息
rabbitTemplate.convertAndSend("my-delayed-exchange" // 交换机名称
"my-routing" // 路由key
"Message of delay 1 minutes" // 消息对象
new MessagePostProcessor() {// 接收一个消息发送前的处理函数
@Override
public Message postProcessMessage(Message message)//
throws AmqpException {
// 处理消息,设置消息的过期时间设置x-delay头,60秒钟
message.getMessageProperties().setDelay(1000 * 60);
return message;
}
});
log.info("发送延迟1分钟的消息-完成");
// 再发送一个延迟6秒钟的消息
rabbitTemplate.convertAndSend("my-delayed-exchange" // 交换机名称
"my-routing" // 路由key
"Message of delay 6 seconds" // 消息对象
new MessagePostProcessor() {// 接收一个消息发送前的处理函数
@Override
public Message postProcessMessage(Message message)//
throws AmqpException {
// 处理消息,设置消息的过期时间设置x-delay头,6秒钟
message.getMessageProperties().setDelay(1000 * 6);
return message;
}
});
log.info("发送延迟6秒的消息-完成");
}
@Override
public void run(ApplicationArguments args) throws Exception {
send();//启动完成后就发送,也可以通过一个controller调用测试
}
}
开发消息者代码通过@RabblitListener接收消息
package wj.mq.rabbitmq.delay;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
/**
* 消费延迟消息
*/
@Slf4j
@Component
public class DelayReceiver {
@RabbitListener(queues = "my-queue" ackMode = "AUTO")
public void cousumer(@Payload() Message message Channel channel) {
log.info("接收到消息:{}" new String(message.getBody()));
}
}
运行测试通过以下输出可以看出,虽然先发送的延迟1分钟的消息,但由于后发出的消息因延迟时间更少(6秒)而被先接收到。说明这个测试已经成功运行。
2022-10-01 21:28:55.408 INFO : 发送延迟1分钟的消息-完成
2022-10-01 21:28:55.417 INFO : 发送延迟6秒的消息-完成
2022-10-01 21:29:01.433 INFO : 接收到消息:Message of delay 6 seconds
2022-10-01 21:29:55.408 INFO : 接收到消息:Message of delay 1 minutes
延迟插件的运行原理- 延迟交换机,会将收到信息,先缓存到延迟交换机内部。
- 消息延迟到期后,才会发送给目标队列。
由于每一次启动延迟插件,需要将rabbitmq_delayed_message_exchange-<version>.ez copy到bitnami/rabbitmq容器的目录: /opt/bitnami/rabbitmq/plugins/下。且如果容器删除后,必须重新copy。
可以创建一个新的镜像,将插件copy到镜像中,在启动时通过RABBITMQ_PLUGINS指定启动的插件,就可以了。
创建Dockerfile
创建新的镜像from bitnami/rabbitmq:3.10.8
MAINTAINER WJ
COPY rabbitmq_delayed_message_exchange-3.10.2.ez /opt/bitnami/rabbitmq/plugins/
构建新的镜像docker build -t rabbitmq:1.0 .
启动新的容器并指定启用的插件。
#!/bin/bash
docker stop mq
docker rm mq
docker run --name mq -d \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_USERNAME=admin \
-e RABBITMQ_PASSWORD=admin \
-e TZ=Asia/Shanghai \
-e RABBITMQ_PLUGINS=rabbitmq_management rabbitmq_mqtt rabbitmq_stream rabbitmq_delayed_message_exchange \
-v ${PWD}/data:/bitnami \
mq:1.0
进入容器查看可见,指定的插件,已经启用。
通过界面查看: