在 RabbitMQ 中,我们要确保消息不丢失,主要可以从以下几方面入手,即:交换机持久化、队列持久化、消息持久化、生产者开启发送确认、消费者开启消息确认、使用 RabbitMQ 的镜集群像模式进行部署以及消息补偿机制。
交换机持久化是指将交换机的属性数据存储在磁盘上,当 MQ 的服务器发生意外或关闭之后,在重启 RabbitMQ 时不需要重新手动或执行代码去创建交换机了,交换机会自动被创建,相当于一直存在。
在创建交换机的时候将 durable 参数设置为 true 即可。 比如,我声明一个类型为 direct 的交换机:
/**
* 设置交换机,类型为 direct
* @return DirectExchange
*/
@Bean
DirectExchange myExchange() {
return new DirectExchange(QueueConstants.QUEUE_EXCHANGE_NAME, true, false);
}
通过将 durable 参数设置为 true,则交换机的元数据会被存储在磁盘上,对于一个长期使用的交换机来说,建议将其设置为持久化。
如果不将队列设置为持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,数据也会丢失。队列都没有了,消息也找不到地方存储了。
同样,在创建队列的时候将 durable 参数设置为 true 即可:
/**
* 创建队列
* @return Queue
*/
@Bean
Queue myQueue() {
return new Queue(QueueConstants.QUEUE_NAME, true);
}
说明:
RabbitMQ 的消息是依附于队列存在的,所以要想消息持久化,那么前提是队列也必须设置持久化。
在创建消息的时候,添加一个持久化消息的属性(将 delivery_mode 设置为 2)。在 SpringBoot 中使用 rabbitTemplate 发送的消息默认就是持久化的。
开启消息发送确认,通过 ConfirmCallback 接口 和 ReturnCallback 接口来保障。
消费者收到消息还没来得及处理服务就宕机了。
消费端开启消息确认(ACK),将消息设置为手动确认:
# 开启 ACK(消费者接收到消息时手动确认) spring.rabbitmq.listener.simple.acknowledge-mode=manual
这样虽然服务宕机,但是在重启之后,消费者仍然会消费到该条数据。
持久化的消息成功存入 RabbitMQ 之后,如果在存入磁盘的这个过程中 RabbitMQ 服务节点宕机、异常重启等,消息还没来得及存入磁盘。
可以使用 RabbitMQ 的镜集群像模式进行部署,如果主节点在这个特殊的时间段内挂掉了,会自动切换到从节点,这样就保证了高可用性,除非整个集群都挂掉。
比如设置为持久化的消息,在保存到磁盘的过程中,当前队列节点挂了,存储节点的磁盘也挂了。
设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧会存在;
仅设置队列持久化,重启之后消息会丢失;
仅设置消息持久化,重启之后队列会消失,因此消息也就丢失了,所以只设置消息持久化而不设置队列持久化是没有意义的;
将所有的消息都设置为持久化(写入磁盘的速度比写入内存的速度慢的多),可能会影响 RabbitMQ 的性能,对于可靠性不是那么高的消息可以不采用持久化来提高 RabbitMQ 的吞吐量。