rabbitmq死信队列应用(RabbitMQ从零到集群高可用)
rabbitmq死信队列应用(RabbitMQ从零到集群高可用)public static void Consumer() { //死信交换机 string dlxexChange = "dlx.exchange"; //死信队列 string dlxQueueName = "dlx.queue"; //消息交换机 string exchange = "direct-exchange"; //消息队列 string queueName = "queue_a"; var connection = RabbitMQHelper.GetConnection();
目录
- 一、死信队列
- 二、延时队列
- 三、延时队列消息设置不同过期时间
一、死信队列
描述:Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-dead-letter-routing-key(死信路由key)指向Q2(队列2)
P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发特定情况,自动把消息经X2(交换机2)路由到Q2(队列2),C(消费者)直接消息Q2的消息。
特定情况有哪些呢:
- 消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue=false);
- 当前队列中的消息数量已经超过最大长度(创建队列时指定" x-max-length参数设置队列最大消息数量)。
- 消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL(Time To Live)时间;
这里演示情况1:
假如场景:Q1中队列数据不完整,就算从新处理也会报错,那就可以不ack,把这个消息转到死信队列另外处理。
生产者:
public static void SendMessage()
{
//死信交换机
string dlxexChange = "dlx.exchange";
//死信队列
string dlxQueueName = "dlx.queue";
//消息交换机
string exchange = "direct-exchange";
//消息队列
string queueName = "queue_a";
using (var connection = RabbitMQHelper.GetConnection())
{
using (var channel = connection.CreateModel())
{
//创建死信交换机
channel.ExchangeDeclare(dlxexChange type: ExchangeType.Direct durable: true autoDelete: false);
//创建死信队列
channel.QueueDeclare(dlxQueueName durable: true exclusive: false autoDelete: false);
//死信队列绑定死信交换机
channel.QueueBind(dlxQueueName dlxexChange routingKey: dlxQueueName);
// 创建消息交换机
channel.ExchangeDeclare(exchange type: ExchangeType.Direct durable: true autoDelete: false);
//创建消息队列 并指定死信队列
channel.QueueDeclare(queueName durable: true exclusive: false autoDelete: false arguments:
new Dictionary<string object> {
{ "x-dead-letter-exchange" dlxexChange} //设置当前队列的DLX(死信交换机)
{ "x-dead-letter-routing-key" dlxQueueName} //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
});
//消息队列绑定消息交换机
channel.QueueBind(queueName exchange routingKey: queueName);
string message = "hello rabbitmq message";
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//发布消息
channel.BasicPublish(exchange: exchange
routingKey: queueName
basicProperties: properties
body: Encoding.UTF8.GetBytes(message));
Console.WriteLine($"向队列:{queueName}发送消息:{message}");
}
}
}
消费者:
public static void Consumer()
{
//死信交换机
string dlxexChange = "dlx.exchange";
//死信队列
string dlxQueueName = "dlx.queue";
//消息交换机
string exchange = "direct-exchange";
//消息队列
string queueName = "queue_a";
var connection = RabbitMQHelper.GetConnection();
{
//创建信道
var channel = connection.CreateModel();
{
//创建死信交换机
channel.ExchangeDeclare(dlxexChange type: ExchangeType.Direct durable: true autoDelete: false);
//创建死信队列
channel.QueueDeclare(dlxQueueName durable: true exclusive: false autoDelete: false);
//死信队列绑定死信交换机
channel.QueueBind(dlxQueueName dlxexChange routingKey: dlxQueueName);
// 创建消息交换机
channel.ExchangeDeclare(exchange type: ExchangeType.Direct durable: true autoDelete: false);
//创建消息队列 并指定死信队列
channel.QueueDeclare(queueName durable: true exclusive: false autoDelete: false arguments:
new Dictionary<string object> {
{ "x-dead-letter-exchange" dlxexChange} //设置当前队列的DLX
{ "x-dead-letter-routing-key" dlxQueueName} //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
});
//消息队列绑定消息交换机
channel.QueueBind(queueName exchange routingKey: queueName);
var consumer = new EventingBasicConsumer(channel);
channel.BasicQos(prefetchSize: 0 prefetchCount: 1 global: true);
consumer.Received = (model ea) =>
{
//处理业务
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"队列{queueName}消费消息:{message} 不做ack确认");
//channel.BasicAck(ea.DeliveryTag false);
//不ack(BasicNack) 且不把消息放回队列(requeue:false)
channel.BasicNack(ea.DeliveryTag false requeue: false);
};
channel.BasicConsume(queueName autoAck: false consumer);
}
}
}
消费者加上channel.BasickNack()模拟消息处理不了,不ack确认。
执行结果:
RabbitMQ管理界面:
看到消息队列为queue_a,特性有DLX(死信交换机),DLK(死信路由)。因为消费端不nack,触发了死信,被转发到了死信队列dlx.queue。
回到顶部
二、延时队列延时队列其实也是配合死信队列一起用,其实就是上面死信队列的第二中情况。给队列添加消息过时时间(TTL),变成延时队列。
简单的描述就是:P(生产者)发送消息到Q1(延时队列),Q1的消息有过期时间,比如10s,那10s后消息过期就会触发死信,从而把消息转发到Q2(死信队列)。
解决问题场景:像商城下单,未支付时取消订单场景。下单时写一条记录入Q1 延时30分钟后转到Q2,消费Q2,检查订单,支付则不做操作,没支付则取消订单,恢复库存。
生产者代码:
public static void SendMessage()
{
//死信交换机
string dlxexChange = "dlx.exchange";
//死信队列
string dlxQueueName = "dlx.queue";
//消息交换机
string exchange = "direct-exchange";
//消息队列
string queueName = "delay_queue";
using (var connection = RabbitMQHelper.GetConnection())
{
using (var channel = connection.CreateModel())
{
//创建死信交换机
channel.ExchangeDeclare(dlxexChange type: ExchangeType.Direct durable: true autoDelete: false);
//创建死信队列
channel.QueueDeclare(dlxQueueName durable: true exclusive: false autoDelete: false);
//死信队列绑定死信交换机
channel.QueueBind(dlxQueueName dlxexChange routingKey: dlxQueueName);
// 创建消息交换机
channel.ExchangeDeclare(exchange type: ExchangeType.Direct durable: true autoDelete: false);
//创建消息队列 并指定死信队列,和设置这个队列的消息过期时间为10s
channel.QueueDeclare(queueName durable: true exclusive: false autoDelete: false arguments:
new Dictionary<string object> {
{ "x-dead-letter-exchange" dlxexChange} //设置当前队列的DLX(死信交换机)
{ "x-dead-letter-routing-key" dlxQueueName} //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
{ "x-message-TTL" 10000} //设置队列的消息过期时间
});
//消息队列绑定消息交换机
channel.QueueBind(queueName exchange routingKey: queueName);
string message = "hello rabbitmq message";
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//发布消息
channel.BasicPublish(exchange: exchange
routingKey: queueName
basicProperties: properties
body: Encoding.UTF8.GetBytes(message));
Console.WriteLine($"{DateTime.Now} 向队列:{queueName}发送消息:{message}");
}
}
}
消费者代码:
public static void Consumer()
{
//死信交换机
string dlxexChange = "dlx.exchange";
//死信队列
string dlxQueueName = "dlx.queue";
var connection = RabbitMQHelper.GetConnection();
{
//创建信道
var channel = connection.CreateModel();
{
//创建死信交换机
channel.ExchangeDeclare(dlxexChange type: ExchangeType.Direct durable: true autoDelete: false);
//创建死信队列
channel.QueueDeclare(dlxQueueName durable: true exclusive: false autoDelete: false);
//死信队列绑定死信交换机
channel.QueueBind(dlxQueueName dlxexChange routingKey: dlxQueueName);
var consumer = new EventingBasicConsumer(channel);
channel.BasicQos(prefetchSize: 0 prefetchCount: 1 global: true);
consumer.Received = (model ea) =>
{
//处理业务
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"{DateTime.Now},队列{dlxQueueName}消费消息:{message}");
channel.BasicAck(ea.DeliveryTag false);
};
channel.BasicConsume(dlxQueueName autoAck: false consumer);
}
}
}
执行代码:
向延时队列发送消息,监听死信队列,发送和收到消息时间刚好是设置的10s。
RabbitMQ管理界面:
回到顶部
三、延时队列消息设置不同过期时间上面的延时队列能解决消息过期时间都是相同的场景,能不能解决消息的过期时间是不一样的呢?
例如场景:机器人客服,为了更像人为操作,收到消息后要随机3-10秒回复客户。
1)队列不设置TTL(消息过期时间),把过期时间设置在消息上。
生产者代码:
public static void SendMessage()
{
//死信交换机
string dlxexChange = "dlx.exchange";
//死信队列
string dlxQueueName = "dlx.queue";
//消息交换机
string exchange = "direct-exchange";
//消息队列
string queueName = "delay_queue";
using (var connection = RabbitMQHelper.GetConnection())
{
using (var channel = connection.CreateModel())
{
//创建死信交换机
channel.ExchangeDeclare(dlxexChange type: ExchangeType.Direct durable: true autoDelete: false);
//创建死信队列
channel.QueueDeclare(dlxQueueName durable: true exclusive: false autoDelete: false);
//死信队列绑定死信交换机
channel.QueueBind(dlxQueueName dlxexChange routingKey: dlxQueueName);
// 创建消息交换机
channel.ExchangeDeclare(exchange type: ExchangeType.Direct durable: true autoDelete: false);
//创建消息队列 并指定死信队列,和设置这个队列的消息过期时间为10s
channel.QueueDeclare(queueName durable: true exclusive: false autoDelete: false arguments:
new Dictionary<string object> {
{ "x-dead-letter-exchange" dlxexChange} //设置当前队列的DLX(死信交换机)
{ "x-dead-letter-routing-key" dlxQueueName} //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
//{ "x-message-ttl" 10000} //设置队列的消息过期时间
});
//消息队列绑定消息交换机
channel.QueueBind(queueName exchange routingKey: queueName);
string message = "hello rabbitmq message 10s后处理";
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Expiration = "10000";
//发布消息 延时10s
channel.BasicPublish(exchange: exchange
routingKey: queueName
basicProperties: properties
body: Encoding.UTF8.GetBytes(message));
Console.WriteLine($"{DateTime.Now} 向队列:{queueName}发送消息:{message} 延时:10s");
string message2 = "hello rabbitmq message 5s后处理";
var properties2 = channel.CreateBasicProperties();
properties2.Persistent = true;
properties2.Expiration = "5000";
//发布消息 延时5s
channel.BasicPublish(exchange: exchange
routingKey: queueName
basicProperties: properties2
body: Encoding.UTF8.GetBytes(message2));
Console.WriteLine($"{DateTime.Now} 向队列:{queueName}发送消息:{message2} 延时:5s");
}
}
}
消费者代码还是上面延时队列的不变,先试下效果。
生产者向队列中发送一条延时10s的消息再发一条延时5秒的消息,但消费者却先拿到延时10s的,再拿到延时5秒的,我想要的结果是先拿到延时5s的再拿到延时10s的,是什么原因呢。
原因是:队列是先进先出的,而RabbitMQ只会对首位第一条消息做检测,第一条没过期,那么后面的消息就会阻塞住等待前面的过期。
解决办法:增加一个消费者对延时队列消费,不ack,把第一条消息放到队列尾部。一直让消息在流动,这样就能检测到了。
2)新增消费者代码:
public static void DelayConsumer()
{
//延时队列
string queueName = "delay_queue";
var connection = RabbitMQHelper.GetConnection();
{
//创建信道
var channel = connection.CreateModel();
{
var consumer = new EventingBasicConsumer(channel);
channel.BasicQos(prefetchSize: 0 prefetchCount: 1 global: true);
consumer.Received = (model ea) =>
{
//处理业务
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Thread.Sleep(20);//消息少的时候可以加个睡眠时间减少IO
channel.BasicNack(ea.DeliveryTag false requeue: true);
};
channel.BasicConsume(queueName autoAck: false consumer);
}
}
}
执行效果:
这会得到了想要的效果。
RabbitMQ管理界面: