Skip to content
On this page

发布订阅

之前的代码实现的都是一对一的模式,也就是一个任务分配给一个消费者去消费,但是其实也可以实现一个任务分配给多个消费者

可以使用发布/订阅模式来实现,这种模式下,要使用到交换机(Exchange)

交换机

交换机(Exchange)的作用从Producer处接收消息(Message),然后将消息发送到队列(Queues)

交换机(Exchange)必须明确自己如何去处理消息,这个规则取决于交换机的类型

交换机(Exchange)有这几种类型:TopicDirectHeadersFanout

之前的代码中都没有用到交换机(Exchange),都是直接声明一个具名队列

如下:函数ch.PublishWithContextexchange name是空字符串,这时会使用一个默认的交换机

Go
// 3. declare a queue,it will only be created if it doesn't exist already
queue, err := ch.QueueDeclare(
    "second_queue", // queue name
    true,           // durable
    false,          //delete when unused
    false,          //exclusive
    false,          // no-wait
    nil,            // arguments
)

// publish message
err = ch.PublishWithContext(
    withTimeoutCtx,
    "",         //exchange name
    queue.Name, // routing key
    false,      // mandatory
    false,      //immediate
    amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        Body:         []byte(messageBody),
        ContentType:  "text/plain",
    },
)

Direct(直连模式)

交换机(Exchange)将消息发送到某个匹配的队列中,匹配规则是:这个队列的binding key(其实也可以叫routing key)要和消息的routing key一致

TIP

  • 队列的binding key在调用ch.QueueBind方法时指定

  • 消息的routing key在调用ch.PublishWithContext方法中指定

适用于明确指定的路由,例如,处理特定类型的任务,简单示例代码如下:

producer.go

Go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
    "directLogs", // exchange name
    "direct",      // exchange type
    true,          // durable
    false,         // auto-deleted
    false,         // internal
    false,         // no-wait
    nil,           // arguments
)
// xxxx 其他代码
err = ch.PublishWithContext(
    ctx, // context
    "directLogs", // exchange name
    "directRoutingKey", // routing key
    false, // mandatory
    false, // immediate
    amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType:  "text/plain",
        Body:         []byte(messageBody),
    },
)

consumer.go

Go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
    "directLogs", // exchange name
    "direct",      // exchange type
    true,          // durable
    false,         // auto-deleted
    false,         // internal
    false,         // no-wait
    nil,           // arguments
)
q, err := ch.QueueDeclare(
    "directQueue", // name
    true,    // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
)
// 将交换机和队列进行绑定
err = ch.QueueBind(
    q.Name,        // queue name
    "directRoutingKey", // binding key
    "directLogs", // exchange name
    false,
    nil,
)
// xxxx 其他代码

Topic(主题模式)

将接收到的消息放到和交换机指定的routing key匹配的队列里面

额外增加使用*(匹配一个单词)或使用#(匹配多个单词)

比起Direct模式,在验证routing key的时候,多了匹配规则

producer.go

Go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
    "topicTask", // exchange name
    "topic",      // exchange type
    true,          // durable
    false,         // auto-deleted
    false,         // internal
    false,         // no-wait
    nil,           // arguments
)
// xxxx 其他代码
err = ch.PublishWithContext(
    ctx, // context
    "topicTask", // exchange name
    "topicRoutingKey.abc", // routing key 
    false, // mandatory
    false, // immediate
    amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType:  "text/plain",
        Body:         []byte(messageBody),
    },
)

consumer.go

Go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
    "topicTask", // exchange name
    "topic",      // exchange type
    true,          // durable
    false,         // auto-deleted
    false,         // internal
    false,         // no-wait
    nil,           // arguments
)
q, err := ch.QueueDeclare(
    "topicQueue", // name
    true,    // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
)
// 将交换机和队列进行绑定
err = ch.QueueBind(
    q.Name,        // queue name
    "topicRoutingKey.*", // routing key 多了一个匹配规则
    "topicTask", // exchange name
    false,
    nil,
)
// xxxx 其他代码

Headers(头部模式)

使用消息头属性(headers)来路由消息,而不是路由键,可以匹配多个头

使用Headers模式,不用指定routing key

amqp.Table的数据类型是Map

Go
type Table map[string]interface{}

producer.go

Go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
    "headersTask", // exchange name
    "headers",      // exchange type
    true,          // durable
    false,         // auto-deleted
    false,         // internal
    false,         // no-wait
    nil,           // arguments
)
// xxxx 其他代码
err = ch.PublishWithContext(
    ctx, // context
    "headersTask", // exchange name
    "", // routing key Headers模式下不用指定routing key
    false, // mandatory
    false, // immediate
    amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType:  "text/plain",
        Body:         []byte(messageBody),
        Headers: amqp.Table{ // Headers模式下,会检查该字段,要传该字段
            "format": "pdf",
            "type":   "report",
        },
    },
)

consumer.go

