Golang RabbitMQ Topics

在上一个教程中,我们改进了日志系统。我们没有使用只能进行虚拟广播的扇形交换机,而是使用了直连交换机,并获得了有选择地接收日志的可能性。虽然使用直连交换改进了我们的系统,但它仍然有局限性,它不能基于多个标准进行路由。

在我们的日志系统中,我们可能不仅希望根据严重性订阅日志,还希望根据发出日志的源订阅日志。这将给我们很大的灵活性,我们可能只想监听来自 “cron” 的关键错误,还需要监听来自 “kern” 的所有日志。

要在日志系统中实现这一点,我们需要了解更复杂的主题交换。

Topic交换机

发送到主题交换的消息不能有任意的路由关键字,它必须是由点分隔的单词列表。单词可以是任何东西,但通常它们指定了与消息相关的一些特性。一些有效的路由密钥示例:“stock.usd.nyse", “nyse.vmw”, “quick.orange.rabbit”. 路由 key 中可以有任意多个字,最多 255 字节。

binding key 的格式也必须相同。主题交换机背后的逻辑类似于直接交换机,使用特定 routing key 发送的消息将被传递到使用匹配 binding key 绑定的所有队列。但是,binding key有两种重要的特殊情况:

  • * 只能代替一个单词。
  • # 可以代替零个或多个单词。

用一个例子来解释这一点:

21_Go语言操作RabbitMQ Topic模式.png

在这个例子中,我们将发送所有描述动物的信息。消息将使用由三个单词(两点)组成的路由键发送。路由键中的第一个词将描述速度,第二个是颜色,第三个是种类:“”。

我们创建了三个绑定:Q1 用绑定键 *.orange.* 绑定,Q2 用 “..rabbit” 和 “lazy.#” 绑定。这些绑定可以概括为:

  • Q1 对所有的橙色动物都感兴趣。
  • Q2 想听到关于兔子的一切,以及关于懒惰动物的一切。

routing key 设置为 “quick.orange.rabbit” 的消息将传递到所有队列。routing key 为 “lazy.orange.elephant” 的消息同样会被传递到所有的队列,但 “quick.orange.fox” 将只会发送到第一个队列,“lazy.brown.fox” 将只会发送到第二个队列。

routing key 为 “lazy.pink.rabbit” 的消息将只会发送到第二个队列一次,尽管他可以匹配两次,同样,routing key 为 “quick.brown.fox” 的消息将被丢弃,因为它不满足任何 topic。

如果我们设置了一个四个单词的 routing key,比如 “quick.orange.male.rabbit”,同样会被丢失,因为不匹配任何的队列。

说明

Topic 交换机功能非常强大,当队列用 # 绑定键绑定时,它将接收所有消息,而不管路由键是什么,就像在扇形交换机中一样。当绑定中不使用特殊字符 *# 时,Topic 交换机的行为将与直接交换机一样。

完整代码

我们将在日志系统中使用主题交换。我们首先假设日志的路由键有两个字:“”。

emit_log_topic.go 代码如下:

