RabbitMQ Confirm

概述

在使用 RabbitMQ 的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达 broker 代理服务器呢?

如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达 broker 的,如果在消息到达 broker 之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

RabbitMQ 为我们提供了两种方式:

  1. 通过 AMQP 事务机制实现,这也是 AMQP 协议层面提供的解决方案;
  2. 通过将 channel 设置成 confirm 模式来实现;

事务机制

这里首先探讨下 RabbitMQ 事务机制。

RabbitMQ 中与事务机制有关的方法有三个:txSelect(), txCommit() 以及 txRollback(), txSelect 用于将当前 channel 设置成 transaction 模式,txCommit 用于提交事务,txRollback 用于回滚事务,在通过 txSelect 开启事务之后,我们便可以发布消息给 broker 代理服务器了,如果 txCommit 提交成功了,则消息一定到达了 broker 了,如果在 txCommit 执行之前 broker 异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过 txRollback 回滚事务了。

关键代码:

channel.txSelect(); channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); channel.txCommit();

其实事务还有如下四个步骤:

  1. client 发送 Tx.Select
  2. broker 发送 Tx.Select-Ok(之后 publish)
  3. client 发送 Tx.Commit
  4. broker 发送 Tx.Commit-Ok

下面我们来看下事务回滚是什么样子的。关键代码如下:

try { channel.txSelect(); channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); int result = 1 / 0; channel.txCommit(); } catch (Exception e) { e.printStackTrace(); channel.txRollback(); }

代码中先是发送了消息至 broker 中但是这时候发生了异常,之后在捕获异常的过程中进行事务回滚。

事务确实能够解决 producer 与 broker 之间消息确认的问题,只有消息成功被 broker 接受,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发,但是使用事务机制的话会降低 RabbitMQ 的性能,那么有没有更好的方法既能保障 producer 知道消息已经正确送到,又能基本上不带来性能上的损失呢?

从 AMQP 协议的层面看是没有更好的方法,但是 RabbitMQ 提供了一个更好的方案,即将 channel 信道设置成 confirm 模式。

Confirm模式

概述

上面我们介绍了 RabbitMQ 可能会遇到的一个问题,即生成者不知道消息是否真正到达 broker,随后通过 AMQP 协议层面为我们提供了事务机制解决了这个问题。

但是采用事务机制实现会降低 RabbitMQ 的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用 Confirm 模式。

producer端confirm模式的实现原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 deliver-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

在 channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被 nack 一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm 又被 nack 。

开启confirm模式的方法

生产者通过调用 channel 的 confirmSelect 方法将 channel 设置为 confirm 模式,如果没有设置 no-wait 标志的话,broker 会返回 confirm.select-ok 表示同意发送者将当前 channel 信道设置为 confirm 模式(从目前 RabbitMQ 最新版本 3.6 来看,如果调用了 channel.confirmSelect 方法,默认情况下是直接将 no-wait 设置成 false 的,也就是默认情况下 broker 是必须回传 confirm.select-ok 的)。

09_RabbitMQ Confirm.png

已经在 transaction 事务模式的 channel 是不能再设置成 confirm 模式的,即这两种模式是不能共存的。

编程模式

对于固定消息体大小和线程数,如果消息持久化,生产者 confirm(或者采用事务机制),消费者 ack 那么对性能有很大的影响。

消息持久化的优化没有太好方法,用更好的物理存储(SAS, SSD, RAID 卡)总会带来改善。生产者 confirm 这一环节的优化则主要在于客户端程序的优化之上。归纳起来,客户端实现生产者 confirm 有三种编程方式:

  1. 普通 confirm 模式:每发送一条消息后,调用 waitForConfirms() 方法,等待服务器端 confirm。实际上是一种串行 confirm 了。
  2. 批量 confirm 模式:每发送一批消息后,调用 waitForConfirms() 方法,等待服务器端 confirm。
  3. 异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后 Client 端会回调这个方法。

从编程实现的复杂度上来看:

第一种

