Golang操作RabbitMQ

Go 语言 中,我们要操作 RabbitMQ,我们可以使用 amqp 库,具体地址如下:

github.com/streadway/amqp

这里,我们使用 GoMod 工程,直接操作。

Go语言操作RabbitMQ程序

生产者

我们实现生产者向 RabbitMQ 中写入一条消息,然后消费者获取该消息这么一个简单的功能,具体流程图如下:

01_Go语言操作RabbitMQ HelloWorld.png

需要操作 RabbitMQ,那么我们首先需要连接 RabbitMQ 的服务器,具体代码如下:

// 尝试连接RabbitMQ,建立连接 // 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") defer conn.Close()

连接抽象了 socket 连接,并为我们处理协议版本协商和认证等。接下来,我们创建一个通道,这是大多数用于完成任务的 API 所在的位置:

// 接下来,我们创建一个通道,大多数API都是用过该通道操作的 ch, err := conn.Channel() defer ch.Close()

要发送,我们必须声明要发送到的队列。然后我们可以将消息发布到队列:

// 声明消息要发送到的队列 q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) body := "Hello World!" // 将消息发布到声明的队列 err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), })

声明队列是幂等的——仅当队列不存在时才创建。消息内容是一个字节数组,因此你可以在此处编码任何内容。

消费者

设置与发布者相同;我们打开一个连接和一个通道,并声明要消耗的队列。请注意,这与 send 发布到的队列匹配。

// 建立连接 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") defer conn.Close() // 获取channel ch, err := conn.Channel() defer ch.Close() // 声明队列 q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments )

请注意,我们也在这里声明队列。因为我们可能在发布者之前启动使用者,所以我们希望在尝试使用队列中的消息之前确保队列存在。

我们将告诉服务器将队列中的消息传递给我们。由于它将异步地向我们发送消息,因此我们将在 goroutine 中从通道(由 amqp::Consume 返回)中读取消息。

// 获取接收消息的Delivery通道 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever

完整代码

生产者代码如下:

package main import ( "fmt" "github.com/streadway/amqp" ) const( addr = "amqp://guest:guest@localhost:5672/" ) func main(){ fmt.Println("haicoder(www.haicoder.net)") var( conn *amqp.Connection channel *amqp.Channel queue amqp.Queue msg = "Hello HaiCoder" err error ) //连接MQServer if conn, err = amqp.Dial(addr); err != nil{ fmt.Println("Connect RabbitMQ Err =", err, "Addr =", addr) return } //需要关闭 defer conn.Close() //创建一个Channel,所有的连接都是通过Channel管理的 if channel, err = conn.Channel(); err != nil{ fmt.Println("Create Channel Err =", err) return } //创建队列 if queue, err = channel.QueueDeclare("haicoder", false, false, false, false, nil); err != nil{ fmt.Println("QueueDeclare Err =", err) return } //发送数据 if err = channel.Publish("", queue.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }); err != nil{ fmt.Println("Publish Err =", err) return } fmt.Println("Send msg ok, msg =", msg) }

消费者代码如下:

package main import ( "fmt" "github.com/streadway/amqp" "time" ) const( serverAddr = "amqp://guest:guest@localhost:5672/" ) func main(){ fmt.Println("haicoder(www.haicoder.net)") var( conn *amqp.Connection channel *amqp.Channel queue amqp.Queue msgs <-chan amqp.Delivery err error ) //连接MQServer if conn, err = amqp.Dial(serverAddr); err != nil{ fmt.Println("Connect RabbitMQ Err =", err, "Addr =", serverAddr) return } //需要关闭 defer conn.Close() //创建一个Channel,所有的连接都是通过Channel管理的 if channel, err = conn.Channel(); err != nil{ fmt.Println("Create Channel Err =", err) return } //创建队列 if queue, err = channel.QueueDeclare("haicoder", false, false, false, false, nil); err != nil{ fmt.Println("QueueDeclare Err =", err) return } //读取数据 if msgs, err = channel.Consume(queue.Name, "", true, false, false, false, nil); err != nil{ fmt.Println("Consume Err =", err) return } go func(){ for msg := range msgs{ fmt.Println("Receive Msg =", string(msg.Body)) } }() time.Sleep(100*time.Second) }

现在,我们首先运行生产者,运行后,结果如下:

02_Go语言操作RabbitMQ HelloWorld.png

接着,我们再次启动消费者,运行后,结果如下:

03_Go语言操作RabbitMQ HelloWorld.png

同时,我们打开网页,如果没有消费的话,可以看到显示了队列以及有一条消息,如下:

04_Go语言操作RabbitMQ HelloWorld.png

参数说明

创建队列的函数 QueueDeclare,相关参数说明如下:

参数 描述
name 队列名,如果为空,则系统自己创建一个名字。
durable 是否是持久队列,如果是,那么重启 MQ 之后,该队列不会被删除,否则,该队列会被删除。
autoDelete 是否自动删除。如果为true,当没有 Consumer 时,会被自动删除掉。
exclusive 是否独占。如果为true,只能有一个消费者监听这队列。
noWait 如果为True,那么其他连接尝试修改该队列,将会触发异常。
args 额外参数。

使用 Channel 的发送消息函数 Publish 的相关参数如下:

参数 描述
exchange 交换机名称。简单模式下交换机会使用默认的 “”。
key 路由名称。
mandatory 如果为 true,那么当没有合适的 RoutingKey 时,将会触发 Channel 的 NotifyReturn。
immediate 如果为 true,那么当没有合适的消费着时,将会触发 Channel 的 NotifyReturn。
msg 具体要发送消息的结构体。

使用 Chanel 消费函数 Consume 的相关参数如下:

参数 描述
queue 队列名称。
consumer 路由名称。
autoAck 是否自动确认。
exclusive 是否独占。
noLocal 未使用的参数。
noWait 如果为 True,那么其他连接尝试修改该队列,将会触发异常。
args 额外参数。