RabbitMQ确保消息不丢失

RabbitMQ 中,我们要确保消息不丢失,主要可以从以下几方面入手,即:交换机持久化、队列持久化、消息持久化、生产者开启发送确认、消费者开启消息确认、使用 RabbitMQ 的镜集群像模式进行部署以及消息补偿机制。

交换机持久化

什么是交换机持久化

交换机持久化是指将交换机的属性数据存储在磁盘上,当 MQ 的服务器发生意外或关闭之后,在重启 RabbitMQ 时不需要重新手动或执行代码去创建交换机了,交换机会自动被创建,相当于一直存在。

怎么将交换机持久化

在创建交换机的时候将 durable 参数设置为 true 即可。 比如,我声明一个类型为 direct 的交换机:

/** * 设置交换机,类型为 direct * @return DirectExchange */ @Bean DirectExchange myExchange() { return new DirectExchange(QueueConstants.QUEUE_EXCHANGE_NAME, true, false); }

通过将 durable 参数设置为 true,则交换机的元数据会被存储在磁盘上,对于一个长期使用的交换机来说,建议将其设置为持久化。

队列持久化

如果不将队列设置为持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,数据也会丢失。队列都没有了,消息也找不到地方存储了。

怎么将队列持久化

同样,在创建队列的时候将 durable 参数设置为 true 即可:

/** * 创建队列 * @return Queue */ @Bean Queue myQueue() { return new Queue(QueueConstants.QUEUE_NAME, true); }

说明:

  • durable 参数默认为 false,只针对当前连接有效,当 RabbitMQ 服务重启后数据会丢失;
  • 队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失;
  • 如果要确保消息不会丢失,就需要设置消息的持久化。

消息持久化

RabbitMQ 的消息是依附于队列存在的,所以要想消息持久化,那么前提是队列也必须设置持久化。

怎么将消息持久化

在创建消息的时候,添加一个持久化消息的属性(将 delivery_mode 设置为 2)。在 SpringBoot 中使用 rabbitTemplate 发送的消息默认就是持久化的。

生产者开启发送确认

场景

  1. 不知道生产者发送的消息究竟是否已经到达 RabbitMQ Server;
  2. 不知道生产者发送的消息是否已经成功的分配到队列中去。

解决办法

开启消息发送确认,通过 ConfirmCallback 接口 和 ReturnCallback 接口来保障。

消费者开启消息确认(ACK)

场景

消费者收到消息还没来得及处理服务就宕机了。

解决办法

消费端开启消息确认(ACK),将消息设置为手动确认:

# 开启 ACK(消费者接收到消息时手动确认) spring.rabbitmq.listener.simple.acknowledge-mode=manual

这样虽然服务宕机,但是在重启之后,消费者仍然会消费到该条数据。

使用RabbitMQ的镜集群像模式进行部署

场景

持久化的消息成功存入 RabbitMQ 之后,如果在存入磁盘的这个过程中 RabbitMQ 服务节点宕机、异常重启等,消息还没来得及存入磁盘。

解决办法

可以使用 RabbitMQ 的镜集群像模式进行部署,如果主节点在这个特殊的时间段内挂掉了,会自动切换到从节点,这样就保证了高可用性,除非整个集群都挂掉。

消息补偿机制

场景

比如设置为持久化的消息,在保存到磁盘的过程中,当前队列节点挂了,存储节点的磁盘也挂了。

解决办法

  1. 由于系统功能复杂,加上网络不确定性太多,所以消息补偿机制需要建立在系统记录了详细的日志,比如消息发送日志,消息接收日志,存入数据库日志等的前提下;
  2. 通过手动触发或者定时扫描,从这些日志中提取出符合消息补偿要求的数据,进行消息补偿。

说明

设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧会存在;

仅设置队列持久化,重启之后消息会丢失;

仅设置消息持久化,重启之后队列会消失,因此消息也就丢失了,所以只设置消息持久化而不设置队列持久化是没有意义的;

将所有的消息都设置为持久化(写入磁盘的速度比写入内存的速度慢的多),可能会影响 RabbitMQ 的性能,对于可靠性不是那么高的消息可以不采用持久化来提高 RabbitMQ 的吞吐量。