安装启动和基本使用
简介
RabbitMQ
是一款可靠且成熟的消息传递和流媒体代理,可以轻松部署在云环境、本地环境以及本地计算机上
核心概念
RabbitMQ
是一个消息代理
(broker):它接受并转发消息
可以把它想象成是一个
邮局
:我们把要寄出的信件放入邮箱时,可以确信邮递员最终会将信件送到收件人手中 在这个类比中,RabbitMQ
就像一个邮箱
、一个邮局
和一名邮递员
它有以下几个重要的概念:
Message
:RabbitMQ接收
、存储
和转发
=>二进制数据块
—-也就是message
Producer
:生产者是一个发送消息的用户应用程序Exchange
:交换机一边接收来自生产者的消息
,另一边将它们推送到队列
中。 交换机必须确切地知道如何处理接收到的消息,包括如下几条:消息是否应该添加到到特定队列
消息是否应该应添加到多个队列
消息是否应该被丢弃
Queue
:队列是一个存储消息的缓冲区Consumer
:消费者是接收消息的用户应用程序
安装和启动
Docker启动
直接使用
docker run
来启动一个image
TIP
3.13-management
版本自带了web管理界面
,web界面的端口号默认是15672
MQ
服务默认端口号是5672
shell
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
或者使用
docker-compose
shell
version: "2.8"
services:
RabbitMQ:
image: rabbitmq:3.13-management
container_name: RabbitMQ
restart: always
ports:
- "5672:5672"
- "15672:15672"
使用homebrew
安装启动教程
TIP
RabbitMQ
是用Erlang
编写的,因此需要先安装Erlang
- 使用
Homebrew
安装Erlang
shell
brew install erlang
- 安装
RabbitMQ
shell
brew install rabbitmq
- 具体教程点击Mac安装教程
简单示例
原理如图所示:
- 这里使用Go语言演示
TIP
RabbitMQ支持多种协议,我们使用
AMQP 0-9-1
,这是一种用于消息传递的开放通用协议有许多不同语言的RabbitMQ客户端,我们使用
Go amqp
客户端
初始化项目并下载依赖
shell
go mod init <your-module-name>
go get github.com/rabbitmq/amqp091-go
写一个错误处理的通用函数
Go
// FailOnError a helper function to check the return value for each amqp call
func FailOnError(err error, msg string) {
if err != nil {
log.Panicf("【%s】: %s", msg, err)
}
}
生产者(Producer)
- 连接RabbitMQ服务
Go
// 1. connect to the RabbitMQ
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
shared.FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
- 开启一个
channel
Go
// 2. open a channel
ch, err := conn.Channel()
shared.FailOnError(err, "create channel error")
defer ch.Close()
WARNING
- 这里的
ch
不是go原生的channel,而是一个AMQP channel
Go
func (c *Connection) Channel() (*Channel, error)
- 声明一个队列,并发送消息
Go
// 3. declare a queue,it will only be created if it doesn't exist already
queue, err := ch.QueueDeclare(
"first_queue", // queue name
false, // durable
false, //delete when unused
false, //exclusive
false, // no-wait
nil, // arguments
)
shared.FailOnError(err, "declare queue error")
withTimeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 6*time.Second)
defer cancelFunc()
// declare a message to be sent
messageBody := "hello, I am the first message, you have received me"
err = ch.PublishWithContext(
withTimeoutCtx,
"", //exchange name
queue.Name, // routing key
false, // mandatory
false, //immediate
amqp091.Publishing{
Body: []byte(messageBody),
ContentType: "text/plain",
},
)
shared.FailOnError(err, "Publish error")
log.Printf(" [x] Sent %s\n", messageBody)
WARNING
- 这里的
queue
只有不存在时才会被创建,但是Producer
和Consumer
的代码中都要声明,因为不知道Producer
和Consumer
哪个会先启动
消费者/接收者(Consumer)
我们启动一个Consumer,然后让其一直监听是否有可接收的匹配消息
- 和
Producer
一样,都需要创建链接
、channel
、声明队列
Go
// 1、create connection
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
shared.FailOnError(err, "create connection error")
defer conn.Close()
// open a channel
ch, err := conn.Channel()
shared.FailOnError(err, "open a channel error")
defer ch.Close()
// 3、declare queue
queue, err := ch.QueueDeclare(
"first_queue",
false,
false,
false,
false,
nil,
)
shared.FailOnError(err, "declare queue error")
- 消费消息
Go
// 4、consume message
message, err := ch.Consume(
queue.Name,
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
shared.FailOnError(err, "consume queue message error")
waitCh := make(chan struct{})
go func() {
for msg := range message {
log.Printf("Received a message: %s \n", msg.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-waitCh
TIP
- 为了能持续接受到消息,这里使用
channel
做锁
完整代码
TIP
- 这个例子中
Producer
和Consumer
通过命名队列
来确定发送和接受的消息
send.go
Go
package main
import (
"context"
"github.com/rabbitmq/amqp091-go"
"go-rabbitmq/shared"
"log"
"time"
)
func connectAndSendMessage() {
// 1. connect to the RabbitMQ
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
shared.FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 2. open a channel
ch, err := conn.Channel()
shared.FailOnError(err, "create channel error")
defer ch.Close()
// 3. declare a queue,it will only be created if it doesn't exist already
queue, err := ch.QueueDeclare(
"first_queue", // queue name
false, // durable
false, //delete when unused
false, //exclusive
false, // no-wait
nil, // arguments
)
shared.FailOnError(err, "declare queue error")
withTimeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 6*time.Second)
defer cancelFunc()
// declare a message to be sent
messageBody := "hello, I am the first message, you have received me"
err = ch.PublishWithContext(
withTimeoutCtx,
"", //exchange name
queue.Name, // routing key
false, // mandatory
false, //immediate
amqp091.Publishing{
Body: []byte(messageBody),
ContentType: "text/plain",
},
)
shared.FailOnError(err, "Publish error")
log.Printf(" [x] Sent %s\n", messageBody)
}
func main() {
connectAndSendMessage()
}
receive.go
Go
package main
import (
"github.com/rabbitmq/amqp091-go"
"go-rabbitmq/shared"
"log"
)
func createReceiverAndReceiveMessage() {
// 1、create connection
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
shared.FailOnError(err, "create connection error")
defer conn.Close()
// open a channel
ch, err := conn.Channel()
shared.FailOnError(err, "open a channel error")
defer ch.Close()
// 3、declare queue
queue, err := ch.QueueDeclare(
"first_queue",
false,
false,
false,
false,
nil,
)
shared.FailOnError(err, "declare queue error")
// 4、consume message
message, err := ch.Consume(
queue.Name,
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
shared.FailOnError(err, "consume queue message error")
waitCh := make(chan struct{})
go func() {
for msg := range message {
log.Printf("Received a message: %s \n", msg.Body)
}
}()
log.Printf("[*] Waiting for messages. To exit press CTRL+C")
<-waitCh
}
func main() {
createReceiverAndReceiveMessage()
}