RabbitMQ 的消息确认机制如下:
从图中我们可以看出:
这两个机制都是收到 TCP 协议的启发,它们对于数据安全至关重要。下面就分别从生产者、消费者两个方面结合实例来认识消息确认机制。
备注:
我们先来看一下 RabbitMQ 消息投递和接收的一个完整链路如下:
消息投递的链路用文字表示:
producer->rabbitmq broker cluster->exchange->queue->consumer
由于:
在编码时我们可以用两个选项用来控制消息投递的可靠性:
confirmCallback
;returnCallback
我们可以利用这两个 callback 接口来控制消息的一致性和处理一部分的异常情况。
server.port=10420
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回(消息有没有找到合适的队列)
spring.rabbitmq.publisher-returns=true
在 RabbitConfig 配置类里,定义 RabbitTemplate Bean,使用 callback 接口:
/**
* RabbitMQ配置
*
* @date 2020-05-17 17:20
**/
@Slf4j
@Configuration
public class RabbitConfig {
@Autowired
CachingConnectionFactory cachingConnectionFactory;
@Bean
RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
// 消息只要被 rabbitmq broker 接收到就会执行 confirmCallback
// 如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback
// 被broker接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
String msgId = data.getId();
if (ack) {
log.info(msgId + ": 消息发送成功");
} else {
log.info(msgId + ": 消息发送失败");
}
});
// confirm 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。
// 在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式
// 这样如果未能投递到目标 queue 里将调用 returnCallback,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info(MessageFormat.format("消息发送失败,ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode,
replyText, exchange, routingKey));
// TODO 做消息发送失败时的处理逻辑
});
return rabbitTemplate;
}
/**
* 声明队列
* 参数说明:
* durable 是否持久化,默认是false(持久化队列则数据会被存储在磁盘上,当消息代理重启时数据不会丢失;暂存队列只对当前连接有效)
* exclusive 默认是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
* autoDelete 默认是false,是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
* 一般设置一下队列的持久化就好,其余两个就是默认false
*
* @return Queue
**/
@Bean
Queue myQueue() {
return new Queue(QueueConstants.QUEUE\_NAME, true);
}
// 设置交换机,类型为 direct
@Bean
DirectExchange myExchange() {
return new DirectExchange(QueueConstants.QUEUE\_EXCHANGE\_NAME, true, false);
}
// 绑定:将交换机和队列绑定,并设置路由匹配键
@Bean
Binding queueBinding() {
return BindingBuilder.bind(myQueue()).to(myExchange()).with(QueueConstants.QUEUE\_ROUTING\_KEY\_NAME);
}
在 ProducerController 里,主要干了以下几件事:
sendDirectMessage
,通过请求该接口,可以实现生产者发送消息的功能;CorrelationData
,该对象内部只有一个 id 属性,用来表示消息的唯一性;/**
* 消息生产端
* @date 2020-05-17 18:30
**/
@RestController
public class ProducerController {
/\*\*
\* RabbitTemplate提供了发送/接收消息的方法
\*/
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 生产消息
*
* @Date 上午12:12 2020/5/20
* @param test
* @param test2
* @return java.lang.String
**/
@GetMapping("/sendDirectMessage")
public String sendDirectMessage(String test,Integer test2) {
// 生成消息的唯一id
String msgId = UUID.randomUUID().toString();
String messageData = "hello,this is rabbitmq demo message";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
// 定义要发送的消息对象
Map<String,Object\> messageObj = new HashMap<>();
messageObj.put("msgId",msgId);
messageObj.put("messageData",messageData);
messageObj.put("createTime",createTime);
rabbitTemplate.convertAndSend(QueueConstants.QUEUE\_EXCHANGE\_NAME,QueueConstants.QUEUE\_ROUTING\_KEY\_NAME,
messageObj,new CorrelationData(msgId));
return "message send ok";
}
}
保存代码,在 RabbitConfig 里的 setConfirmCallback
方法内部打上断点;
重启服务后,使用 PostMan 请求生产消息接口:[http://你的域名](http://xn--6qqv7i2xdt95b/
):10420/sendDirectMessage,生产消息,并将消息发送给 RabbitMQ:
也就说明我们生产的消息已经成功的到达了 RabbitMQ Server 里。
至此,生产者消息确认结束,且通过运行的实例,我们能够得出结论:本次生产的消息已经正确无误的投递到了队列中去。
消费者确认指的就是 RabbitMQ 需要确认消息到底有没有被收到,来确定要不要将该条消息从队列中删除掉。这就需要消费者来告诉 RabbitMQ,有以下两种方式:
消费者在消费消息的时候,如果设定应答模式为自动,则消费者收到消息后,消息就会立即被 RabbitMQ 从队列中删除掉。因此,在实际开发者,我们基本上是将消费应答模式设置为手动确认更为妥当一些。
消费者在收到消息后:
server.port=10421
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启 ACK(消费者接收到消息时手动确认)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
ConsumerController 里主要干了以下几件事儿:
@RabbitListener
来监听队列;deliveryTag
;channel.basicAck
来确认消息已经消费;channel.basicNack
把消费失败的消息重新放入到队列中去。/**
* 消息消费端
* @date 2020-05-21 18:00
**/
@Component
public class ConsumerController {
@RabbitListener(queues = {QueueConstants.QUEUE\_NAME})
public void handler(Message message, Channel channel) throws IOException {
System.out.println("收到消息:" + message.toString());
MessageHeaders headers = message.getHeaders();
Long tag = (Long) headers.get(AmqpHeaders.DELIVERY\_TAG);
try {
// 手动确认消息已消费
channel.basicAck(tag,false);
} catch (IOException e) {
// 把消费失败的消息重新放入到队列
channel.basicNack(tag, false, true);
e.printStackTrace();
}
}
}
查看 RabbitMQ 管理界面会发现 队列的 Ready 和 Total 仍然是 1,说明我们的手动应答设置生效:
点击 Debug 的绿色箭头继续像下执行,查看 RabbitMQ 管理界面:
会发现:Ready 变为 0,Unacked 为 0,Total 为 0。 说明该条数据已经被成功消费。