package main import ( "fmt" "github.com/streadway/amqp" "time" ) const( addr = "amqp://guest:guest@localhost:5672/" Exchange_Name = "logs_topic" ) var Log_Levels = []string{"error", "info", "error", "warning"} func main(){ fmt.Println("haicoder(www.haicoder.net)") var( conn *amqp.Connection channel *amqp.Channel err error ) //连接MQServer if conn, err = amqp.Dial(addr); err != nil{ fmt.Println("Connect RabbitMQ Err =", err, "Addr =", addr) return } //需要关闭 defer conn.Close() //创建一个Channel,所有的连接都是通过Channel管理的 if channel, err = conn.Channel(); err != nil{ fmt.Println("Create Channel Err =", err) return } defer channel.Close() //创建交换机 if err = channel.ExchangeDeclare( Exchange_Name, //name "topic", //type true, false, false, false, nil, ); err != nil{ fmt.Println("ExchangeDeclare Err =", err) return } //直接向交换机发送数据即可 for i := 0; i < 100; i++{ logLevel := Log_Levels[i%4] msg := fmt.Sprintf("Msg Level %s", logLevel) if err = channel.Publish(Exchange_Name, "lazy.brown.fox", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }); err != nil{ fmt.Println("Publish Err =", err) return } fmt.Println("Send msg ok, msg =", msg) time.Sleep(5*time.Second) } }

receive_logs_topic.go 代码如下:

package main import ( "fmt" "github.com/streadway/amqp" "time" ) const( recvAddr = "amqp://guest:guest@localhost:5672/" Receive_Exchange_Name = "logs_topic" ) func main(){ fmt.Println("haicoder(www.haicoder.net)") var( conn *amqp.Connection channel *amqp.Channel queue amqp.Queue msgs <-chan amqp.Delivery err error ) //连接MQServer if conn, err = amqp.Dial(recvAddr); err != nil{ fmt.Println("Connect RabbitMQ Err =", err, "Addr =", recvAddr) return } //需要关闭 defer conn.Close() //创建一个Channel,所有的连接都是通过Channel管理的 if channel, err = conn.Channel(); err != nil{ fmt.Println("Create Channel Err =", err) return } defer channel.Close() //创建交换机 if err = channel.ExchangeDeclare( Receive_Exchange_Name, // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ); err != nil{ fmt.Println("ExchangeDeclare Err =", err) return } //创建队列 if queue, err = channel.QueueDeclare( "", //队列名 false, //持久的 false, // delete when unused true, //独占的 false, nil, ); err != nil{ fmt.Println("QueueDeclare Err =", err) return } //交换机绑定队列 if err = channel.QueueBind(queue.Name, "lazy.#", Receive_Exchange_Name, false, nil); err != nil{ fmt.Println("QueueBind Err =", err) return } //读取数据 if msgs, err = channel.Consume( queue.Name, // queue "", // consumer true, // 自动消息确认 false, // exclusive false, // no-local false, // no-wait nil, ); err != nil{ fmt.Println("Consume Err =", err) return } go func(){ for msg := range msgs{ fmt.Printf("Received a message: %s\n", msg.Body) } }() time.Sleep(100*time.Second) }

receive_logs_topic2.go 代码如下:

package main import ( "fmt" "github.com/streadway/amqp" "time" ) const( recvAddr2 = "amqp://guest:guest@localhost:5672/" Receive_Exchange_Name2 = "logs_topic" ) func main(){ fmt.Println("haicoder(www.haicoder.net)") var( conn *amqp.Connection channel *amqp.Channel queue amqp.Queue msgs <-chan amqp.Delivery err error ) //连接MQServer if conn, err = amqp.Dial(recvAddr2); err != nil{ fmt.Println("Connect RabbitMQ Err =", err, "Addr =", recvAddr2) return } //需要关闭 defer conn.Close() //创建一个Channel,所有的连接都是通过Channel管理的 if channel, err = conn.Channel(); err != nil{ fmt.Println("Create Channel Err =", err) return } defer channel.Close() //创建交换机 if err = channel.ExchangeDeclare( Receive_Exchange_Name2, // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ); err != nil{ fmt.Println("ExchangeDeclare Err =", err) return } //创建队列 if queue, err = channel.QueueDeclare( "", //队列名 false, //持久的 false, // delete when unused true, //独占的 false, nil, ); err != nil{ fmt.Println("QueueDeclare Err =", err) return } //交换机绑定队列 if err = channel.QueueBind(queue.Name, "*.orange.*", Receive_Exchange_Name2, false, nil); err != nil{ fmt.Println("QueueBind Err =", err) return } //读取数据 if msgs, err = channel.Consume( queue.Name, // queue "", // consumer true, // 自动消息确认 false, // exclusive false, // no-local false, // no-wait nil, ); err != nil{ fmt.Println("Consume Err =", err) return } go func(){ for msg := range msgs{ fmt.Printf("Received a message: %s\n", msg.Body) } }() time.Sleep(100*time.Second) }

我们首先运行生产者,运行后,结果如下:

22_Go语言操作RabbitMQ Topic模式.png

我们再次运行消费者1,运行后,结果如下:

23_Go语言操作RabbitMQ Topic模式.png

最后,我们再次运行消费者2,运行后,结果如下:

24_Go语言操作RabbitMQ Topic模式.png

我们可以看到,生产者发送的 binding key 为 lazy.brown.fox,而消费者 1 的 binding key 为 lazy.#,消费者 2 的 binding key 为 *.orange.*,结果,我们生产者发送的消息,只有消费者 1 接受到了,消费者 2 并没有接收到。

现在, 我们修改消费者 2 的 binding key 为 *.brown.*,现在,我们再次运行生产者,运行后,结果如下:

25_Go语言操作RabbitMQ Topic模式.png

再次运行消费者1,运行后,结果如下:

26_Go语言操作RabbitMQ Topic模式.png

最后,我们再次运行消费者2,运行后,结果如下:

27_Go语言操作RabbitMQ Topic模式.png

这次,我们可以看到,消费者1 和消费者 2 都收到了消息。