Golang RabbitMQ PUB SUB

在上一个教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务只传递给一个工作者。在这一部分中,我们将做一些完全不同的事情,我们将向多个消费者传递一条消息。这种模式称为 “发布/订阅”。

为了说明这个模式,我们将构建一个简单的日志系统。它将由两个程序组成,第一个程序将发出日志消息,第二个程序将接收并打印它们。

在我们的日志系统中,接收器程序的每个运行副本都将获得消息。这样我们就可以运行一个接收器并将日志定向到磁盘;同时我们还可以运行另一个接收器并在屏幕上查看日志。

本质上,发布的日志消息将被广播到所有接收器。

交换机

在本教程的前几部分中,我们向队列发送消息和从队列接收消息。现在是时候在 RabbitMQ 中引入完整的消息传递模型了。让我们快速回顾一下之前教程中介绍的内容:

  • 生产者是发送消息的应用程序。
  • 队列是存储消息的缓冲区。
  • 使用者是接收消息的应用程序。

RabbitMQ 中消息传递模型的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者通常甚至不知道消息是否会被传递到任何队列。

相反,生产者只能将消息发送到交换机。交换机是一件非常简单的事情。一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交易所必须确切地知道如何处理它收到的消息。是否应将其附加到特定队列?它应该附加到许多队列中吗?或者它应该被丢弃。其规则由交换类型定义。

11_Go语言操作RabbitMQ发布订阅模式.png

有几种可用的交换类型:直连交换机、主题交换机、头交换机和扇形交换机。我们将关注扇形交换机。让我们创建一个这种类型的交换,并将其称为日志:

err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments )

扇形交换机非常简单,它只是将接收到的所有消息广播到它知道的所有队列。

查看所有交换机

要列出服务器上的交换机,可以运行 rabbitmqctl:

sudo rabbitmqctl list_exchanges

在此列表中,将有一些 amq.* 交换和默认(未命名)交换。它们是默认创建的,但目前不太可能需要使用它们。

默认交换机

在本教程的前几部分中,我们对交换一无所知,但仍然能够向队列发送消息。因为我们使用的是默认的交换,由空字符串("")标识。回想一下我们以前是如何发布消息的:

err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), } )

在这里,我们使用默认的或匿名的交换:消息被路由到由 routing key 参数指定的名称(如果存在)的队列。现在,我们可以发布到命名的交换机:

err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) body := bodyFrom(os.Args) err = ch.Publish( "logs", // exchange "", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })

临时队列

您可能还记得,我们以前使用的队列具有特定的名称。能够命名队列对我们来说至关重要,我们需要将相关的工作者指向同一个队列。当您希望在生产者和消费者之间共享队列时,为队列指定名称非常重要。

但我们的记录员不是这样的。我们希望听到所有日志消息,而不仅仅是其中的一个子集。我们也只对当前的流消息感兴趣,而不是对旧消息感兴趣。要解决这个问题,我们需要两件事。

  1. 首先,当我们连接到 RabbitMQ 时,我们需要一个新的、空的队列。为此,我们可以创建一个随机名称的队列,或者,更好的是,让服务器为我们选择一个随机的队列名称。
  2. 其次,一旦我们断开消费者的连接,队列就应该被自动删除。

在 amqp 客户机中,当我们将队列名称作为空字符串提供时,我们将使用生成的名称创建一个非持久队列:

q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments )

当方法返回时,队列实例包含由 RabbitMQ 生成的随机队列名称。例如,它可能看起来像 amq.gen-JzTY20BRgKO-HjmUJj0wLg。当声明它的连接关闭时,队列将被删除,因为它被声明为独占的。

Bindings

12_Go语言操作RabbitMQ发布订阅模式.png

我们已经创建了扇形交换机和队列。现在我们需要告诉交换机向我们的队列发送消息。交换机和队列之间的关系称为绑定。

