快捷搜索:  汽车  科技

RabbitMQ延迟队列实现(RabbitMQ延迟队列实现)

RabbitMQ延迟队列实现(RabbitMQ延迟队列实现)https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases首先我们需要下载 rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上面的开源项目,我们直接下载即可:Docker安装RabbitMQ#下载镜像 docker pull rabbitmq:3-management #运行rabbitmq docker run -it --rm -d --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management #进入容器并开启管理功能 docker exec -it rabbitmq /bin/bash #进入容器后开启管理功能 rabbitmq-plugins enable rabbitmq_m

什么是延迟队列

延迟队列是一个带有延迟功能的消息队列,本质上也是消息队列, 对于某些业务场景,我们需要让消费者在指定的时间延迟后消费该消息

  • 订单提交后一定时间内没支付的话,提示用户该支付
  • 比如说在头条上写文章也有个定时发布功能,需要延迟指定的时间后去修改文章状态

上述这两个业务场景虽然说可以利用定时任务去完成,但这种方案在数据量大的情况下会给数据库增加无形中的负担,而且定时任务在计算时间上也有时差。

RabbitMQ本身是不直接支持延时队列的,RabbitMQ的延迟队列基于消息的存活时间TTL(Time To Live)和死信交换机DLE(Dead Letter Exchanges)实现:

  1. TTL:RabbitMQ可以对队列和消息各自设置存活时间,规则是两者中较小的值,即队列无消费者连接的消息过期时间,或者消息在队列中一直未被消费的过期时间
  2. DLE:过期的消息通过绑定的死信交换机,路由到指定的死信队列,消费者实际上消费的是死信队列上的消息

Docker安装RabbitMQ

#下载镜像 docker pull rabbitmq:3-management #运行rabbitmq docker run -it --rm -d --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management #进入容器并开启管理功能 docker exec -it rabbitmq /bin/bash #进入容器后开启管理功能 rabbitmq-plugins enable rabbitmq_management

运行起来后可以访问http://localhost:15672

安装延迟队列插件rabbitmq_delayed_messsage_exchange

首先我们需要下载 rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上面的开源项目,我们直接下载即可:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

下载成功后复制到docker容器中去

docker cp ./rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins

接下来再次进入容器中

#查看插件 rabbitmq-plugins list #安装插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

RabbitMQ延迟队列实现(RabbitMQ延迟队列实现)(1)

查看插件列表

RabbitMQ延迟队列实现(RabbitMQ延迟队列实现)(2)

安装成功后在add a new exchange中可以查看

实现延迟队列

开发环境:SpringBoot RabbitMQ

编写配置类

package com.taobao.mqdelaydemo.config; import com.taobao.mqdelaydemo.DelayedTopic; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class MqConfig { @Bean public Queue queue(){ return new Queue(DelayedTopic.DELAY_QUEUE); } /** * 延迟交换机 */ @Bean public CustomExchange customExchange(){ Map<String Object> map = new HashMap<>(); map.put("x-delayed-type" "direct"); //设置持久化参数 return new CustomExchange(DelayedTopic.DELAY_EXCHANGE "x-delayed-message" true false map); } /** * 绑定延迟队列和交换机 */ @Bean public Binding binding(@Qualifier("queue") Queue queue @Qualifier("customExchange") CustomExchange customExchange){ return BindingBuilder.bind(queue).to(customExchange).with(DelayedTopic.DELAY_ROUTING_KEY).noargs(); } }

编写自定义类

package com.taobao.mqdelaydemo; public interface DelayedTopic { String DELAY_EXCHANGE = "delay_exchange"; String DELAY_QUEUE = "delay_queue"; String DELAY_ROUTING_KEY = "delay_routing_key"; }

编写消费发送者

package com.taobao.mqdelaydemo.component; import com.taobao.mqdelaydemo.DelayedTopic; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDateTime; @Component public class MqSender { @Autowired AmqpTemplate amqpTemplate; public void sender() { String msg = "test delayed message"; System.out.println("Send Time:" LocalDateTime.now() " Send:" msg); //延迟6s执行 this.amqpTemplate.convertAndSend(DelayedTopic.DELAY_EXCHANGE DelayedTopic.DELAY_ROUTING_KEY msg x -> { x.getMessageProperties().setDelay(6000); return x; }); } }

编写消息的消费者

package com.taobao.mqdelaydemo.component; import com.rabbitmq.client.Channel; import com.taobao.mqdelaydemo.DelayedTopic; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; import java.time.LocalDateTime; @Component public class MqReceiver { @RabbitListener(queues = DelayedTopic.DELAY_QUEUE) public void receive(Message message Channel channel) throws IOException { String str = new String(message.getBody()); System.out.println("Receive Time:" LocalDateTime.now() " Receive:" str); channel.basicAck(message.getMessageProperties().getDeliveryTag() false); } }

测试消息

package com.taobao.mqdelaydemo.controller; import com.taobao.mqdelaydemo.component.MqSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping public class TestController { @Autowired MqSender mqSender; @GetMapping("/sender") public String sender(){ this.mqSender.sender(); return "ok"; } }

延迟6秒后发送消息

RabbitMQ延迟队列实现(RabbitMQ延迟队列实现)(3)

扩展

除了延迟队列外,如果不考虑到消息的持久化,并且系统如果是单机版的话可以使用jdk的Timer和TimerTask两个类来实现,也可以达到一些场景的目的

public class MyTimerTask extends TimerTask { @Override public void run() { System.out.println("MyTimerTask:" LocalDateTime.now()); } } public static void main(String[] args) { Timer timer = new Timer(); System.out.println("Main Thread Start at:" LocalDateTime.now()); MyTimerTask myTimerTask = new MyTimerTask(); timer.schedule(myTimerTask 5000); System.out.println("Main Thread Stop at:" LocalDateTime.now()); }

RabbitMQ延迟队列实现(RabbitMQ延迟队列实现)(4)

延迟5秒执行

参考

https://juejin.cn/post/6844904163168485383

https://www.baeldung.com/java-timer-and-timertask

猜您喜欢: