RabbitMQ消息确认机制

为什么要有消息确认

  1. 由于网络可能以不可预知的方式出现故障,且检测故障可能需要耗费一些时间;
  2. 因此不能保证发送的消息能够到达对等方或由它成功地处理。

消息确认流程

RabbitMQ 的消息确认机制如下:

01_RabbitMQ消息确认机制.png

从图中我们可以看出:

  • 生产者发送消息到 RabbitMQ Server 后,RabbitMQ Server 需要对生产者进行消息 Confirm 确认;
  • 消费者消费消息后需要对 RabbitMQ Server 进行消息 ACK 确认。

这两个机制都是收到 TCP 协议的启发,它们对于数据安全至关重要。下面就分别从生产者、消费者两个方面结合实例来认识消息确认机制。

备注:

  1. 在 RabbitMQ 中 有两种事务机制来确保消息的安全送达,分别是事务机制和确认机制;
  2. 事务机制需要每个消息或一组消息发布、提交的通道设置为事务性的,因此会非常耗费性能,降低了 Rabbitmq 的消息吞吐量;
  3. 因此我们在实际生产中通常采用确认机制,下面的实例演示就采用确认机制来进行编码。

生产者确认

消息投递和消息确认链路

我们先来看一下 RabbitMQ 消息投递和接收的一个完整链路如下:

02_RabbitMQ消息确认机制.png

消息投递可靠性保证

消息投递的链路用文字表示:

producer->rabbitmq broker cluster->exchange->queue->consumer

由于:

  1. 生产者向 RabbitMQ Server 发出的消息可能会在发送途中丢失或者需要经过一定的延迟后才能成功发送到 RabbitMQ Server;
  2. 因此,需要 RabbitMQ 告诉生产者,生产者才能知道自己发布的消息是否已经送达。

在编码时我们可以用两个选项用来控制消息投递的可靠性:

  • 消息从 producer 到 RabbitMQ broker cluster 成功,则会返回一个 confirmCallback
  • 消息从 exchange 到 queue 投递失败,则会返回一个 returnCallback

我们可以利用这两个 callback 接口来控制消息的一致性和处理一部分的异常情况。

开启confirm和return确认

server.port=10420 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 开启发送确认 spring.rabbitmq.publisher-confirms=true # 开启发送失败退回(消息有没有找到合适的队列) spring.rabbitmq.publisher-returns=true

使用callback接口来确保消息投递状态

在 RabbitConfig 配置类里,定义 RabbitTemplate Bean,使用 callback 接口:

/** * RabbitMQ配置 * * @date 2020-05-17 17:20 **/ @Slf4j @Configuration public class RabbitConfig { @Autowired CachingConnectionFactory cachingConnectionFactory; @Bean RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); // 消息只要被 rabbitmq broker 接收到就会执行 confirmCallback // 如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback // 被broker接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里 rabbitTemplate.setConfirmCallback((data, ack, cause) -> { String msgId = data.getId(); if (ack) { log.info(msgId + ": 消息发送成功"); } else { log.info(msgId + ": 消息发送失败"); } }); // confirm 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。 // 在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式 // 这样如果未能投递到目标 queue 里将调用 returnCallback,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info(MessageFormat.format("消息发送失败,ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)); // TODO 做消息发送失败时的处理逻辑 }); return rabbitTemplate; } /** * 声明队列 * 参数说明: * durable 是否持久化,默认是false(持久化队列则数据会被存储在磁盘上,当消息代理重启时数据不会丢失;暂存队列只对当前连接有效) * exclusive 默认是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable * autoDelete 默认是false,是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 * 一般设置一下队列的持久化就好,其余两个就是默认false * * @return Queue **/ @Bean Queue myQueue() { return new Queue(QueueConstants.QUEUE\_NAME, true); } // 设置交换机,类型为 direct @Bean DirectExchange myExchange() { return new DirectExchange(QueueConstants.QUEUE\_EXCHANGE\_NAME, true, false); } // 绑定:将交换机和队列绑定,并设置路由匹配键 @Bean Binding queueBinding() { return BindingBuilder.bind(myQueue()).to(myExchange()).with(QueueConstants.QUEUE\_ROUTING\_KEY\_NAME); }

消息生产端

在 ProducerController 里,主要干了以下几件事:

  • 提供了一个 Rest 接口 sendDirectMessage,通过请求该接口,可以实现生产者发送消息的功能;
  • 在该接口内部使用了 CorrelationData,该对象内部只有一个 id 属性,用来表示消息的唯一性;
  • 使用 rabbitTemplate.convertAndSend 像 RabbitMQ 发送消息(这里使用的 rabbitTemplate 就是在 RabbitConfig 里被重写的 RabbitTemplate)。