err = ch.QueueBind( q.Name, // queue name "", // routing key "logs", // exchange false, nil, )

从现在起,日志交换机将把消息附加到我们的队列中。

查看所有的bindings

你可以使用如下命令查看所有的 binding:

rabbitmqctl list_bindings

案例

13_Go语言操作RabbitMQ发布订阅模式.png

producer 程序发出日志消息,看起来与上一个教程没有太大区别。最重要的变化是,我们现在希望将消息发布到日志交换,而不是无名的日志交换。发送时需要提供 routingKey,但扇出交换忽略其值。

emit_log.go:

package main import ( "fmt" "github.com/streadway/amqp" "time" ) const( addr = "amqp://guest:guest@localhost:5672/" ) func main(){ fmt.Println("haicoder(www.haicoder.net)") var( conn *amqp.Connection channel *amqp.Channel err error ) //连接MQServer if conn, err = amqp.Dial(addr); err != nil{ fmt.Println("Connect RabbitMQ Err =", err, "Addr =", addr) return } //需要关闭 defer conn.Close() //创建一个Channel,所有的连接都是通过Channel管理的 if channel, err = conn.Channel(); err != nil{ fmt.Println("Create Channel Err =", err) return } defer channel.Close() //创建交换机 if err = channel.ExchangeDeclare( "logs", //name "fanout", //type true, false, false, false, nil, ); err != nil{ fmt.Println("ExchangeDeclare Err =", err) return } //直接向交换机发送数据即可 for i := 0; i < 100; i++{ msg := fmt.Sprintf("Hello... %d", i+1) if err = channel.Publish("logs", "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }); err != nil{ fmt.Println("Publish Err =", err) return } fmt.Println("Send msg ok, msg =", msg) time.Sleep(5*time.Second) } }

如你所见,建立连接后我们宣布交换。此步骤是必需的,因为禁止发布到不存在的 exchange。

如果还没有队列绑定到 exchange,消息将丢失,但这对我们来说没关系;如果还没有消费者在侦听,我们可以安全地丢弃消息。

receive_logs.go:

package main import ( "fmt" "github.com/streadway/amqp" "time" ) const( serverAddr = "amqp://guest:guest@localhost:5672/" ) func main(){ fmt.Println("haicoder(www.haicoder.net)") var( conn *amqp.Connection channel *amqp.Channel queue amqp.Queue msgs <-chan amqp.Delivery err error ) //连接MQServer if conn, err = amqp.Dial(serverAddr); err != nil{ fmt.Println("Connect RabbitMQ Err =", err, "Addr =", serverAddr) return } //需要关闭 defer conn.Close() //创建一个Channel,所有的连接都是通过Channel管理的 if channel, err = conn.Channel(); err != nil{ fmt.Println("Create Channel Err =", err) return } defer channel.Close() //创建交换机 if err = channel.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ); err != nil{ fmt.Println("ExchangeDeclare Err =", err) return } //创建队列 if queue, err = channel.QueueDeclare( "haicoder_workqueue", //队列名 true, //持久的 false, // delete when unused false, //独占的 false, nil, ); err != nil{ fmt.Println("QueueDeclare Err =", err) return } //交换机绑定队列 if err = channel.QueueBind(queue.Name, "", "logs", false, nil); err != nil{ fmt.Println("QueueBind Err =", err) return } //读取数据 if msgs, err = channel.Consume( queue.Name, // queue "", // consumer true, // 自动消息确认 false, // exclusive false, // no-local false, // no-wait nil, ); err != nil{ fmt.Println("Consume Err =", err) return } go func(){ for msg := range msgs{ fmt.Printf("Received a message: %s\n", msg.Body) } }() time.Sleep(100*time.Second) }

我们首先启动发送端,运行结果如下:

14_Go语言操作RabbitMQ发布订阅模式.png

我们再次运行接收端,结果如下:

15_Go语言操作RabbitMQ发布订阅模式.png

我们可以看到,我们实现了生产者发送消息,接受者接受消息的功能,同时,我们需要注意,这里我们生产者只需要将消息发送到交换机就行了,接受者也要声明同样的交换机,然后绑定队列并接受。