RabbitMQ限流

消费者流量控制

在实际项目中使用 RabbitMQ 的时候,由于消费者自身处理消息的效率并不高,如果说这个时候生产者还是不断的在生产消息,一直推送消息到消费者,那么很容易引起消费者的宕机。

rabbitmq 提供了一个限流机制,用于限制一次性推送到消费者客户端的消息数量,让消费者都处理完了消息之后,生产者再推送新的消息过来。

限流的原因

出于以下两个方面,所以需要对消费者进行一些限流策略

  1. 假设某个时候,在 rabbitmq 队列中已经堆积了非常非常多的消息,这个时候,如果有一个消费者启动,那么大量的消息将会一起推送到这个消费者上面,这种瞬间超大流量,很有可能导致服务器崩溃。
  2. 生产者生产消息的效率比消费者处理消息的效率高很多,两端之间这种效率不平衡性。所以消费端需要做一些限流措施,否则可能导致消费端性能下降,服务器卡顿甚至崩溃等现象。

限流api讲解

RabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。

/** * Request specific "quality of service" settings. * These settings impose limits on the amount of data the server * will deliver to consumers before requiring acknowledgements. * Thus they provide a means of consumer-initiated flow control. * @param prefetchSize maximum amount of content (measured in * octets) that the server will deliver, 0 if unlimited * @param prefetchCount maximum number of messages that the server * will deliver, 0 if unlimited * @param global true if the settings should be applied to the * entire channel rather than each consumer * @throws java.io.IOException if an error is encountered */ void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

其中,参数解释如下:

参数 描述
prefetchSize 0,单条消息大小限制,0 代表不限制。
prefetchCount 一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。
global true、false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别。当我们设置为 false 的时候生效,设置为 true 的时候没有了限流功能,因为 channel 级别尚未实现。

注意:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不研究。特别注意一点,prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。

如何对消费端进行限流

  1. 我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 false:

    channel.basicConsume(queueName, false, consumer);
  2. 我们来设置具体的限流大小以及数量:

    channel.basicQos(0, 15, false);
  3. 在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 true:

    channel.basicAck(envelope.getDeliveryTag(), true);

这是生产端代码,主要的操作集中在消费端:

package com.jvli.project.fighting.fighting_qos; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class QosProducer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); //2. 通过连接工厂来创建连接 Connection connection = factory.newConnection(); //3. 通过 Connection 来创建 Channel Channel channel = connection.createChannel(); //4. 声明 String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String routingKey = "item.add"; //声明交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT); //声明队列;绑定交换机、队列、routing—key channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //5. 发送 String msg = "hello world,keep coding"; for (int i = 0; i < 10; i++) { String tem = msg + "-" + i; channel.basicPublish(exchangeName, routingKey, null, tem.getBytes()); System.out.println("Send message : " + tem); } //6. 关闭连接 channel.close(); connection.close(); } }

这里我们创建一个消费者,通过以下代码来验证限流效果以及 global 参数设置为 true 时不起作用。我们通过 Thread.sleep(5000); 来让 ack 即处理消息的过程慢一些,这样我们就可以从后台管理工具中清晰观察到限流情况。

package com.jvli.project.fighting.fighting_qos; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class QosProducer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); //2. 通过连接工厂来创建连接 Connection connection = factory.newConnection(); //3. 通过 Connection 来创建 Channel Channel channel = connection.createChannel(); //4. 声明 String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String routingKey = "item.add"; //声明交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT); //声明队列;绑定交换机、队列、routing—key channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //5. 发送 String msg = "hello world,keep coding"; for (int i = 0; i < 10; i++) { String tem = msg + "-" + i; channel.basicPublish(exchangeName, routingKey, null, tem.getBytes()); System.out.println("Send message : " + tem); } //6. 关闭连接 channel.close(); connection.close(); } }

如果,当我们将 void basicQos(int prefetchSize, int prefetchCount, boolean global) 中的 global 设置为 true 的时候我们发现并没有了限流的作用。