/** * 消息生产端 * @date 2020-05-17 18:30 **/ @RestController public class ProducerController { /\*\* \* RabbitTemplate提供了发送/接收消息的方法 \*/ @Autowired RabbitTemplate rabbitTemplate; /** * 生产消息 * * @Date 上午12:12 2020/5/20 * @param test * @param test2 * @return java.lang.String **/ @GetMapping("/sendDirectMessage") public String sendDirectMessage(String test,Integer test2) { // 生成消息的唯一id String msgId = UUID.randomUUID().toString(); String messageData = "hello,this is rabbitmq demo message"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 定义要发送的消息对象 Map<String,Object\> messageObj = new HashMap<>(); messageObj.put("msgId",msgId); messageObj.put("messageData",messageData); messageObj.put("createTime",createTime); rabbitTemplate.convertAndSend(QueueConstants.QUEUE\_EXCHANGE\_NAME,QueueConstants.QUEUE\_ROUTING\_KEY\_NAME, messageObj,new CorrelationData(msgId)); return "message send ok"; } }

生产消息

  1. 保存代码,在 RabbitConfig 里的 setConfirmCallback 方法内部打上断点;

  2. 重启服务后,使用 PostMan 请求生产消息接口:[http://你的域名](http://xn--6qqv7i2xdt95b/):10420/sendDirectMessage,生产消息,并将消息发送给 RabbitMQ:

03_RabbitMQ消息确认机制.png

  1. 然后打开 RabbitMQ 管理界面,找到对应的队列,会发现:

04_RabbitMQ消息确认机制.png

  1. 在 IDEA 里,服务在启动后直接停在断点处:

05_RabbitMQ消息确认机制.png

也就说明我们生产的消息已经成功的到达了 RabbitMQ Server 里。

  1. 继续执行断点调试的绿色箭头,发现 setReturnCallback 方法里的断点没有执行到,也就说明了我们生产的消息已经被交换机顺利的投递到队列里去了。

总结

至此,生产者消息确认结束,且通过运行的实例,我们能够得出结论:本次生产的消息已经正确无误的投递到了队列中去。

消费者确认

消费者确认指的就是 RabbitMQ 需要确认消息到底有没有被收到,来确定要不要将该条消息从队列中删除掉。这就需要消费者来告诉 RabbitMQ,有以下两种方式:

自动应答

消费者在消费消息的时候,如果设定应答模式为自动,则消费者收到消息后,消息就会立即被 RabbitMQ 从队列中删除掉。因此,在实际开发者,我们基本上是将消费应答模式设置为手动确认更为妥当一些。

手动应答

消费者在收到消息后:

  • 可以在既定的正常情况下进行确认(告诉 RabbitMQ,我已经消费过该消息了,你可以删除该条数据了);
  • 可以在既定的异常情况下不进行确认(RabbitMQ 会继续保留该条数据),这样下一次可以继续消费该条数据。

开启手动应答

server.port=10421 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 开启 ACK(消费者接收到消息时手动确认) spring.rabbitmq.listener.simple.acknowledge-mode=manual

消息消费者

ConsumerController 里主要干了以下几件事儿:

  1. 使用 @RabbitListener 来监听队列;
  2. 从消息头里拿到消息的唯一表示 deliveryTag
  3. 使用 channel.basicAck 来确认消息已经消费;
  4. 如果有异常,使用 channel.basicNack 把消费失败的消息重新放入到队列中去。
/** * 消息消费端 * @date 2020-05-21 18:00 **/ @Component public class ConsumerController { @RabbitListener(queues = {QueueConstants.QUEUE\_NAME}) public void handler(Message message, Channel channel) throws IOException { System.out.println("收到消息:" + message.toString()); MessageHeaders headers = message.getHeaders(); Long tag = (Long) headers.get(AmqpHeaders.DELIVERY\_TAG); try { // 手动确认消息已消费 channel.basicAck(tag,false); } catch (IOException e) { // 把消费失败的消息重新放入到队列 channel.basicNack(tag, false, true); e.printStackTrace(); } } }

消费消息

  1. 重启消费端服务,停在断点处:

06_RabbitMQ消息确认机制.png

  1. 查看 RabbitMQ 管理界面会发现 队列的 Ready 和 Total 仍然是 1,说明我们的手动应答设置生效:

  2. 点击 Debug 的绿色箭头继续像下执行,查看 RabbitMQ 管理界面:

07_RabbitMQ消息确认机制.png

  1. 几秒后再次查看 RabbitMQ 管理界面:

08_RabbitMQ消息确认机制.png

会发现:Ready 变为 0,Unacked 为 0,Total 为 0。 说明该条数据已经被成功消费。