RabbitMQ死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机,因为其他 MQ 产品中没有交换机的概念),当消息成为 Dead message 后,可以被重新发送到另一个交换机,这个交换机就是 DLX。

比如消息队列的消息过期,如果绑定了死信交换器,那么该消息将发送给死信交换机。

11_RabbitMQ死信队列.png

消息变成死信

消息变成死信有以下几种情况:

  • 消息被拒绝(basic.reject / basic.nack),并且 requeue = false
  • 消息 TTL 过期
  • 队列达到最大长度

死信处理过程

  • DLX 也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
  • 当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange 上去,进而被路由到另一个队列。
  • 可以监听这个队列中的消息做相应的处理。

死信队列设置

  1. 首先需要设置死信队列的 exchange 和 queue,然后进行绑定:

    Exchange: dlx.exchange Queue: dlx.queue RoutingKey: # #表示只要有消息到达了Exchange,那么都会路由到这个queue上
  2. 然后需要有一个监听,去监听这个队列进行处理。

  3. 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可:

    arguments.put(" x-dead-letter-exchange""dlx.exchange");

这样消息在过期、requeue、 队列在达到最大长度时,消息就可以直接路由到死信队列!

死信队列演示

生产端

public class Producer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.43.157"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 获取Connection Connection connection = connectionFactory.newConnection(); //3 通过Connection创建一个新的Channel Channel channel = connection.createChannel(); String exchange = "test_dlx_exchange"; String routingKey = "dlx.save"; String msg = "Hello RabbitMQ DLX Message"; AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") .build(); //发送消息 channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); } }

自定义消费者

public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }

消费端

  • 声明正常处理消息的交换机、队列及绑定规则。
  • 在正常交换机上指定死信发送的 Exchange。
  • 声明死信交换机、队列及绑定规则。
  • 监听死信队列,进行后续处理,这里省略
public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.43.157"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 声明一个普通的交换机 和 队列 以及路由 String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; String queueName = "test_dlx_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); //指定死信发送的Exchange Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", "dlx.exchange"); //这个agruments属性,要设置到声明队列上 channel.queueDeclare(queueName, true, false, false, agruments); channel.queueBind(queueName, exchangeName, routingKey); //要进行死信队列的声明 channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare("dlx.queue", true, false, false, null); channel.queueBind("dlx.queue", "dlx.exchange", "#"); channel.basicConsume(queueName, true, new MyConsumer(channel)); } }

运行说明

启动消费端,此时查看管控台,新增了两个 Exchange,两个 Queue。在 test_dlx_queue上我们设置了 DLX,也就代表死信消息会发送到指定的 Exchange 上,最终其实会路由到 dlx.queue 上。

12_RabbitMQ死信队列.png

此时关闭消费端,然后启动生产端,查看管控台队列的消息情况,test_dlx_queue 的值为 1,而 dlx_queue 的值为 0。10s 后的队列结果如图,由于生产端发送消息时指定了消息的过期时间为 10s,而此时没有消费端进行消费,消息便被路由到死信队列中。

13_RabbitMQ死信队列.png

实际环境我们还需要对死信队列进行一个监听和处理,当然具体的处理逻辑和业务相关,这里只是简单演示死信队列是否生效。