Go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
    "headersTask", // exchange name
    "headers",      // exchange type
    true,          // durable
    false,         // auto-deleted
    false,         // internal
    false,         // no-wait
    nil,           // arguments
)
q, err := ch.QueueDeclare(
    "headersQueue", // name
    true,    // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
)
// 将交换机和队列进行绑定
err = ch.QueueBind(
    q.Name,        // queue name
    "", // routing key
    "headersTask", // exchange name
    false,
    amqp.Table{ // 此处要和上面发送时保持一致
        "format": "pdf",
        "type":   "report",
    },
)
// xxxx 其他代码

Fanout(广播模式)

把消息放入交换机所有的队列,实现广播功能

Fanout模式会忽略routing key

producer.go

Go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
    "fanoutTask", // exchange name
    "fanout",      // exchange type
    true,          // durable
    false,         // auto-deleted
    false,         // internal
    false,         // no-wait
    nil,           // arguments
)
// xxxx 其他代码
err = ch.PublishWithContext(
    ctx, // context
    "fanoutTask", // exchange name
    "", // routing key Fanout 模式下不用指定routing key,会忽略该字段
    false, // mandatory
    false, // immediate
    amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType:  "text/plain",
        Body:         []byte(messageBody),
    },
)

consumer.go

Go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
    "fanoutTask", // exchange name
    "fanout",      // exchange type
    true,          // durable
    false,         // auto-deleted
    false,         // internal
    false,         // no-wait
    nil,           // arguments
)
q, err := ch.QueueDeclare(
    "fanoutQueue", // name
    true,    // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
)
// 将交换机和队列进行绑定
err = ch.QueueBind(
    q.Name,        // queue name
    "", // routing key
    "fanoutTask", // exchange name
    false,
    nil
)
// xxxx 其他代码

一个简单案例

生产者生产出一些日志消息,所有的消费者接收消息并打印出来,这种场景可以使用Fanout(广播模式)

完整代码如下:

生产方emit.go

Go
package main

import (
	"context"
	"fmt"
	amqp "github.com/rabbitmq/amqp091-go"
	"go-rabbitmq/shared"
	"strings"
	"time"
)

func startUpAndSend() {
	// create connection
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
	shared.FailOnError(err, "create connection error")
	defer conn.Close()

	// create channel
	ch, err := conn.Channel()
	shared.FailOnError(err, "create channel error")
	defer ch.Close()

	// declare exchange
	err = ch.ExchangeDeclare(
		"logsExchange",
		"fanout", // exchange type => headers, topic, direct, fanout
		false,    // durable
		false,    // auto-delete
		false,    // internal
		false,    // no-wait
		nil,      // args
	)
	shared.FailOnError(err, "declare exchange error")

	// publish message
	ctx, cancelFunc := context.WithTimeout(context.Background(), 6*time.Second)
	defer cancelFunc()
	for i := 0; i < 6; i++ {
		go func(i int) {
			messageBody := fmt.Sprintf("我是第[%d]条消息%s", i+1, strings.Repeat(".", i+1))
			err = ch.PublishWithContext(
				ctx,
				"logsExchange",
				"",
				false,
				false,
				amqp.Publishing{
					DeliveryMode: amqp.Persistent,
					ContentType:  "text/plain",
					Body:         []byte(messageBody),
				},
			)
			shared.FailOnError(err, "publish message error")
			fmt.Printf(" [x] Sent %s\n", messageBody)
		}(i)
	}
	time.Sleep(6 * time.Second)
}

func main() {
	startUpAndSend()
}

消费方receive.go

Go
package main

import (
	"fmt"
	amqp "github.com/rabbitmq/amqp091-go"
	"go-rabbitmq/shared"
)

func startUpAndReceive() {
	// create connection
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
	shared.FailOnError(err, "create connection error")
	defer conn.Close()

	// create channel
	ch, err := conn.Channel()
	shared.FailOnError(err, "create channel error")
	defer ch.Close()

	// declare exchange
	err = ch.ExchangeDeclare(
		"logsExchange",
		"fanout", // exchange type => headers, topic, direct, fanout
		false,    // durable
		false,    // auto-delete
		false,    // internal
		false,    // no-wait
		nil,      // args
	)
	shared.FailOnError(err, "declare exchange error")

	// declare unnamed queue
	queue, err := ch.QueueDeclare("", false, false, true, false, nil)
	shared.FailOnError(err, "declare queue error")

	// bind exchange and queue
	err = ch.QueueBind(queue.Name, "", "logsExchange", false, nil)
	shared.FailOnError(err, "bind exchange and queue error")

	// consume message
	lockChan := make(chan struct{})
	messages, err := ch.Consume(queue.Name, "", true, false, false, false, nil)
	shared.FailOnError(err, "consume message error")
	go func() {
		for msg := range messages {
			fmt.Printf("Received a message: %s\n", msg.Body)
		}
	}()
	fmt.Println("[*] Waiting for messages. To exit press CTRL+C")
	<-lockChan
}

func main() {
	startUpAndReceive()
}

上面代码,在多个窗口分别运行receive.go中,然后运行emit.go,每个窗口都会接收到发送的数据