普通 confirm 模式最简单,publish 一条消息后,等待服务器端 confirm,如果服务端返回 false 或者超时时间内未返回,客户端进行消息重传。关键代码如下:

channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); if(!channel.waitForConfirms()){ System.out.println("send message failed."); }

第二种

批量 confirm 模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来 publish 消息,然后等待服务器端 confirm, 相比普通 confirm 模式,批量极大提升 confirm 效率,但是问题在于一旦出现 confirm 返回 false 或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量 confirm 性能应该是不升反降的。

关键代码:

channel.confirmSelect(); for(int i=0;i<batchCount;i++){ channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); } if(!channel.waitForConfirms()){ System.out.println("send message failed."); }

第三种

异步 confirm 模式的编程实现最复杂,Channel 对象提供的 ConfirmListener() 回调方法只包含 deliveryTag(当前 Chanel 发出的消息序号),我们需要自己为每一个 Channel 维护一个 unconfirm 的消息序号集合,每 publish 一条数据,集合中元素加 1,每回调一次 handleAck 方法,unconfirm 集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个 unconfirm 集合最好采用有序集合 SortedSet 存储结构。实际上,SDK 中的 waitForConfirms() 方法也是通过 SortedSet 维护消息序号的。

关键代码:

SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } }); while (true) { long nextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); confirmSet.add(nextSeqNo); }

SDK 中 waitForConfirms 方法实现:

/** Set of currently unconfirmed messages (i.e. messages that have * not been ack'd or nack'd by the server yet. */ private final SortedSet<Long> unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); public boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException { if (nextPublishSeqNo == 0L) throw new IllegalStateException("Confirms not selected"); long startTime = System.currentTimeMillis(); synchronized (unconfirmedSet) { while (true) { if (getCloseReason() != null) { throw Utility.fixStackTrace(getCloseReason()); } if (unconfirmedSet.isEmpty()) { boolean aux = onlyAcksReceived; onlyAcksReceived = true; return aux; } if (timeout == 0L) { unconfirmedSet.wait(); } else { long elapsed = System.currentTimeMillis() - startTime; if (timeout > elapsed) { unconfirmedSet.wait(timeout - elapsed); } else { throw new TimeoutException(); } } } } }

消息确认(Consumer端)

为了保证消息从队列可靠地到达消费者,RabbitMQ 提供消息确认机制(message acknowledgment)。消费者在声明队列时,可以指定 noAck 参数,当 noAck=false 时,RabbitMQ 会等待消费者显式发回 ack 信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ 会在队列中消息被消费后立即删除它。

采用消息确认机制后,只要令 noAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直持有消息直到消费者显式调用 basicAck 为止。

当 noAck=false 时,对于 RabbitMQ 服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者 ack 信号的消息。如果服务器端一直没有收到消费者的 ack 信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。

RabbitMQ 不会为未 ack 的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。

RabbitMQ 管理平台界面上可以看到当前队列中 Ready 状态和 Unacknowledged 状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到 ack 信号的消息数。也可以通过命令行来查看上述信息:

10_RabbitMQ Confirm.png

代码示例(关闭自动消息确认,进行手动ack):

QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(ConfirmConfig.queueName, false, consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); // do something with msg. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }

broker 将在下面的情况中对消息进行 confirm:

  1. broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会发送 basic.return)
  2. 非持久属性的消息到达了其所应该到达的所有 queue 中(和镜像 queue 中)
  3. 持久消息到达了其所应该到达的所有 queue 中(和镜像中),并被持久化到了磁盘(fsync)
  4. 持久消息从其所在的所有 queue 中被 consume 了(如果必要则会被 ack)

basicRecover:是路由不成功的消息可以使用 recovery 重新发送到队列中。

basicReject:是接收端告诉服务器这个消息我拒绝接收,不处理,可以设置是否放回到队列中还是丢掉,而且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了 basicNack。

basicNack:可以一次拒绝 N 条消息,客户端可以设置 basicNack 方法的 multiple 参数为 true,服务器会拒绝指定了 delivery_tag 的所有未确认的消息(tag 是一个 64 位的 long 值,最大值是 9223372036854775807)。