Skip to content
On this page

安装启动和基本使用

简介

RabbitMQ是一款可靠且成熟的消息传递和流媒体代理,可以轻松部署在云环境、本地环境以及本地计算机上

核心概念

RabbitMQ是一个消息代理(broker):它接受并转发消息

可以把它想象成是一个邮局:我们把要寄出的信件放入邮箱时,可以确信邮递员最终会将信件送到收件人手中 在这个类比中,RabbitMQ就像一个邮箱一个邮局一名邮递员

它有以下几个重要的概念:

  1. Message:RabbitMQ接收存储转发=>二进制数据块—-也就是message
  2. Producer:生产者是一个发送消息的用户应用程序
  3. Exchange:交换机一边接收来自生产者的消息,另一边将它们推送到队列中。 交换机必须确切地知道如何处理接收到的消息,包括如下几条:
    • 消息是否应该添加到到特定队列

    • 消息是否应该应添加到多个队列

    • 消息是否应该被丢弃

  4. Queue:队列是一个存储消息的缓冲区
  5. Consumer:消费者是接收消息的用户应用程序

安装和启动

Docker启动

直接使用docker run来启动一个image

TIP

  • 3.13-management版本自带了web管理界面,web界面的端口号默认是15672

  • MQ服务默认端口号是5672

shell
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management

或者使用docker-compose

shell
version: "2.8"
services:
  RabbitMQ:
    image: rabbitmq:3.13-management
    container_name: RabbitMQ
    restart: always
    ports:
      - "5672:5672"
      - "15672:15672"

使用homebrew安装启动教程

TIP

  • RabbitMQ是用Erlang编写的,因此需要先安装Erlang
  • 使用Homebrew安装Erlang
shell
brew install erlang
  • 安装RabbitMQ
shell
brew install rabbitmq

简单示例

原理如图所示:

TIP

  • RabbitMQ支持多种协议,我们使用AMQP 0-9-1,这是一种用于消息传递的开放通用协议

  • 有许多不同语言的RabbitMQ客户端,我们使用Go amqp客户端

初始化项目并下载依赖

shell
go mod init <your-module-name>
go get github.com/rabbitmq/amqp091-go

写一个错误处理的通用函数

Go
// FailOnError a helper function to check the return value for each amqp call
func FailOnError(err error, msg string) {
	if err != nil {
		log.Panicf("【%s】: %s", msg, err)
	}
}

生产者(Producer)

  1. 连接RabbitMQ服务
Go
// 1. connect to the RabbitMQ
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
shared.FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
  1. 开启一个channel
Go
// 2. open a channel
ch, err := conn.Channel()
shared.FailOnError(err, "create channel error")
defer ch.Close()

WARNING

  • 这里的ch不是go原生的channel,而是一个AMQP channel
Go
func (c *Connection) Channel() (*Channel, error)
  1. 声明一个队列,并发送消息
Go
// 3. declare a queue,it will only be created if it doesn't exist already
queue, err := ch.QueueDeclare(
    "first_queue", // queue name
    false,         // durable
    false,         //delete when unused
    false,         //exclusive
    false,         // no-wait
    nil,           // arguments
)
shared.FailOnError(err, "declare queue error")

withTimeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 6*time.Second)
defer cancelFunc()
// declare a message to be sent
messageBody := "hello, I am the first message, you have received me"
err = ch.PublishWithContext(
    withTimeoutCtx,
    "",         //exchange name
    queue.Name, // routing key
    false,      // mandatory
    false,      //immediate
    amqp091.Publishing{
        Body:        []byte(messageBody),
        ContentType: "text/plain",
    },
)
shared.FailOnError(err, "Publish error")
log.Printf(" [x] Sent %s\n", messageBody)

WARNING

  • 这里的queue只有不存在时才会被创建,但是ProducerConsumer的代码中都要声明,因为不知道ProducerConsumer哪个会先启动

消费者/接收者(Consumer)

我们启动一个Consumer,然后让其一直监听是否有可接收的匹配消息

  1. Producer一样,都需要创建链接channel声明队列
Go
// 1、create connection
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
shared.FailOnError(err, "create connection error")
defer conn.Close()

// open a channel
ch, err := conn.Channel()
shared.FailOnError(err, "open a channel error")
defer ch.Close()

// 3、declare queue
queue, err := ch.QueueDeclare(
    "first_queue",
    false,
    false,
    false,
    false,
    nil,
)
shared.FailOnError(err, "declare queue error")
  1. 消费消息
Go
// 4、consume message
message, err := ch.Consume(
    queue.Name,
    "",    // consumer
    true,  // auto-ack
    false, // exclusive
    false, // no-local
    false, // no-wait
    nil,   // args
)
shared.FailOnError(err, "consume queue message error")
waitCh := make(chan struct{})
go func() {
    for msg := range message {
        log.Printf("Received a message: %s \n", msg.Body)
    }
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-waitCh

TIP

  • 为了能持续接受到消息,这里使用channel做锁

完整代码

TIP

  • 这个例子中ProducerConsumer通过命名队列来确定发送和接受的消息

send.go

Go
package main

import (
	"context"
	"github.com/rabbitmq/amqp091-go"
	"go-rabbitmq/shared"
	"log"
	"time"
)

func connectAndSendMessage() {
	// 1. connect to the RabbitMQ
	conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
	shared.FailOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 2. open a channel
	ch, err := conn.Channel()
	shared.FailOnError(err, "create channel error")
	defer ch.Close()

	// 3. declare a queue,it will only be created if it doesn't exist already
	queue, err := ch.QueueDeclare(
		"first_queue", // queue name
		false,         // durable
		false,         //delete when unused
		false,         //exclusive
		false,         // no-wait
		nil,           // arguments
	)
	shared.FailOnError(err, "declare queue error")

	withTimeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 6*time.Second)
	defer cancelFunc()
	// declare a message to be sent
	messageBody := "hello, I am the first message, you have received me"
	err = ch.PublishWithContext(
		withTimeoutCtx,
		"",         //exchange name
		queue.Name, // routing key
		false,      // mandatory
		false,      //immediate
		amqp091.Publishing{
			Body:        []byte(messageBody),
			ContentType: "text/plain",
		},
	)
	shared.FailOnError(err, "Publish error")
	log.Printf(" [x] Sent %s\n", messageBody)
}

func main() {
	connectAndSendMessage()
}

receive.go

Go
package main

import (
	"github.com/rabbitmq/amqp091-go"
	"go-rabbitmq/shared"
	"log"
)

func createReceiverAndReceiveMessage() {
	// 1、create connection
	conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
	shared.FailOnError(err, "create connection error")
	defer conn.Close()

	// open a channel
	ch, err := conn.Channel()
	shared.FailOnError(err, "open a channel error")
	defer ch.Close()

	// 3、declare queue
	queue, err := ch.QueueDeclare(
		"first_queue",
		false,
		false,
		false,
		false,
		nil,
	)
	shared.FailOnError(err, "declare queue error")
	// 4、consume message
	message, err := ch.Consume(
		queue.Name,
		"",    // consumer
		true,  // auto-ack
		false, // exclusive
		false, // no-local
		false, // no-wait
		nil,   // args
	)
	shared.FailOnError(err, "consume queue message error")
	waitCh := make(chan struct{})
	go func() {
		for msg := range message {
			log.Printf("Received a message: %s \n", msg.Body)
		}
	}()
	log.Printf("[*] Waiting for messages. To exit press CTRL+C")
	<-waitCh
}

func main() {
	createReceiverAndReceiveMessage()
}