快捷搜索:  汽车  科技

队列定时处理(延迟队列-及存在的问题和解决方案)

队列定时处理(延迟队列-及存在的问题和解决方案)但上述方法,设置每一个消息不同的过期时间,依然存在以下问题:prop = new AMQP.BasicProperties().builder().expiration("3000").build();channel.basicPublish("daly_exchange" "daly_routing" prop "B过期时间为3秒".getBytes());正常声明队列:Map<String Object> map =Map.of("x-dead-letter-exchange" "dead_exchange" "x-dead-letter-routing-key" "dead_routing");channel.queueDeclare("

延迟队列的实现1:

  1. 将消息发送到指定了过期时间的队列。即在声明某个队列时,声明这个队列中所有消息的过期时间,这样,只要进入此队列的消息,它们的过期时间都是相同的,如以下代码,指定了过期时间为60秒,及过期的消息被路由的目标,如果不设置的话,默认过期时间为30分钟。

Map<String Object> map =
Map.of("x-dead-letter-exchange" "dead_exchange"
"x-dead-letter-routing-key" "dead_routing"
"x-message-ttl" 60000);
channel.queueDeclare("daly_queue" true false false map);

截图:

队列定时处理(延迟队列-及存在的问题和解决方案)(1)

在队列上,统一设置一个过期时间的缺点是,不能根据业务需求,设置某一个消息的过期时间。

  1. 或声明一个没有过期时间的队列,但在发布消息时,指定消息的过期时间,如:

正常声明队列:

Map<String Object> map =
Map.of("x-dead-letter-exchange" "dead_exchange"
"x-dead-letter-routing-key" "dead_routing");
channel.queueDeclare("daly_queue" true false false map);

发布时,设置过期时间:

prop = new AMQP.BasicProperties().builder()
.expiration("3000").build();
channel.basicPublish("daly_exchange"
"daly_routing"
prop
"B过期时间为3秒".getBytes());

但上述方法,设置每一个消息不同的过期时间,依然存在以下问题:

延迟队列的问题:

1:A消息到达队列Q1并设置A消息的过期时间为10秒。

2:B消息也到达队列Q1,并设置B消息的过期时间为1秒。

3:这种情况下,因为队列中的数据是FIFO,排队执行的,所以,虽然B消息已经过期,但因A消息没有过期无法先将B消息发送给消费者,这就是延迟队列的最大问题。

4:解决方案:使用插件。

问题2示例图-演示

以下演示,用于说明后发的信息虽然已经过期,但因为前一个消息并没有过期,所以,即使后面的消息过期,也不会被消费。

队列定时处理(延迟队列-及存在的问题和解决方案)(2)

开发生产者
  • 声明列信交换机,死信队列及绑定关系。
  • 声明正常交换机,正常队列,队列中消息过期时路由目的地,及绑定关系。
  • 发布消息时,设置消息的过期时间。

package wj.rabbitmq.daly;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import wj.mq.utils.ConnUtils;
import Java.util.Map;
/**
* 延迟队列生产者
*/
@Slf4j
public class DalySender {
public static void main(String[] args) throws Exception {
Connection con = ConnUtils.newConnection();
Channel channel = con.createChannel();
//声明接收死信的的交换机和队列
channel.exchangeDeclare("dead_exchange" "direct" true);
//声明接收列信交换机数据的队列
channel.queueDeclare("dead_queue" true false
false null);
//声明死信交换机与死信队列的绑定关系
channel.queueBind("dead_queue" "dead_exchange"
"dead_routing");
//声明正常接收信息的交换机
channel.exchangeDeclare("daly_exchange" "direct" true);
//声明正常接收数据的队列 并添加列信路由到哪儿去
Map<String Object> map =
Map.of("x-dead-letter-exchange" "dead_exchange"
"x-dead-letter-routing-key" "dead_routing");
channel.queueDeclare("daly_queue" true false
false map);
//声明正常的绑定关系
channel.queueBind("daly_queue" "daly_exchange" "daly_routing");
//先发送一个过期时间为30秒
AMQP.BasicProperties prop =
new AMQP.BasicProperties().builder()
.expiration("30000")
.build();
channel.basicPublish("daly_exchange"
"daly_routing"
prop
"A过期时间为30秒".getBytes());
log.info("信息A过期30秒,发送完成。");
//再发送一个过期时间为3秒
prop = new AMQP.BasicProperties().builder()
.expiration("3000").build();
channel.basicPublish("daly_exchange"
"daly_routing"
prop
"B过期时间为3秒".getBytes());
log.info("信息B过期3秒发送完成。");
con.close();
}
}

开发消费者

开发一个消费者,只需要从死信队列中,读取过期的信息即可。

package wj.rabbitmq.daly;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import wj.mq.utils.ConnUtils;
/**
* 只接收死信队列里面的数据即可,不要消费正常的队列
* 以便于让正常的队列里面消息过期
*/
@Slf4j
public class DalyReceiver {
public static void main(String[] args) throws Exception{
Connection con = ConnUtils.newConnection();
Channel channel = con.createChannel();
System.err.println("准备消费死信队列里面,即过期的信息");
channel.basicConsume("dead_queue"
true
(consumerTag message) -> {
log.info("死信,即过期消息:" new String(message.getBody()));
} consumerTag -> {
//ignore
});
}
}

运行

运行生产者,输出以下日志:

14:07:53.138 信息A过期30秒,发送完成。

14:07:53.142 信息B过期3秒发送完成。

运行消费者,输出以下日志,可见,并没有因为B先过期,而先收到B。

14:08:23.140 死信,即过期消息:A过期时间为30秒

14:08:23.140 死信,即过期消息:B过期时间为3秒

延迟队列问题的解决方案(使用插件)

插件下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.2/rabbitmq_delayed_message_exchange-3.10.2.ez

rabbitmq所有可用插件列表:

https://www.rabbitmq.com/community-plugins.html

查看已经启用的插件

通过命令rabbitmq-plugins list可以查看rabbitmq的插件列表,及已经启用的插件 前面添加了*号的,为已经启用的插件。可见,启用的插件为management和web,其他插件,都没有启动。但也没有我们要使用的dalyed_message_exchange插件,所以,还需要额外的安装这个插件。

队列定时处理(延迟队列-及存在的问题和解决方案)(3)

插件相关命令

官方参考地址:https://www.rabbitmq.com/plugins.html

rabbitmq-plugins list 用于列示所有插件

rabbitmq-plugins enable <plugin-name> 启用插件

rabbitmq-plugins disable <plugin-name> 禁用插件

rabbitmq-plugins directories -s 用于查看plugins可安装的目录 (以下示例,要进入容器执行)

队列定时处理(延迟队列-及存在的问题和解决方案)(4)

安装插件

下载rabbitmq_delayed_message_exchange-<version>.ez文件,进入docker容器,将文件放到plugins目录下,此目录为:/opt/bitnami/rabbitmq/plugins。

然后执行安装插件的命令:

队列定时处理(延迟队列-及存在的问题和解决方案)(5)

如果不是用docker容器运行的,则可以直接将*.ez文件放到rabbitmq的插件目录下为:/usr/lib/rabbitmq/rabbitmq-server-<version>/plugins。

然后就可以执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange。

验证插件

安装插件后,再次查看ui端口的交换机创建处,可以见到新的交换机类型:x-delayed-message .

队列定时处理(延迟队列-及存在的问题和解决方案)(6)

测试发送延迟消息(Java项目)开发生产者
  • 声明延迟交换机,x-delay-message 。
  • 声明普通队列。
  • 声明延迟交换机与普通队列的绑定关系。
  • 发送延迟消息,通过设置x-delay 单位毫秒。先发送一个延迟长的消息如1分钟。再发送一个延迟少的消息,如6秒。测试必须要先收到延迟少的消息,就算是延迟交换机运行成功。

package wj.rabbitmq.delayexchange;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import wj.mq.utils.ConnUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* <pre>
* 使用延迟交换机发送延迟消息
* 发送的消息会被阻塞到延迟交换机中,先到期的消息会先被发送到队列中,并且不再设置过期时间
* 发送的队列的消息的,立刻会被消息
* </pre>
*/
@Slf4j
public class DelayExchangeSender {
public static void main(String[] args) throws Exception {
Connection con = ConnUtils.newConnection();
Channel channel = con.createChannel();
//声明延迟交换机,即指定个x-delayed-message的交换机的基础类型
Map<String Object> map = new HashMap<>();
map.put("x-delayed-type" "direct");//必须写,固定值
channel.exchangeDeclare(
"my-delayed-exchange" //指定交换机的名称,任意
"x-delayed-message" //指定交换机类型,必须是这个类型
true //是否持久化
false //是否自动删除
map);//必须传入这个参数
//声明普通队列
channel.queueDeclare("my-queue"
true false
false null);
//声明绑定关系
channel.queueBind("my-queue"
"my-delayed-exchange"
"delay-routing");
//先发送延迟60秒的消息 其中x-delay为固定key
AMQP.BasicProperties prop = new AMQP.BasicProperties()
.builder().headers(Map.of("x-delay" 1000 * 60))//设置延迟时间为60秒
.build();
channel.basicPublish("my-delayed-exchange"
"delay-routing"
prop
"Delay for 1 minutes".getBytes(StandardCharsets.UTF_8));
log.info("发送消息:[Delay for 1 minutes] 完成");
//再发送一个延迟时间少的信息,如6秒
prop = new AMQP.BasicProperties()
.builder().headers(Map.of("x-delay" 1000*6))
.build();
channel.basicPublish("my-delayed-exchange"
"delay-routing"
prop
"Delay for 6 seconds".getBytes(StandardCharsets.UTF_8));
log.info("发送消息: [Delay for 6 seconds] 完成");
}
}

如果先启动生产者,并已经声明了交换机与队列的绑定关系,则消费者可以不再次声明绑定关系。所以,消费者的代码就变的比较简单了,消费者仅是消费消息,并输入消费的时间:

开发消息者

package wj.rabbitmq.delayexchange;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import wj.mq.utils.ConnUtils;
/**
* 延迟消息消费者
*/
@Slf4j
public class DelayExchangeReceiver {
public static void main(String[] args) throws Exception {
Connection con = ConnUtils.newConnection();
Channel channel = con.createChannel();
//直接消费这个队列即可
channel.basicConsume("my-queue" true
(consumerTag message) -> {
log.info("消费消息:{}" new String(message.getBody()));
} consumerTag -> {
//ignore
});
}
}

启动

先启动生产者,发送信息,输出一下时间:

12:35:03.772 发送消息:[Delay for 1 minutes] 完成

12:35:03.774 发送消息: [Delay for 6 seconds] 完成

快速启动消费者,等待,并查看收到消息:

12:35:09.782 消费消息:Delay for 6 seconds

12:36:03.776 消费消息:Delay for 1 minutes

通过上面的代码,可以看出,消费者在6秒后,先收到了先延迟到期的消息,一分钟以后,再收到延迟一分钟的消息。虽然,先发送的是延迟一分钟的消息,但此消息,并没有阻塞后面延迟时间短的消息。

查看延迟交换机

可以看到rate out 此时间将会是一个倒计时,到时间以后再给发送给队列。

队列定时处理(延迟队列-及存在的问题和解决方案)(7)

在没有收以消息之前,消息会被阻塞到交换机中,所以,没有到延迟时间的消息,不会被发送的队列中。

队列定时处理(延迟队列-及存在的问题和解决方案)(8)

测试发送延迟消息(springBoot项目)

使用springboot项目,声明延迟交换机,需要使用CustumExchange自定义交换机类型。

开发配置类
  • 定义延迟交换机。
  • 定义普通队列。
  • 定义绑定关系。

package wj.mq.config.delay;

import java.util.Map;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.CustomExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* 使用rabbitmq_delayed_message_exchange发送延迟消息

*/

@Configuration

public class DelayMessageExchangeConfig {

@Bean

public CustomExchange delayExchange() {

CustomExchange exchange = //

new CustomExchange("my-delayed-exchange" // 交换机名称

"x-delayed-message" // 交换机的类型,必须是此值

true // durable

false // autoDelete

Map.of("x-delayed-type" "direct")// 传递延迟类型

);

return exchange;

}

@Bean

public Queue myQueue() {

Queue queue = new Queue("my-queue" //队列名称

true //durable

false //exclusive

false);//auto delete

return queue;

}

@Bean

public Binding myBinding(CustomExchange customExchange Queue myQueue) {

Binding binding = BindingBuilder.bind(myQueue)//Queue

.to(customExchange)//exchange

.with("my-routing")//路由routing

.noargs();

return binding;

}

}

开发生产者代码
  • 注入RabbitTemplate
  • 发送,并设置过期时间。
  • 发送信息,使用ApplicationRunner启动后即发送,也可以开发一个Conntroller,动态调用。

package wj.mq.rabbitmq.delay;

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.ApplicationArguments;

import org.springframework.boot.ApplicationRunner;

import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j

@Component

public class DelaySender implements ApplicationRunner {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send() {

// 先发送一个延迟1分钟的消息

rabbitTemplate.convertAndSend("my-delayed-exchange" // 交换机名称

"my-routing" // 路由key

"Message of delay 1 minutes" // 消息对象

new MessagePostProcessor() {// 接收一个消息发送前的处理函数

@Override

public Message postProcessMessage(Message message)//

throws AmqpException {

// 处理消息,设置消息的过期时间设置x-delay头,60秒钟

message.getMessageProperties().setDelay(1000 * 60);

return message;

}

});

log.info("发送延迟1分钟的消息-完成");

// 再发送一个延迟6秒钟的消息

rabbitTemplate.convertAndSend("my-delayed-exchange" // 交换机名称

"my-routing" // 路由key

"Message of delay 6 seconds" // 消息对象

new MessagePostProcessor() {// 接收一个消息发送前的处理函数

@Override

public Message postProcessMessage(Message message)//

throws AmqpException {

// 处理消息,设置消息的过期时间设置x-delay头,6秒钟

message.getMessageProperties().setDelay(1000 * 6);

return message;

}

});

log.info("发送延迟6秒的消息-完成");

}

@Override

public void run(ApplicationArguments args) throws Exception {

send();//启动完成后就发送,也可以通过一个controller调用测试

}

}

开发消息者代码

通过@RabblitListener接收消息

package wj.mq.rabbitmq.delay;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.messaging.handler.annotation.Payload;

import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

/**

* 消费延迟消息

*/

@Slf4j

@Component

public class DelayReceiver {

@RabbitListener(queues = "my-queue" ackMode = "AUTO")

public void cousumer(@Payload() Message message Channel channel) {

log.info("接收到消息:{}" new String(message.getBody()));

}

}

运行测试

通过以下输出可以看出,虽然先发送的延迟1分钟的消息,但由于后发出的消息因延迟时间更少(6秒)而被先接收到。说明这个测试已经成功运行。

2022-10-01 21:28:55.408 INFO : 发送延迟1分钟的消息-完成

2022-10-01 21:28:55.417 INFO : 发送延迟6秒的消息-完成

2022-10-01 21:29:01.433 INFO : 接收到消息:Message of delay 6 seconds

2022-10-01 21:29:55.408 INFO : 接收到消息:Message of delay 1 minutes

延迟插件的运行原理

队列定时处理(延迟队列-及存在的问题和解决方案)(9)

  • 延迟交换机,会将收到信息,先缓存到延迟交换机内部。
  • 消息延迟到期后,才会发送给目标队列。
延迟插件窗口镜像自定义

由于每一次启动延迟插件,需要将rabbitmq_delayed_message_exchange-<version>.ez copy到bitnami/rabbitmq容器的目录: /opt/bitnami/rabbitmq/plugins/下。且如果容器删除后,必须重新copy。

可以创建一个新的镜像,将插件copy到镜像中,在启动时通过RABBITMQ_PLUGINS指定启动的插件,就可以了。

创建Dockerfile

创建新的镜像

from bitnami/rabbitmq:3.10.8

MAINTAINER WJ

COPY rabbitmq_delayed_message_exchange-3.10.2.ez /opt/bitnami/rabbitmq/plugins/

构建新的镜像

docker build -t rabbitmq:1.0 .

启动新的容器

并指定启用的插件。

#!/bin/bash

docker stop mq

docker rm mq

docker run --name mq -d \

-p 5672:5672 \

-p 15672:15672 \

-e RABBITMQ_USERNAME=admin \

-e RABBITMQ_PASSWORD=admin \

-e TZ=Asia/Shanghai \

-e RABBITMQ_PLUGINS=rabbitmq_management rabbitmq_mqtt rabbitmq_stream rabbitmq_delayed_message_exchange \

-v ${PWD}/data:/bitnami \

mq:1.0

进入容器查看

可见,指定的插件,已经启用。

队列定时处理(延迟队列-及存在的问题和解决方案)(10)

通过界面查看:

队列定时处理(延迟队列-及存在的问题和解决方案)(11)

猜您喜欢: