Skip to content
On this page

工作队列

循环分发消息

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者,每个Consumer将收到相同数量的消息,这种分发消息的方式称为循环轮询

我们可以同时开启多个协程去消费消息来测试这一特性,消费消息的我们称之为worker

写一个通用的接收消息的通用函数consumeMessage(worker.go),然后开启3个协程去消费消息

可以看到,每个message被消费后,会等待一段时间,才能去消费下一个消息(消息文案中包含几个.,就等待10-.的个数秒)

Go
for i := 1; i <= 3; i++ {
    go consumeMessage(ch, queue.Name, i)
}
func consumeMessage(ch *amqp.Channel, queueName string, serialNumber int) {
	// 4、consume message
	message, err := ch.Consume(
		queueName,
		"",    // consumer
		true,  // auto-ack
		false, // exclusive
		false, // no-local
		false, // no-wait
		nil,   // args
	)
	shared.FailOnError(err, "consume queue message error")
	for msg := range message {
		fmt.Printf("【NO %d】 Received a message: %s", serialNumber, msg.Body)
		dotCount := bytes.Count(msg.Body, []byte("."))
		duration := time.Duration(10 - dotCount)
		time.Sleep(duration * time.Second)
		fmt.Printf("Done \n")
	}
}

而在生产消息时(task.go),一次性发送多条

Go
for i := 0; i < 10; i++ {
    messageBody := fmt.Sprintf("我是第[%d]条消息%s \n", i+1, strings.Repeat(".", i+1))
    err = ch.PublishWithContext(
        withTimeoutCtx,
        "",         //exchange name
        queue.Name, // routing key
        false,      // mandatory
        false,      //immediate
        amqp.Publishing{
            Body:        []byte(messageBody),
            ContentType: "text/plain",
        },
    )
    shared.FailOnError(err, "Publish message error")
    fmt.Printf(" [x] Sent %s\n", messageBody)
}

先运行消费者代码worker.go,然后执行生产者代码task.go,打印如下:

shell
# workQueues/task.go 打印
 [x] Sent 我是第[1]条消息.
 [x] Sent 我是第[2]条消息..
 [x] Sent 我是第[3]条消息...
 [x] Sent 我是第[4]条消息....
 [x] Sent 我是第[5]条消息.....
 [x] Sent 我是第[6]条消息......
 [x] Sent 我是第[7]条消息.......
 [x] Sent 我是第[8]条消息........
 [x] Sent 我是第[9]条消息.........
 [x] Sent 我是第[10]条消息..........
 
# go run workQueues/worker.go 打印
【NO 1 Received a message: 我是第[3]条消息..., Will wait [7s]
【NO 3 Received a message: 我是第[1]条消息., Will wait [9s]
【NO 2 Received a message: 我是第[2]条消息.., Will wait [8s]
【NO 1】Done
【NO 1 Received a message: 我是第[6]条消息......, Will wait [4s]
【NO 2】Done
【NO 2 Received a message: 我是第[5]条消息....., Will wait [5s]
【NO 3】Done
【NO 3 Received a message: 我是第[4]条消息...., Will wait [6s]
【NO 1】Done
【NO 1 Received a message: 我是第[9]条消息........., Will wait [1s]
【NO 1】Done
【NO 2】Done
【NO 2 Received a message: 我是第[8]条消息........, Will wait [2s]
【NO 2】Done
【NO 3】Done
【NO 3 Received a message: 我是第[7]条消息......., Will wait [3s]
【NO 3】Done
【NO 3 Received a message: 我是第[10]条消息.........., Will wait [0s]
【NO 3】Done

可以看到上面的打印,消息是轮训发放的,就算要等很久才能消费,也会按轮训的顺序分配

序号【NO1】消费第3、6、9条消息

序号【NO2】消费第2、5、8条消息

序号【NO3】消费第1、4、7、10条消息

但是其实第4条消息【NO1】来消费是比较合适的,因为它等待的时间最短,可以更早地去处理下一条消息

要想达到这样的效果,可以通过ch.Qos方法,将prefetch设置为1,它表示在消费者确认消息之前,RabbitMQ可以发送给消费者的最大消息数,也就是告诉RabbutMQ一次不要给一个工作者多于1条消息,这通常用于确保消费者逐条处理消息,避免消息堆积。

Go
err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
)
  • prefetchCount(int):

解释: 预取计数(Prefetch Count)定义了在消费者确认消息之前,RabbitMQ 可以发送给消费者的最大消息数。简单来说,就是消费者在确认之前最多可以收到多少条消息

示例中的值: 1 表示消费者在确认一条消息之前最多只能收到一条消息。这通常用于确保消费者逐条处理消息,避免消息堆积。

  • prefetchSize(int):

