这次,我们将使用 Go 语言 操作 RabbitMQ 并且使用工作队列模式,我们将创建一个工作队列,该队列将用于在多个工作者之间分配耗时的任务。
工作队列(又称任务队列)的主要思想是避免立即执行某些资源密集型任务并且不得不等待这些任务完成。相反,我们安排任务异步地同时或在当前任务之后完成。我们将任务封装为消息并将其发送到队列,在后台运行的工作进程将取出消息并最终执行任务。当你运行多个工作进程时,任务将在他们之间共享。
这个概念在 Web 应用中特别有用,因为在 Web 应用中不可能在较短的 HTTP 请求窗口内处理复杂的任务,(译注:例如注册时发送邮件或短信验证码等场景)。工作队列模式如下图所示:
现在,我们将发送代表复杂任务的字符串。我们没有实际的任务,例如调整图像大小或渲染 pdf 文件,所以我们通过借助 time.Sleep 函数模拟一些比较耗时的任务。我们会将一些包含 .
的字符串封装为消息发送到队列中,其中每有一个 .
就表示需要耗费 1 秒钟的工作,例如,hello… 表示一个将花费三秒钟的假任务。
我们将稍微修改上一个示例中的 send.go 代码,以允许从命令行发送任意消息。该程序会将任务安排到我们的工作队列中,因此我们将其命名为 new_task.go:
body := bodyFrom(os.Args) // 从参数中获取要发送的消息正文
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
下面是 bodyFrom 函数:
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
我们以前的 receive.go 程序也需要进行一些更改:它需要为消息正文中出现的每个 .
伪造一秒钟的工作。它将从队列中弹出消息并执行任务,因此我们将其称为 worker.go:
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte(".")) // 数一下有几个.
t := time.Duration(dot_count)
time.Sleep(t * time.Second) // 模拟耗时的任务
log.Printf("Done")
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
请注意,我们的假任务模拟执行时间。然后,我们就可以打开两个终端,分别执行 new_task.go 和 worker.go 了。
循环调度
使用任务队列的优点之一是能够轻松并行化工作。如果我们的工作正在积压,我们可以增加更多的工作者,这样就可以轻松扩展。
首先,让我们尝试同时运行两个 worker.go 脚本。它们都将从队列中获取消息,但是究竟是怎样呢?让我们来看看。你需要打开三个控制台。其中两个将运行 worker.go 脚本。这些控制台将成为我们的两个消费者,即 C1 和 C2。
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
在第三个控制台中,我们将发布新任务。启动消费者之后,你可以发布一些消息:
# shell 3
go run new_task.go msg1.
go run new_task.go msg2..
go run new_task.go msg3...
go run new_task.go msg4....
go run new_task.go msg5.....
然后我们在 shell1 和 shell2 两个窗口看到如下输出结果了:
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg1.
# => [x] Received a message: msg3...
# => [x] Received a message: msg5.....
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg2..
# => [x] Received a message: msg4....
默认情况下,RabbitMQ 将按顺序将每个消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。使用三个或者更多 worker 试一下。
消息确认
work 完成任务可能需要耗费几秒钟,如果一个 worker 在任务执行过程中宕机了该怎么办呢?我们当前的代码中,RabbitMQ 一旦向消费者传递了一条消息,便立即将其标记为删除。在这种情况下,如果你终止一个 worker 那么你就可能会丢失这个任务,我们还将丢失所有已经交付给这个 worker 的尚未处理的消息。
我们不想丢失任何任务,如果一个 worker 意外宕机了,那么我们希望将任务交付给其他 worker 来处理。
为了确保消息永不丢失,RabbitMQ 支持消息确认。消费者发送回一个确认(acknowledgement),以告知 RabbitMQ 已经接收,处理了特定的消息,并且 RabbitMQ 可以自由删除它。
如果使用者在不发送确认的情况下死亡(其通道已关闭,连接已关闭或 TCP 连接丢失),RabbitMQ 将了解消息未完全处理,并将对其重新排队。如果同时有其他消费者在线,它将很快将其重新分发给另一个消费者。这样,您可以确保即使工作者偶尔死亡也不会丢失任何消息。
没有任何消息超时;RabbitMQ 将在消费者死亡时重新传递消息。即使处理一条消息需要很长时间也没关系。
在本教程中,我们将使用手动消息确认,方法是为 “auto-ack” 参数传递一个 false,然后在完成任务后,使用 d.Ack(false) 从 worker 发送一个正确的确认(这将确认一次传递)。
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // 注意这里传false,关闭自动消息确认
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
fmt.Printf("ch.Consume failed, err:%v\n", err)
return
}
// 开启循环不断地消费消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false) // 手动传递消息确认
}
}()
使用这段代码,我们可以确保即使你在处理消息时使用 CTRL+C 杀死一个 worker,也不会丢失任何内容。在 worker 死后不久,所有未确认的消息都将被重新发送。
消息确认必须在接收消息的同一通道(Channel)上发送。尝试使用不同的通道(Channel)进行消息确认将导致通道级协议异常。
忘记确认
忘记确认是一个常见的错误。这是一个简单的错误,但后果是严重的。当你的客户机退出时,消息将被重新传递(这看起来像随机重新传递),但是 RabbitMQ 将消耗越来越多的内存,因为它无法释放任何未确认的消息。
为了调试这种错误,可以使用 rabbitmqctl 打印 messages_unacknowledged 字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在 Windows 平台,去掉 sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久化
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果 RabbitMQ 服务器停止运行,我们的任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。
首先,我们需要确保队列能够在 RabbitMQ 节点重新启动后继续运行。为此,我们需要声明它是持久的:
q, err := ch.QueueDeclare(
"hello", // name
true, // 声明为持久队列
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
虽然这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为 hello 的队列,它不是持久的。RabbitMQ 不允许你使用不同的参数重新定义现有队列,并将向任何尝试重新定义的程序返回错误。但是有一个快速的解决方法——让我们声明一个具有不同名称的队列,例如 task_queue:
q, err := ch.QueueDeclare(
"task_queue", // name
true, // 声明为持久队列
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
这种持久的选项更改需要同时应用于生产者代码和消费者代码。
在这一点上,我们确信即使 RabbitMQ 重新启动,任务队列队列也不会丢失。现在我们需要将消息标记为持久的——通过使用 amqp.Publishing 中的持久性选项 amqp.Persistent。
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // 立即
false, // 强制
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 持久(交付模式:瞬态/持久)
ContentType: "text/plain",
Body: []byte(body),
})
有关消息持久性的说明:将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘上,但是 RabbitMQ 接受了一条消息并且还没有保存它时,仍然有一个很短的时间窗口。
而且,RabbitMQ 并不是对每个消息都执行 fsync(2),它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但是对于我们的简单任务队列来说已经足够了。如果您需要更强有力的担保,那么您可以使用 publisher confirms。
公平分发
你可能已经注意到调度仍然不能完全按照我们的要求工作。例如,在一个有两个 worker 的情况下,当所有的奇数消息都是重消息而偶数消息都是轻消息时,一个 worker 将持续忙碌,而另一个 worker 几乎不做任何工作。嗯,RabbitMQ 对此一无所知,仍然会均匀地发送消息。
这是因为 RabbitMQ 只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。只是盲目地向消费者发送信息。
为了避免这种情况,我们可以将预取计数设置为 1。这告诉 RabbitMQ 不要一次向一个 worker 发出多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向 worker 发送新消息。相反,它将把它发送给下一个不忙的 worker。
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
关于队列大小的说明:如果所有的 worker 都很忙,你的 queue 随时可能会满。你会想继续关注这一点,也许需要增加更多的 worker,或者有一些其他的策略。
完整代码
我们的 new_task.go 的最终代码代入如下:
package main
import (
"fmt"
"github.com/streadway/amqp"
"os"
"strings"
"time"
)
const(
addr = "amqp://guest:guest@localhost:5672/"
)
func main(){
fmt.Println("haicoder(www.haicoder.net)")
var(
conn *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
err error
)
//连接MQServer
if conn, err = amqp.Dial(addr); err != nil{
fmt.Println("Connect RabbitMQ Err =", err, "Addr =", addr)
return
}
//需要关闭
defer conn.Close()
//创建一个Channel,所有的连接都是通过Channel管理的
if channel, err = conn.Channel(); err != nil{
fmt.Println("Create Channel Err =", err)
return
}
defer channel.Close()
//创建队列
if queue, err = channel.QueueDeclare(
"haicoder_workqueue", //队列名
true, //是否持久化
false, // delete when unused
false, //独占的
false,
nil); err != nil{
fmt.Println("QueueDeclare Err =", err)
return
}
//发送数据
for i := 0; i < 100; i++{
msg := fmt.Sprintf("Hello... %d", i+1)
if err = channel.Publish("", queue.Name, false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent, //持久
ContentType: "text/plain",
Body: []byte(msg),
}); err != nil{
fmt.Println("Publish Err =", err)
return
}
fmt.Println("Send msg ok, msg =", msg)
time.Sleep(1*time.Second)
}
}
// bodyFrom 从命令行获取将要发送的消息内容
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
work.go 的代码如下:
package main
import (
"bytes"
"fmt"
"github.com/streadway/amqp"
"time"
)
const(
serverAddr = "amqp://guest:guest@localhost:5672/"
)
func main(){
fmt.Println("haicoder(www.haicoder.net)")
var(
conn *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
msgs <-chan amqp.Delivery
err error
)
//连接MQServer
if conn, err = amqp.Dial(serverAddr); err != nil{
fmt.Println("Connect RabbitMQ Err =", err, "Addr =", serverAddr)
return
}
//需要关闭
defer conn.Close()
//创建一个Channel,所有的连接都是通过Channel管理的
if channel, err = conn.Channel(); err != nil{
fmt.Println("Create Channel Err =", err)
return
}
defer channel.Close()
//创建队列
if queue, err = channel.QueueDeclare(
"haicoder_workqueue", //队列名
true, //持久的
false, // delete when unused
false, //独占的
false,
nil); err != nil{
fmt.Println("QueueDeclare Err =", err)
return
}
//每次只获取一条消息
err = channel.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
//读取数据
if msgs, err = channel.Consume(
queue.Name, // queue
"", // consumer
false, // 注意这里传false,关闭自动消息确认
false, // exclusive
false, // no-local
false, // no-wait
nil); err != nil{
fmt.Println("Consume Err =", err)
return
}
go func(){
for msg := range msgs{
fmt.Printf("Received a message: %s\n", msg.Body)
dotCount := bytes.Count(msg.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
fmt.Printf("Done")
_ = msg.Ack(false) // 手动传递消息确认
}
}()
time.Sleep(100*time.Second)
}
现在,我们启动 new_task.go 运行后,结果如下:
接着,我们启动接受者,启动后,第一个接受者如下图所示:
我们再次启动第二个接受者,如下图所示:
我们可以看到,两个消费者是轮流接受消息的。如果我们在运行过程中,关闭一个消费者,那么我们就会看到剩下的一个消费者会接受所有的消息。
现在,我们注释掉其中一个消费者的 Ack 代码,即 msg.Ack(false)
,此时被注释掉 Ack 的那个消费者运行如下图:
我们可以看到,此时该消费者消费了一条消息之后,不再继续消费了,这是因为,我们设置了每次只接受一条消息,并且我们手动关闭了 Ack 又没有发送 Ack,因此,此时消费者被认为还没有处理完消息,因此不会再接收到新的消息。
同时,被消费而忘记发送 Ack 的那条消息,等该消费者退出后,会再次被另一个消费者重新消费。