RabbitMQ延迟队列实现(RabbitMQ延迟队列实现)
RabbitMQ延迟队列实现(RabbitMQ延迟队列实现)https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases首先我们需要下载 rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上面的开源项目,我们直接下载即可:Docker安装RabbitMQ#下载镜像 docker pull rabbitmq:3-management #运行rabbitmq docker run -it --rm -d --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management #进入容器并开启管理功能 docker exec -it rabbitmq /bin/bash #进入容器后开启管理功能 rabbitmq-plugins enable rabbitmq_m
什么是延迟队列
延迟队列是一个带有延迟功能的消息队列,本质上也是消息队列, 对于某些业务场景,我们需要让消费者在指定的时间延迟后消费该消息
- 订单提交后一定时间内没支付的话,提示用户该支付
- 比如说在头条上写文章也有个定时发布功能,需要延迟指定的时间后去修改文章状态
上述这两个业务场景虽然说可以利用定时任务去完成,但这种方案在数据量大的情况下会给数据库增加无形中的负担,而且定时任务在计算时间上也有时差。
RabbitMQ本身是不直接支持延时队列的,RabbitMQ的延迟队列基于消息的存活时间TTL(Time To Live)和死信交换机DLE(Dead Letter Exchanges)实现:
- TTL:RabbitMQ可以对队列和消息各自设置存活时间,规则是两者中较小的值,即队列无消费者连接的消息过期时间,或者消息在队列中一直未被消费的过期时间
- DLE:过期的消息通过绑定的死信交换机,路由到指定的死信队列,消费者实际上消费的是死信队列上的消息
Docker安装RabbitMQ
#下载镜像
docker pull rabbitmq:3-management
#运行rabbitmq
docker run -it --rm -d --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management
#进入容器并开启管理功能
docker exec -it rabbitmq /bin/bash
#进入容器后开启管理功能
rabbitmq-plugins enable rabbitmq_management
运行起来后可以访问http://localhost:15672
安装延迟队列插件rabbitmq_delayed_messsage_exchange
首先我们需要下载 rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上面的开源项目,我们直接下载即可:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
下载成功后复制到docker容器中去
docker cp ./rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins
接下来再次进入容器中
#查看插件
rabbitmq-plugins list
#安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看插件列表
安装成功后在add a new exchange中可以查看
实现延迟队列
开发环境:SpringBoot RabbitMQ
编写配置类
package com.taobao.mqdelaydemo.config;
import com.taobao.mqdelaydemo.DelayedTopic;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MqConfig {
@Bean
public Queue queue(){
return new Queue(DelayedTopic.DELAY_QUEUE);
}
/**
* 延迟交换机
*/
@Bean
public CustomExchange customExchange(){
Map<String Object> map = new HashMap<>();
map.put("x-delayed-type" "direct");
//设置持久化参数
return new CustomExchange(DelayedTopic.DELAY_EXCHANGE "x-delayed-message" true false map);
}
/**
* 绑定延迟队列和交换机
*/
@Bean
public Binding binding(@Qualifier("queue") Queue queue
@Qualifier("customExchange") CustomExchange customExchange){
return BindingBuilder.bind(queue).to(customExchange).with(DelayedTopic.DELAY_ROUTING_KEY).noargs();
}
}
编写自定义类
package com.taobao.mqdelaydemo;
public interface DelayedTopic {
String DELAY_EXCHANGE = "delay_exchange";
String DELAY_QUEUE = "delay_queue";
String DELAY_ROUTING_KEY = "delay_routing_key";
}
编写消费发送者
package com.taobao.mqdelaydemo.component;
import com.taobao.mqdelaydemo.DelayedTopic;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
public class MqSender {
@Autowired
AmqpTemplate amqpTemplate;
public void sender() {
String msg = "test delayed message";
System.out.println("Send Time:" LocalDateTime.now() " Send:" msg);
//延迟6s执行
this.amqpTemplate.convertAndSend(DelayedTopic.DELAY_EXCHANGE DelayedTopic.DELAY_ROUTING_KEY msg x -> {
x.getMessageProperties().setDelay(6000);
return x;
});
}
}
编写消息的消费者
package com.taobao.mqdelaydemo.component;
import com.rabbitmq.client.Channel;
import com.taobao.mqdelaydemo.DelayedTopic;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
@Component
public class MqReceiver {
@RabbitListener(queues = DelayedTopic.DELAY_QUEUE)
public void receive(Message message Channel channel) throws IOException {
String str = new String(message.getBody());
System.out.println("Receive Time:" LocalDateTime.now() " Receive:" str);
channel.basicAck(message.getMessageProperties().getDeliveryTag() false);
}
}
测试消息
package com.taobao.mqdelaydemo.controller;
import com.taobao.mqdelaydemo.component.MqSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping
public class TestController {
@Autowired
MqSender mqSender;
@GetMapping("/sender")
public String sender(){
this.mqSender.sender();
return "ok";
}
}
延迟6秒后发送消息
扩展
除了延迟队列外,如果不考虑到消息的持久化,并且系统如果是单机版的话可以使用jdk的Timer和TimerTask两个类来实现,也可以达到一些场景的目的
public class MyTimerTask extends TimerTask {
@Override
public void run() {
System.out.println("MyTimerTask:" LocalDateTime.now());
}
}
public static void main(String[] args) {
Timer timer = new Timer();
System.out.println("Main Thread Start at:" LocalDateTime.now());
MyTimerTask myTimerTask = new MyTimerTask();
timer.schedule(myTimerTask 5000);
System.out.println("Main Thread Stop at:" LocalDateTime.now());
}
延迟5秒执行
参考
https://juejin.cn/post/6844904163168485383
https://www.baeldung.com/java-timer-and-timertask