解释: 预取大小(Prefetch Size)定义了在消费者确认消息之前,RabbitMQ可以发送给消费者的最大字节数。通常情况下,这个值设置为0,表示不限制大小

示例中的值: 0 表示没有预取大小的限制。

  • global(bool):

解释: 全局标志(Global Flag)决定了QoS设置是应用于整个通道还是仅应用于当前消费者。

示例中的值: false表示QoS设置仅应用于当前消费者

如果设置为true,则QoS设置应用于整个channel上所有的消费者

调整后输出如下:

shell
【NO 2 Received a message: 我是第[3]条消息..., Will wait [7s]
【NO 3 Received a message: 我是第[1]条消息., Will wait [9s]
【NO 1 Received a message: 我是第[2]条消息.., Will wait [8s]
【NO 2】Done
【NO 2 Received a message: 我是第[4]条消息...., Will wait [6s]
【NO 1】Done
【NO 1 Received a message: 我是第[5]条消息....., Will wait [5s]
【NO 3】Done
【NO 3 Received a message: 我是第[6]条消息......, Will wait [4s]
【NO 3】Done
【NO 2】Done
【NO 1】Done
【NO 2 Received a message: 我是第[8]条消息........, Will wait [2s]
【NO 3 Received a message: 我是第[7]条消息......., Will wait [3s]
【NO 1 Received a message: 我是第[9]条消息........., Will wait [1s]
【NO 1】Done
【NO 1 Received a message: 我是第[10]条消息.........., Will wait [0s]
【NO 1】Done
【NO 2】Done
【NO 3】Done

如上:【NO2】先消费第3条消息,如果没有上面的设置那么它完成后会先去消费第6条消息,但是通过Qos方法设置后,先消费第4条消息

消息确认

现在这个场景下,如果我们启动worker.go后,然后启动task.go,此时消息如果还没有接收完,就断开worker.go,那么此时刚才未处理的消息就会消失,再次 执行worker.go也不会接收到任何消息了

TIP

  • 上面的现象是因为在调用ch.Consume函数时,将第3个参数auto-ack设置为了true,这表明只要Producer成功发送消息后,就将该消息标记为已确认,默认该消息已经完成处理,不会管Consumer是否收到

要处理上面的问题,需要以下步骤

1、将auto-ack设置为false

2、消息处理完成后手动调用Ack函数确认已收到消息

worker.go修改如下:

Go
func consumeMessage(ch *amqp.Channel, queueName string, serialNumber int) {
	// 4、consume message
	message, err := ch.Consume(
		queueName,
		"",    // consumer
		false, // auto-ack 不自动确认消息
		false, // exclusive
		false, // no-local
		false, // no-wait
		nil,   // args
	)
	shared.FailOnError(err, "consume queue message error")
	go func() {
		for msg := range message {
			dotCount := bytes.Count(msg.Body, []byte("."))
			duration := time.Duration(10 - dotCount)
			fmt.Printf("【NO %d】 Received a message: %s, Will wait [%s]\n", serialNumber, msg.Body, duration*time.Second)
			time.Sleep(duration * time.Second)
			fmt.Printf("Done \n")
			// Manually confirm that the message has been delivered
			msg.Ack(false)
		}
	}()
}

消息持久化

现在的代码中,如果在发送/接收消息的过程中,RabbitMQ服务挂了,或者出错退出了,那么消息就会丢失

为了解决这个问题,可以设置消息持久化,对于ProducerConsumer都要设置

1、发送方

  • 调用ch.PublishWithContext函数时,设置消息body(amqp.Publishing)时,将模式DeliveryMode改为持久化
  • 声明队列调用ch.QueueDeclare函数时,将第二个参数durable设置为true

task.go修改如下:

Go
// 3. declare a queue,it will only be created if it doesn't exist already
queue, err := ch.QueueDeclare(
    "second_queue", // queue name
    true,           // durable
    false,          //delete when unused
    false,          //exclusive
    false,          // no-wait
    nil,            // arguments
)
err = ch.PublishWithContext(
    withTimeoutCtx,
    "",         //exchange name
    queue.Name, // routing key
    false,      // mandatory
    false,      //immediate
    amqp.Publishing{
        DeliveryMode: amqp.Persistent, // DeliveryMode  uint8  // Transient (0 or 1) or Persistent (2)
        Body:         []byte(messageBody),
        ContentType:  "text/plain",
    },
)

2、接收方

  • 声明队列调用ch.QueueDeclare函数时,将第二个参数durable设置为true
Go
// 3. declare a queue,it will only be created if it doesn't exist already
queue, err := ch.QueueDeclare(
    "second_queue", // queue name
    true,           // durable
    false,          //delete when unused
    false,          //exclusive
    false,          // no-wait
    nil,            // arguments
)