发布可靠消息(发布确认)
发布可靠消息(发布确认)截图:package wj.rabbitmq.confirm;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import lombok.extern.slf4j.Slf4j;import wj.mq.utils.ConnUtils;@Slf4jpublic class ConfirmSender {public static void main(String[] args) throws Exception {Connection con = ConnUtils.newConnection();Channel channel = con.createChannel();log.info("发布信息");channel.basicPublish("不存在的交换机"
Publisher确认是实现可靠发布的RabbitMQ扩展。当在通道上启用发布者确认时,代理将异步确认客户端发布的消息,这意味着它们已在服务器端得到处理。
启用发布确认:
发布消息,等待确认:
在前面的示例中,我们像往常一样发布消息,并使用Channel#waitForConfirmsOrDie(long)方法等待消息确认。消息一经确认,方法就会返回。如果消息在超时时间内未得到确认,或者消息被中断(这意味着代理由于某种原因无法处理它),该方法将抛出异常。异常的处理通常包括记录错误消息和/或重试发送消息。
这种技术非常简单,但也有一个主要缺点:它大大降低了发布速度,因为消息的确认会阻止所有后续消息的发布。这种方法不会每秒传递超过数百条已发布消息的吞吐量。然而,这对于某些应用程序来说已经足够好了。
无发布确认的情况以下示例在没有发布确认一情况下,测试发布给一个不存在的交换机。
程序代码依然会返回信息发布完成。
package wj.rabbitmq.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import wj.mq.utils.ConnUtils;
@Slf4j
public class ConfirmSender {
public static void main(String[] args) throws Exception {
Connection con = ConnUtils.newConnection();
Channel channel = con.createChannel();
log.info("发布信息");
channel.basicPublish("不存在的交换机" //任意一个不存在的交换机
"不存在的路由key" //声明一个不存在的路由key
null //properties
"Hello".getbytes());
log.info("信息发布完成"); //如果没有发布确认,交换机不存在也同样会认为发布成功
}
}
截图:
输出效果:
15:09:01.024 发布信息
15:09:01.031 信息发布完成
有发布确认的情况package wj.rabbitmq.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import wj.mq.utils.ConnUtils;
@Slf4j
public class ConfirmSender {
public static void main(String[] args) throws Exception {
Connection con = ConnUtils.newConnection();
Channel channel = con.createChannel();
channel.confirmSelect();//开启发布确认
log.info("发布信息");
channel.basicPublish("不存在的交换机" //任意一个不存在的交换机
"不存在的路由key" //声明一个不存在的路由key
null //properties
"Hello".getBytes());
log.info("信息发布完成");
try {
channel.waitForConfirms(1000 * 10);//10 seconds
log.info("被确认");
}catch (Exception e){
log.info("确认失败:{}" e.getMessage());
}
}
}
运行效果:
16:53:11.128 发布信息
16:53:11.133 信息发布完成
16:53:11.145 确认失败,reply-code=404 reply-text=NOT_FOUND - no exchange
发布确认- 发布消息的过程中异常,会导致信道关闭,所以在Exception中重新打开channel。
- 如果routing key不存在,交换机在收到消息后,会直接丢弃。所以,这儿的确认,仅是交换机收到了消息。
package wj.rabbitmq.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import wj.mq.utils.ConnUtils;
@Slf4j
public class ConfirmSender {
public static void main(String[] args) throws Exception {
Connection con = ConnUtils.newConnection();
Channel channel = con.createChannel();
channel.confirmSelect();//开启发布确认
for(int i=0;i<10;i ) {
String msg = ("Hello" i);
log.info("发布信息:{}" msg);
String exchange = "my-exchange";
if(i==3){ //故意:让第三条发布时设置不一个不存在的交换机
exchange="不存在的交换机";
}
String routing = "my-routing";
if(i==4){
routing="不存在的路由";
}
channel.basicPublish(exchange //任意一个不存在的交换机
routing //声明一个不存在的路由key
null //properties
msg.getBytes());
log.info("信息发布完成");
try{
channel.waitForConfirms(1000*10);
log.info("交换机确认收到\n");
}catch ( Exception e){
log.info("交换机没有收到 失败:{}\n" e.getMessage());
if(!channel.isOpen()){//异常后,channel会关闭,所以再重新打开一下
channel =con.createChannel();
channel.confirmSelect();//开启发布确认
}
}
}
}
}
截图:
运行效果:
17:36:43.544 发布信息:Hello0
17:36:43.550 信息发布完成
17:36:43.551 交换机确认收到
17:36:43.551 发布信息:Hello1
17:36:43.551 信息发布完成
17:36:43.553 交换机确认收到
17:36:43.553 发布信息:Hello2
17:36:43.553 信息发布完成
17:36:43.554 交换机确认收到
17:36:43.555 发布信息:Hello3
17:36:43.555 信息发布完成
17:36:43.561 交换机没有收到 失败:channel error;channel.close,404 reply-text=NOT_FOUND - no exchange '不存在的交换机' in vhost '/' class-id=60 method-id=40)
17:36:43.564 发布信息:Hello4
17:36:43.566 信息发布完成
17:36:43.566 交换机确认收到
17:36:43.566 发布信息:Hello5
17:36:43.567 信息发布完成
17:36:43.568 交换机确认收到
17:36:43.568 发布信息:Hello6
17:36:43.569 信息发布完成
17:36:43.570 交换机确认收到
17:36:43.570 发布信息:Hello7
17:36:43.572 信息发布完成
17:36:43.572 交换机确认收到
17:36:43.572 发布信息:Hello8
17:36:43.572 信息发布完成
17:36:43.573 交换机确认收到
17:36:43.574 发布信息:Hello9
17:36:43.574 信息发布完成
17:36:43.575 交换机确认收到
使用监听器小结通过addConfirmListener可以监听到因exchange不存在而没有被确认收到的消息,但这本质上,一般不会发生,如果程序代码出现这种低级的错误,基本上就不用玩了。如果RabbitMQ宕机,而Connection则根本不存在,所以,这种错误应该不用浪费代码去处理。
消息回退- 通过ReturnListener可以处理因路由不存在而被回退的消息。
- 当basicPublish的mandatory参数设置为false时,如果消息没有被正确路由,消息会丢弃,不会返回给生产者。
以下是示例代碼:
package wj.rabbitmq.confirm;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import wj.mq.utils.ConnUtils;
import java.io.IOException;
@Slf4j
public class ConfirmSender3 {
public static void main(String[] args) throws Exception {
Connection con = ConnUtils.newConnection();
Channel channel = con.createChannel();
channel.confirmSelect();//开启发布确认
ReturnListener returnListener = new ReturnListener() {
@Override
public void handleReturn(int replyCode String replyText
String exchange String routingKey
AMQP.BasicProperties properties
byte[] body) throws IOException {
log.info("消息退回:replyCode={} replyText={} exchange={} routingKey={} id={}"
replyCode replyText exchange routingKey properties.getCorrelationId()
new String(body));
}
};
channel.addReturnListener(returnListener);
for (int i = 0; i < 10; i ) {
String msg = ("Hello" i);
log.info("发布信息:{}" msg);
String exchange = "my-exchange";
String routing = "my-routing";
if (i == 4) {
routing = "不存在的路由";
}
AMQP.BasicProperties prop =
new AMQP.BasicProperties().builder()
.correlationId("" i)
.build();
channel.basicPublish(exchange //任意一个不存在的交换机
routing //声明一个不存在的路由key
true
prop //properties
msg.getBytes());
log.info("信息发布完成");
}
}
}
截图:
輸出:
通過以下信息,可以看出,因第4個消息,設置了一個不存在的路由key,所以信息被回退。
18:23:31.931 [main] 信息发布完成
18:23:31.932 [main] 发布信息:Hello5
18:23:31.932 [AMQP Connection 192.168.56.61:5672] 消息退回:
replyCode=312 replyText=NO_ROUTE exchange=my-exchange routingKey=不存在的路由 id=4
18:23:31.932 [main] 信息发布完成
18:23:31.932 [main] 发布信息:Hello6
18:23:31.933 [main] 信息发布完成
18:23:31.933 [main] 发布信息:Hello7
18:23:31.933 [main] 信息发布完成
18:23:31.933 [main] 发布信息:Hello8
18:23:31.935 [main] 信息发布完成
18:23:31.935 [main] 发布信息:Hello9
18:23:31.936 [main] 信息发布完成