通配符模式

模式说明

Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以 ”.” 分割,例如: item.insert。通配符规则:

规则 模式
# 匹配一个或多个词。
* 匹配不多不少恰好 1 个词。

举例:

规则 模式
item.# 能够匹配 item.insert.abc 或者 item.insert。
item.* 只能匹配 item.insert。

13_通配符模式.png

14_通配符模式.png

说明:

  1. 红色 Queue:绑定的是 usa.#,因此凡是以 usa.开头的 routing key 都会被匹配到。
  2. 黄色 Queue:绑定的是 #.news,因此凡是以 .news结尾的 routing key 都会被匹配。

代码实现

生产者

所有 error 级别的日志存入数据库,所有 order 系统的日志存入数据库:

package com.haicoder.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 发送消息 */ public class Producer_Topics { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("106.15.72.229");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/haicoder");//虚拟机 默认值/ factory.setUsername("heima");//用户名 默认 guest factory.setPassword("heima");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) 参数: 1. exchange:交换机名称 2. type:交换机类型 DIRECT("direct"),:定向 FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。 TOPIC("topic"),通配符的方式 HEADERS("headers");参数匹配 3. durable:是否持久化 4. autoDelete:自动删除 5. internal:内部使用。 一般false 6. arguments:参数 */ String exchangeName = "test_topic"; //5. 创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null); //6. 创建队列 String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null); //7. 绑定队列和交换机 /* queueBind(String queue, String exchange, String routingKey) 参数: 1. queue:队列名称 2. exchange:交换机名称 3. routingKey:路由键,绑定规则 如果交换机的类型为fanout ,routingKey设置为"" */ // routing key 系统的名称.日志的级别。 //=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库 channel.queueBind(queue1Name, exchangeName, "#.error"); channel.queueBind(queue1Name, exchangeName, "order.*"); channel.queueBind(queue2Name, exchangeName, "*.*"); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; //8. 发送消息 channel.basicPublish(exchangeName, "goods.error", null, body.getBytes()); //9. 释放资源 channel.close(); connection.close(); } }

消费者1

package com.haicoder.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_Topic1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("106.15.72.229");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/haicoder");//虚拟机 默认值/ factory.setUsername("heima");//用户名 默认 guest factory.setPassword("heima");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body)); System.out.println("将日志信息存入数据库......."); } }; channel.basicConsume(queue1Name,true,consumer); //关闭资源?不要 } }

消费者2

package com.haicoder.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_Topic2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("106.15.72.229");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/haicoder");//虚拟机 默认值/ factory.setUsername("heima");//用户名 默认 guest factory.setPassword("heima");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /* System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body)); System.out.println("将日志信息打印控制台......."); } }; channel.basicConsume(queue2Name,true,consumer); //关闭资源?不要 } }

测试

启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应 routing key 对应队列的消息;到达按照需要接收的效果;并且这些 routing key 可以使用通配符。

在执行完测试代码后,其实到 RabbitMQ 的管理后台找到 Exchanges 选项卡,点击 topic_exchange 的交换机,可以查看到如下的绑定:

15_通配符模式.png

总结

Topic 主题模式可以实现 Publish/Subscribe 发布与订阅模式 和 Routing 路由模式 的功能;只是 Topic 在配置 routing key 的时候可以使用通配符,显得更加灵活。