在实际项目中使用 RabbitMQ 的时候,由于消费者自身处理消息的效率并不高,如果说这个时候生产者还是不断的在生产消息,一直推送消息到消费者,那么很容易引起消费者的宕机。
rabbitmq 提供了一个限流机制,用于限制一次性推送到消费者客户端的消息数量,让消费者都处理完了消息之后,生产者再推送新的消息过来。
出于以下两个方面,所以需要对消费者进行一些限流策略
RabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。
/**
* Request specific "quality of service" settings.
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
其中,参数解释如下:
参数 | 描述 |
---|---|
prefetchSize | 0,单条消息大小限制,0 代表不限制。 |
prefetchCount | 一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。 |
global | true、false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别。当我们设置为 false 的时候生效,设置为 true 的时候没有了限流功能,因为 channel 级别尚未实现。 |
注意:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不研究。特别注意一点,prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。
我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 false:
channel.basicConsume(queueName, false, consumer);
我们来设置具体的限流大小以及数量:
channel.basicQos(0, 15, false);
在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 true:
channel.basicAck(envelope.getDeliveryTag(), true);
这是生产端代码,主要的操作集中在消费端:
package com.jvli.project.fighting.fighting_qos;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class QosProducer {
public static void main(String[] args) throws Exception {
//1. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
//2. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();
//4. 声明
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "item.add";
//声明交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
//声明队列;绑定交换机、队列、routing—key
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5. 发送
String msg = "hello world,keep coding";
for (int i = 0; i < 10; i++) {
String tem = msg + "-" + i;
channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());
System.out.println("Send message : " + tem);
}
//6. 关闭连接
channel.close();
connection.close();
}
}
这里我们创建一个消费者,通过以下代码来验证限流效果以及 global 参数设置为 true 时不起作用。我们通过 Thread.sleep(5000); 来让 ack 即处理消息的过程慢一些,这样我们就可以从后台管理工具中清晰观察到限流情况。
package com.jvli.project.fighting.fighting_qos;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class QosProducer {
public static void main(String[] args) throws Exception {
//1. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
//2. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();
//4. 声明
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "item.add";
//声明交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
//声明队列;绑定交换机、队列、routing—key
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5. 发送
String msg = "hello world,keep coding";
for (int i = 0; i < 10; i++) {
String tem = msg + "-" + i;
channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());
System.out.println("Send message : " + tem);
}
//6. 关闭连接
channel.close();
connection.close();
}
}
如果,当我们将 void basicQos(int prefetchSize, int prefetchCount, boolean global) 中的 global 设置为 true 的时候我们发现并没有了限流的作用。