发布订阅
之前的代码实现的都是
一对一
的模式,也就是一个任务分配给一个消费者去消费
,但是其实也可以实现一个任务分配给多个消费者
可以使用
发布/订阅
模式来实现,这种模式下,要使用到交换机(Exchange)
交换机
交换机(Exchange)
的作用从Producer
处接收消息(Message)
,然后将消息发送到队列(Queues)
中
交换机(Exchange)
必须明确自己如何去处理消息,这个规则取决于交换机的类型
交换机(Exchange)
有这几种类型:Topic
、Direct
、Headers
、Fanout
之前的代码中都没有用到
交换机(Exchange)
,都是直接声明一个具名队列
如下:函数
ch.PublishWithContext
中exchange name
是空字符串,这时会使用一个默认的交换机
// 3. declare a queue,it will only be created if it doesn't exist already
queue, err := ch.QueueDeclare(
"second_queue", // queue name
true, // durable
false, //delete when unused
false, //exclusive
false, // no-wait
nil, // arguments
)
// publish message
err = ch.PublishWithContext(
withTimeoutCtx,
"", //exchange name
queue.Name, // routing key
false, // mandatory
false, //immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
Body: []byte(messageBody),
ContentType: "text/plain",
},
)
Direct
(直连模式)
交换机(Exchange)
将消息发送到某个匹配的队列中
,匹配规则是:这个队列的binding key(其实也可以叫routing key)
要和消息的routing key
一致
TIP
队列的
binding key
在调用ch.QueueBind
方法时指定消息的
routing key
在调用ch.PublishWithContext
方法中指定
适用于明确指定的路由,例如,处理特定类型的任务,简单示例代码如下:
producer.go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
"directLogs", // exchange name
"direct", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
// xxxx 其他代码
err = ch.PublishWithContext(
ctx, // context
"directLogs", // exchange name
"directRoutingKey", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(messageBody),
},
)
consumer.go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
"directLogs", // exchange name
"direct", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
q, err := ch.QueueDeclare(
"directQueue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
// 将交换机和队列进行绑定
err = ch.QueueBind(
q.Name, // queue name
"directRoutingKey", // binding key
"directLogs", // exchange name
false,
nil,
)
// xxxx 其他代码
Topic
(主题模式)
将接收到的消息放到和交换机指定的
routing key
匹配的队列
里面额外增加使用
*
(匹配一个单词
)或使用#
(匹配多个单词
)
比起
Direct
模式,在验证routing key
的时候,多了匹配规则
producer.go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
"topicTask", // exchange name
"topic", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
// xxxx 其他代码
err = ch.PublishWithContext(
ctx, // context
"topicTask", // exchange name
"topicRoutingKey.abc", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(messageBody),
},
)
consumer.go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
"topicTask", // exchange name
"topic", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
q, err := ch.QueueDeclare(
"topicQueue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
// 将交换机和队列进行绑定
err = ch.QueueBind(
q.Name, // queue name
"topicRoutingKey.*", // routing key 多了一个匹配规则
"topicTask", // exchange name
false,
nil,
)
// xxxx 其他代码
Headers
(头部模式)
使用
消息头属性(headers)
来路由消息,而不是路由键
,可以匹配多个头使用
Headers
模式,不用指定routing key
amqp.Table
的数据类型是Map
type Table map[string]interface{}
producer.go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
"headersTask", // exchange name
"headers", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
// xxxx 其他代码
err = ch.PublishWithContext(
ctx, // context
"headersTask", // exchange name
"", // routing key Headers模式下不用指定routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(messageBody),
Headers: amqp.Table{ // Headers模式下,会检查该字段,要传该字段
"format": "pdf",
"type": "report",
},
},
)
consumer.go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
"headersTask", // exchange name
"headers", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
q, err := ch.QueueDeclare(
"headersQueue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
// 将交换机和队列进行绑定
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"headersTask", // exchange name
false,
amqp.Table{ // 此处要和上面发送时保持一致
"format": "pdf",
"type": "report",
},
)
// xxxx 其他代码
Fanout
(广播模式)
把消息放入
交换机所有的队列
,实现广播功能
Fanout
模式会忽略routing key
producer.go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
"fanoutTask", // exchange name
"fanout", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
// xxxx 其他代码
err = ch.PublishWithContext(
ctx, // context
"fanoutTask", // exchange name
"", // routing key Fanout 模式下不用指定routing key,会忽略该字段
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(messageBody),
},
)
consumer.go
// xxxx 其他代码
// declare exchange
err = ch.ExchangeDeclare(
"fanoutTask", // exchange name
"fanout", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
q, err := ch.QueueDeclare(
"fanoutQueue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
// 将交换机和队列进行绑定
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"fanoutTask", // exchange name
false,
nil
)
// xxxx 其他代码
一个简单案例
生产者生产出一些日志消息,所有的消费者接收消息并打印出来,这种场景可以使用
Fanout(广播模式)
完整代码如下:
生产方
emit.go
package main
import (
"context"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"go-rabbitmq/shared"
"strings"
"time"
)
func startUpAndSend() {
// create connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
shared.FailOnError(err, "create connection error")
defer conn.Close()
// create channel
ch, err := conn.Channel()
shared.FailOnError(err, "create channel error")
defer ch.Close()
// declare exchange
err = ch.ExchangeDeclare(
"logsExchange",
"fanout", // exchange type => headers, topic, direct, fanout
false, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil, // args
)
shared.FailOnError(err, "declare exchange error")
// publish message
ctx, cancelFunc := context.WithTimeout(context.Background(), 6*time.Second)
defer cancelFunc()
for i := 0; i < 6; i++ {
go func(i int) {
messageBody := fmt.Sprintf("我是第[%d]条消息%s", i+1, strings.Repeat(".", i+1))
err = ch.PublishWithContext(
ctx,
"logsExchange",
"",
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(messageBody),
},
)
shared.FailOnError(err, "publish message error")
fmt.Printf(" [x] Sent %s\n", messageBody)
}(i)
}
time.Sleep(6 * time.Second)
}
func main() {
startUpAndSend()
}
消费方
receive.go
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"go-rabbitmq/shared"
)
func startUpAndReceive() {
// create connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
shared.FailOnError(err, "create connection error")
defer conn.Close()
// create channel
ch, err := conn.Channel()
shared.FailOnError(err, "create channel error")
defer ch.Close()
// declare exchange
err = ch.ExchangeDeclare(
"logsExchange",
"fanout", // exchange type => headers, topic, direct, fanout
false, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil, // args
)
shared.FailOnError(err, "declare exchange error")
// declare unnamed queue
queue, err := ch.QueueDeclare("", false, false, true, false, nil)
shared.FailOnError(err, "declare queue error")
// bind exchange and queue
err = ch.QueueBind(queue.Name, "", "logsExchange", false, nil)
shared.FailOnError(err, "bind exchange and queue error")
// consume message
lockChan := make(chan struct{})
messages, err := ch.Consume(queue.Name, "", true, false, false, false, nil)
shared.FailOnError(err, "consume message error")
go func() {
for msg := range messages {
fmt.Printf("Received a message: %s\n", msg.Body)
}
}()
fmt.Println("[*] Waiting for messages. To exit press CTRL+C")
<-lockChan
}
func main() {
startUpAndReceive()
}
上面代码,在多个窗口分别运行
receive.go
中,然后运行emit.go
,每个窗口都会接收到发送的数据