工作队列
循环分发消息
默认情况下,
RabbitMQ
将按顺序将每条消息发送给下一个消费者,每个Consumer
将收到相同数量的消息,这种分发消息的方式称为循环轮询
我们可以同时开启多个协程去消费消息来测试这一特性,消费消息的我们称之为
worker
写一个通用的接收消息的通用函数
consumeMessage
(worker.go),然后开启3
个协程去消费消息可以看到,每个message被消费后,会等待一段时间,才能去消费下一个消息(消息文案中包含几个
.
,就等待10-.的个数
秒)
for i := 1; i <= 3; i++ {
go consumeMessage(ch, queue.Name, i)
}
func consumeMessage(ch *amqp.Channel, queueName string, serialNumber int) {
// 4、consume message
message, err := ch.Consume(
queueName,
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
shared.FailOnError(err, "consume queue message error")
for msg := range message {
fmt.Printf("【NO %d】 Received a message: %s", serialNumber, msg.Body)
dotCount := bytes.Count(msg.Body, []byte("."))
duration := time.Duration(10 - dotCount)
time.Sleep(duration * time.Second)
fmt.Printf("Done \n")
}
}
而在生产消息时(task.go),一次性发送多条
for i := 0; i < 10; i++ {
messageBody := fmt.Sprintf("我是第[%d]条消息%s \n", i+1, strings.Repeat(".", i+1))
err = ch.PublishWithContext(
withTimeoutCtx,
"", //exchange name
queue.Name, // routing key
false, // mandatory
false, //immediate
amqp.Publishing{
Body: []byte(messageBody),
ContentType: "text/plain",
},
)
shared.FailOnError(err, "Publish message error")
fmt.Printf(" [x] Sent %s\n", messageBody)
}
先运行消费者代码
worker.go
,然后执行生产者代码task.go
,打印如下:
# workQueues/task.go 打印
[x] Sent 我是第[1]条消息.
[x] Sent 我是第[2]条消息..
[x] Sent 我是第[3]条消息...
[x] Sent 我是第[4]条消息....
[x] Sent 我是第[5]条消息.....
[x] Sent 我是第[6]条消息......
[x] Sent 我是第[7]条消息.......
[x] Sent 我是第[8]条消息........
[x] Sent 我是第[9]条消息.........
[x] Sent 我是第[10]条消息..........
# go run workQueues/worker.go 打印
【NO 1】 Received a message: 我是第[3]条消息..., Will wait [7s]
【NO 3】 Received a message: 我是第[1]条消息., Will wait [9s]
【NO 2】 Received a message: 我是第[2]条消息.., Will wait [8s]
【NO 1】Done
【NO 1】 Received a message: 我是第[6]条消息......, Will wait [4s]
【NO 2】Done
【NO 2】 Received a message: 我是第[5]条消息....., Will wait [5s]
【NO 3】Done
【NO 3】 Received a message: 我是第[4]条消息...., Will wait [6s]
【NO 1】Done
【NO 1】 Received a message: 我是第[9]条消息........., Will wait [1s]
【NO 1】Done
【NO 2】Done
【NO 2】 Received a message: 我是第[8]条消息........, Will wait [2s]
【NO 2】Done
【NO 3】Done
【NO 3】 Received a message: 我是第[7]条消息......., Will wait [3s]
【NO 3】Done
【NO 3】 Received a message: 我是第[10]条消息.........., Will wait [0s]
【NO 3】Done
可以看到上面的打印,消息是轮训发放的,就算要等很久才能消费,也会按轮训的顺序分配
序号【
NO1
】消费第3、6、9
条消息序号【
NO2
】消费第2、5、8
条消息序号【
NO3
】消费第1、4、7、10
条消息
但是其实
第4条
消息【NO1】
来消费是比较合适的,因为它等待的时间最短,可以更早地去处理下一条消息要想达到这样的效果,可以通过
ch.Qos
方法,将prefetch
设置为1
,它表示在消费者确认消息之前,RabbitMQ
可以发送给消费者的最大消息数
,也就是告诉RabbutMQ
一次不要给一个工作者多于1条
消息,这通常用于确保消费者逐条处理消息,避免消息堆积。
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
prefetchCount(int)
:
解释: 预取计数(Prefetch Count)定义了在消费者确认消息之前,RabbitMQ 可以发送给消费者的最大消息数。简单来说,就是
消费者在确认之前最多可以收到多少条消息
。
示例中的值: 1 表示消费者在确认一条消息之前最多只能收到一条消息。这通常用于确保消费者逐条处理消息,避免消息堆积。
prefetchSize(int)
:
解释: 预取大小(Prefetch Size)定义了在消费者确认消息之前,RabbitMQ可以发送给消费者的最大字节数。通常情况下,这个值设置为
0
,表示不限制大小
。
示例中的值: 0 表示没有预取大小的限制。
global(bool)
:
解释: 全局标志(Global Flag)决定了
QoS
设置是应用于整个通道还是仅应用于当前消费者。
示例中的值:
false
表示QoS
设置仅应用于当前消费者
如果设置为
true
,则QoS设置应用于整个channel上所有的消费者
调整后输出如下:
【NO 2】 Received a message: 我是第[3]条消息..., Will wait [7s]
【NO 3】 Received a message: 我是第[1]条消息., Will wait [9s]
【NO 1】 Received a message: 我是第[2]条消息.., Will wait [8s]
【NO 2】Done
【NO 2】 Received a message: 我是第[4]条消息...., Will wait [6s]
【NO 1】Done
【NO 1】 Received a message: 我是第[5]条消息....., Will wait [5s]
【NO 3】Done
【NO 3】 Received a message: 我是第[6]条消息......, Will wait [4s]
【NO 3】Done
【NO 2】Done
【NO 1】Done
【NO 2】 Received a message: 我是第[8]条消息........, Will wait [2s]
【NO 3】 Received a message: 我是第[7]条消息......., Will wait [3s]
【NO 1】 Received a message: 我是第[9]条消息........., Will wait [1s]
【NO 1】Done
【NO 1】 Received a message: 我是第[10]条消息.........., Will wait [0s]
【NO 1】Done
【NO 2】Done
【NO 3】Done
如上:【NO2】先消费第
3
条消息,如果没有上面的设置那么它完成后会先去消费第6
条消息,但是通过Qos
方法设置后,先消费第4
条消息
消息确认
现在这个场景下,如果我们启动
worker.go
后,然后启动task.go
,此时消息如果还没有接收完,就断开worker.go
,那么此时刚才未处理的消息就会消失,再次 执行worker.go
也不会接收到任何消息了
TIP
- 上面的现象是因为在调用
ch.Consume
函数时,将第3个参数auto-ack
设置为了true
,这表明只要Producer
成功发送消息后,就将该消息标记为已确认
,默认该消息已经完成处理,不会管Consumer
是否收到
要处理上面的问题,需要以下步骤
1、将
auto-ack
设置为false
2、消息处理完成后手动调用
Ack
函数确认已收到消息
worker.go
修改如下:
func consumeMessage(ch *amqp.Channel, queueName string, serialNumber int) {
// 4、consume message
message, err := ch.Consume(
queueName,
"", // consumer
false, // auto-ack 不自动确认消息
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
shared.FailOnError(err, "consume queue message error")
go func() {
for msg := range message {
dotCount := bytes.Count(msg.Body, []byte("."))
duration := time.Duration(10 - dotCount)
fmt.Printf("【NO %d】 Received a message: %s, Will wait [%s]\n", serialNumber, msg.Body, duration*time.Second)
time.Sleep(duration * time.Second)
fmt.Printf("Done \n")
// Manually confirm that the message has been delivered
msg.Ack(false)
}
}()
}
消息持久化
现在的代码中,如果在
发送/接收
消息的过程中,RabbitMQ
服务挂了,或者出错退出了
,那么消息就会丢失为了解决这个问题,可以设置
消息持久化
,对于Producer
和Consumer
都要设置
1、发送方
- 调用
ch.PublishWithContext
函数时,设置消息body(amqp.Publishing)
时,将模式DeliveryMode
改为持久化- 声明队列调用
ch.QueueDeclare
函数时,将第二个参数durable
设置为true
task.go
修改如下:
// 3. declare a queue,it will only be created if it doesn't exist already
queue, err := ch.QueueDeclare(
"second_queue", // queue name
true, // durable
false, //delete when unused
false, //exclusive
false, // no-wait
nil, // arguments
)
err = ch.PublishWithContext(
withTimeoutCtx,
"", //exchange name
queue.Name, // routing key
false, // mandatory
false, //immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent, // DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
Body: []byte(messageBody),
ContentType: "text/plain",
},
)
2、接收方
- 声明队列调用
ch.QueueDeclare
函数时,将第二个参数durable
设置为true
// 3. declare a queue,it will only be created if it doesn't exist already
queue, err := ch.QueueDeclare(
"second_queue", // queue name
true, // durable
false, //delete when unused
false, //exclusive
false, // no-wait
nil, // arguments
)