5.6.3 RabbitMQ 使用与开发 #
RabbitMQ 是一个功能丰富的消息代理，基于 AMQP 协议实现，提供了灵活的消息路由、可靠的消息传递和丰富的管理功能。本节将深入介绍 RabbitMQ 的核心概念、消息路由机制，以及如何在 Go 语言中进行 RabbitMQ 的开发和应用。
RabbitMQ 核心概念 #
AMQP 模型组件 #
// RabbitMQConfig holds the parameters from which the constructors in this
// file build the broker connection URL
// (scheme://username:password@host:port/vhost).
type RabbitMQConfig struct {
	// Host and Port locate the broker.
	Host string `json:"host"`
	Port int    `json:"port"`

	// Username and Password are embedded verbatim in the connection URL.
	Username string `json:"username"`
	Password string `json:"password"`

	// VHost is appended as the URL path.
	VHost string `json:"vhost"`

	// SSL selects the "amqps" scheme instead of "amqp".
	SSL bool `json:"ssl"`

	// SSLConfig is optional and omitted from JSON when nil.
	// NOTE(review): no code visible in this section reads it — confirm usage.
	SSLConfig *SSLConfig `json:"ssl_config,omitempty"`
}
// ExchangeConfig describes an exchange to declare. DeclareExchange passes the
// fields, in declaration order, to the channel's ExchangeDeclare call.
type ExchangeConfig struct {
	// Name is the exchange name.
	Name string `json:"name"`
	// Type is the exchange kind; the examples in this file use "direct",
	// "topic" and "headers".
	Type string `json:"type"`

	// Declaration flags forwarded unchanged to the broker.
	Durable    bool `json:"durable"`
	AutoDelete bool `json:"auto_delete"`
	Internal   bool `json:"internal"`
	NoWait     bool `json:"no_wait"`

	// Arguments carries optional declaration arguments; omitted from JSON
	// when empty.
	Arguments map[string]interface{} `json:"arguments,omitempty"`
}
// QueueConfig describes a queue to declare. DeclareQueue passes the fields,
// in declaration order, to the channel's QueueDeclare call.
type QueueConfig struct {
	// Name is the queue name.
	Name string `json:"name"`

	// Declaration flags forwarded unchanged to the broker.
	Durable    bool `json:"durable"`
	AutoDelete bool `json:"auto_delete"`
	Exclusive  bool `json:"exclusive"`
	NoWait     bool `json:"no_wait"`

	// Arguments carries optional declaration arguments; omitted from JSON
	// when empty.
	Arguments map[string]interface{} `json:"arguments,omitempty"`
}
// BindingConfig describes one queue-to-exchange binding. BindQueue forwards
// the fields to the channel's QueueBind call.
type BindingConfig struct {
	// Queue is the queue being bound.
	Queue string `json:"queue"`
	// Exchange is the exchange the queue is bound to.
	Exchange string `json:"exchange"`
	// RoutingKey is the binding key.
	RoutingKey string `json:"routing_key"`
	// NoWait is forwarded unchanged to the broker.
	NoWait bool `json:"no_wait"`

	// Arguments carries optional binding arguments (the headers example in
	// this file puts its match rules here); omitted from JSON when empty.
	Arguments map[string]interface{} `json:"arguments,omitempty"`
}
// RabbitMQ 管理客户端 type RabbitMQManager struct { conn *amqp.Connection channel *amqp.Channel config *RabbitMQConfig logger *log.Logger }
// NewRabbitMQManager dials the broker described by config, opens a channel on
// the new connection and returns a manager that owns both.
//
// It returns a wrapped error if dialing or opening the channel fails; when the
// channel cannot be opened the connection is closed before returning.
func NewRabbitMQManager(config *RabbitMQConfig) (*RabbitMQManager, error) {
	// Only the scheme differs between the TLS and the plain-text URL.
	scheme := "amqp"
	if config.SSL {
		scheme = "amqps"
	}
	// NOTE(review): the credentials and vhost are inserted verbatim; values
	// containing URL-reserved characters would need percent-encoding — confirm
	// against the deployments in use.
	url := fmt.Sprintf("%s://%s:%s@%s:%d/%s",
		scheme, config.Username, config.Password, config.Host, config.Port, config.VHost)

	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, fmt.Errorf("连接RabbitMQ失败: %w", err)
	}

	channel, err := conn.Channel()
	if err != nil {
		// Do not leak the connection when the channel cannot be opened.
		conn.Close()
		return nil, fmt.Errorf("创建通道失败: %w", err)
	}

	return &RabbitMQManager{
		conn:    conn,
		channel: channel,
		config:  config,
		logger:  log.New(os.Stdout, "[RabbitMQManager] ", log.LstdFlags),
	}, nil
}
// DeclareExchange declares the exchange described by config on the manager's
// channel and logs its name on success. A failure is returned wrapped, so the
// underlying error stays reachable through errors.Is / errors.As.
func (rm *RabbitMQManager) DeclareExchange(config *ExchangeConfig) error {
	err := rm.channel.ExchangeDeclare(
		config.Name,
		config.Type,
		config.Durable,
		config.AutoDelete,
		config.Internal,
		config.NoWait,
		config.Arguments,
	)
	if err != nil {
		return fmt.Errorf("声明交换器失败: %w", err)
	}

	rm.logger.Printf("交换器声明成功: %s", config.Name)
	return nil
}
// DeclareQueue declares the queue described by config on the manager's channel
// and returns a pointer to the queue value reported by the client library.
// The logged name is the one returned by the declare call rather than
// config.Name. A failure is returned wrapped, so the underlying error stays
// reachable through errors.Is / errors.As.
func (rm *RabbitMQManager) DeclareQueue(config *QueueConfig) (*amqp.Queue, error) {
	queue, err := rm.channel.QueueDeclare(
		config.Name,
		config.Durable,
		config.AutoDelete,
		config.Exclusive,
		config.NoWait,
		config.Arguments,
	)
	if err != nil {
		return nil, fmt.Errorf("声明队列失败: %w", err)
	}

	rm.logger.Printf("队列声明成功: %s", queue.Name)
	return &queue, nil
}
// BindQueue binds config.Queue to config.Exchange with config.RoutingKey and
// logs the binding on success. A failure is returned wrapped, so the
// underlying error stays reachable through errors.Is / errors.As.
func (rm *RabbitMQManager) BindQueue(config *BindingConfig) error {
	// QueueBind takes the routing key before the exchange name.
	err := rm.channel.QueueBind(
		config.Queue,
		config.RoutingKey,
		config.Exchange,
		config.NoWait,
		config.Arguments,
	)
	if err != nil {
		return fmt.Errorf("绑定队列失败: %w", err)
	}

	rm.logger.Printf("队列绑定成功: queue=%s, exchange=%s, routing_key=%s",
		config.Queue, config.Exchange, config.RoutingKey)
	return nil
}
func (rm *RabbitMQManager) Close() error { if rm.channel != nil { rm.channel.Close() } if rm.conn != nil { return rm.conn.Close() } return nil }
RabbitMQ 生产者 #
基础生产者实现 #
// ProducerConfig carries the per-message publish options. PublishMessage
// passes the first four fields as arguments to the channel's Publish call and
// copies the remaining ones into the outgoing amqp.Publishing.
type ProducerConfig struct {
	// Routing arguments of the Publish call.
	Exchange   string `json:"exchange"`
	RoutingKey string `json:"routing_key"`
	Mandatory  bool   `json:"mandatory"`
	Immediate  bool   `json:"immediate"`

	// Message properties copied into the publishing.
	ContentType   string    `json:"content_type"`
	DeliveryMode  uint8     `json:"delivery_mode"` // 1 = non-persistent, 2 = persistent
	Priority      uint8     `json:"priority"`
	CorrelationID string    `json:"correlation_id"`
	ReplyTo       string    `json:"reply_to"`
	Expiration    string    `json:"expiration"`
	MessageID     string    `json:"message_id"`
	Timestamp     time.Time `json:"timestamp"`
	Type          string    `json:"type"`
	UserID        string    `json:"user_id"`
	AppID         string    `json:"app_id"`

	// Headers holds optional message headers; omitted from JSON when empty.
	Headers map[string]interface{} `json:"headers,omitempty"`
}
// RabbitMQProducer publishes messages over a single connection/channel pair
// and replaces that pair when the connection reports an error on close (see
// monitorConnection and reconnect).
type RabbitMQProducer struct {
	// conn and channel are swapped by reconnect while holding mu.
	conn    *amqp.Connection
	channel *amqp.Channel

	// config is kept so reconnect can rebuild the connection URL.
	config *RabbitMQConfig
	logger *log.Logger

	// closed is set by Close, and by reconnect once it gives up.
	closed bool
	// mu is read-locked by PublishMessage and write-locked by reconnect and
	// Close.
	mu sync.RWMutex
}
// NewRabbitMQProducer dials the broker described by config, opens a channel
// and starts a background goroutine (monitorConnection) that watches the
// connection for close notifications.
//
// It returns a wrapped error if dialing or opening the channel fails; when the
// channel cannot be opened the connection is closed before returning.
func NewRabbitMQProducer(config *RabbitMQConfig) (*RabbitMQProducer, error) {
	// Only the scheme differs between the TLS and the plain-text URL.
	scheme := "amqp"
	if config.SSL {
		scheme = "amqps"
	}
	url := fmt.Sprintf("%s://%s:%s@%s:%d/%s",
		scheme, config.Username, config.Password, config.Host, config.Port, config.VHost)

	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, fmt.Errorf("连接RabbitMQ失败: %w", err)
	}

	channel, err := conn.Channel()
	if err != nil {
		// Do not leak the connection when the channel cannot be opened.
		conn.Close()
		return nil, fmt.Errorf("创建通道失败: %w", err)
	}

	producer := &RabbitMQProducer{
		conn:    conn,
		channel: channel,
		config:  config,
		logger:  log.New(os.Stdout, "[RabbitMQProducer] ", log.LstdFlags),
	}

	// Watch the connection so an unexpected close triggers a reconnect.
	go producer.monitorConnection()

	return producer, nil
}
// PublishMessage sends body to config.Exchange with config.RoutingKey, using
// the message properties found in config. It holds the producer's read lock
// for the duration of the call and fails immediately once the producer has
// been marked closed. Publish errors are logged and returned unchanged.
func (rp *RabbitMQProducer) PublishMessage(body []byte, config *ProducerConfig) error {
	rp.mu.RLock()
	defer rp.mu.RUnlock()

	if rp.closed {
		return fmt.Errorf("生产者已关闭")
	}

	// Assemble the outgoing message from the payload and the configured
	// properties.
	msg := amqp.Publishing{
		Body:          body,
		Headers:       config.Headers,
		ContentType:   config.ContentType,
		DeliveryMode:  config.DeliveryMode,
		Priority:      config.Priority,
		CorrelationId: config.CorrelationID,
		ReplyTo:       config.ReplyTo,
		Expiration:    config.Expiration,
		MessageId:     config.MessageID,
		Timestamp:     config.Timestamp,
		Type:          config.Type,
		UserId:        config.UserID,
		AppId:         config.AppID,
	}

	if err := rp.channel.Publish(config.Exchange, config.RoutingKey, config.Mandatory, config.Immediate, msg); err != nil {
		rp.logger.Printf("发布消息失败: %v", err)
		return err
	}

	rp.logger.Printf("消息发布成功: exchange=%s, routing_key=%s",
		config.Exchange, config.RoutingKey)
	return nil
}
// monitorConnection registers for close notifications on the producer's
// current connection and calls reconnect whenever a non-nil close error is
// received. The loop ends when the notification channel is closed.
func (rp *RabbitMQProducer) monitorConnection() {
	// Buffered so the client library can hand over the close error without
	// waiting for this goroutine to reach the receive; the library's
	// documentation advises a buffered channel here to avoid deadlocks.
	closeChan := make(chan *amqp.Error, 1)
	rp.conn.NotifyClose(closeChan)

	for err := range closeChan {
		if err != nil {
			rp.logger.Printf("连接断开: %v", err)
			// On success reconnect starts a fresh monitor goroutine for the
			// new connection.
			rp.reconnect()
		}
	}
}
// reconnect replaces the producer's connection and channel. It makes up to
// five attempts, waiting 1s, 2s, 3s and 4s between consecutive attempts, and
// marks the producer closed when every attempt fails. On success it starts a
// new monitorConnection goroutine for the new connection.
//
// NOTE(review): the write lock is held for the whole retry sequence, so
// PublishMessage and Close block until reconnect returns — confirm this is
// acceptable for callers.
func (rp *RabbitMQProducer) reconnect() {
	rp.mu.Lock()
	defer rp.mu.Unlock()

	if rp.closed {
		return
	}

	// Best-effort teardown of the old channel and connection; close errors
	// are ignored because both are being discarded.
	if rp.channel != nil {
		rp.channel.Close()
	}
	if rp.conn != nil {
		rp.conn.Close()
	}

	// The URL does not change between attempts, so build it once.
	scheme := "amqp"
	if rp.config.SSL {
		scheme = "amqps"
	}
	url := fmt.Sprintf("%s://%s:%s@%s:%d/%s",
		scheme, rp.config.Username, rp.config.Password,
		rp.config.Host, rp.config.Port, rp.config.VHost)

	for i := 0; i < 5; i++ {
		if i > 0 {
			// Linear back-off before each retry. Sleeping here rather than
			// after a failure avoids a pointless wait once the final attempt
			// has already failed.
			time.Sleep(time.Duration(i) * time.Second)
		}

		conn, err := amqp.Dial(url)
		if err != nil {
			rp.logger.Printf("重连失败 (尝试 %d/5): %v", i+1, err)
			continue
		}

		channel, err := conn.Channel()
		if err != nil {
			conn.Close()
			rp.logger.Printf("创建通道失败 (尝试 %d/5): %v", i+1, err)
			continue
		}

		rp.conn = conn
		rp.channel = channel
		rp.logger.Printf("重连成功")

		// Watch the new connection.
		go rp.monitorConnection()
		return
	}

	rp.logger.Printf("重连失败,生产者将关闭")
	rp.closed = true
}
// Close marks the producer closed under the write lock, then closes the
// channel (ignoring its error) followed by the connection. The returned error
// is the connection's close result, or nil when there is no connection.
func (rp *RabbitMQProducer) Close() error {
	rp.mu.Lock()
	defer rp.mu.Unlock()

	rp.closed = true

	if ch := rp.channel; ch != nil {
		ch.Close()
	}
	if rp.conn == nil {
		return nil
	}
	return rp.conn.Close()
}
确认模式生产者 #
// ConfirmProducer is a producer whose channel runs in publisher-confirm mode.
// PublishWithConfirm publishes a message and waits for the broker's ack/nack
// of that specific message.
//
// A single goroutine (handleConfirmations) is the only reader of confirmChan;
// it hands each confirmation to the caller waiting on the matching delivery
// tag. This keeps concurrent PublishWithConfirm calls from consuming each
// other's confirmations.
//
// NOTE(review): delivery tags are counted locally, one per successful publish
// made through PublishWithConfirm. Calling the embedded PublishMessage
// directly on a ConfirmProducer would advance the channel's sequence without
// advancing this counter — confirm no caller does that.
type ConfirmProducer struct {
	*RabbitMQProducer
	confirmChan chan amqp.Confirmation
	deliveryTag uint64
	// mu guards deliveryTag, pending and confirmsClosed, and serializes
	// "publish + tag assignment" so tags follow publish order.
	mu sync.Mutex
	// pending maps the delivery tag of every in-flight publish to the channel
	// its caller is waiting on.
	pending map[uint64]chan amqp.Confirmation
	// confirmsClosed is set once confirmChan has been closed, after which no
	// further confirmations can arrive.
	confirmsClosed bool
}

// NewConfirmProducer creates a producer, switches its channel to confirm
// mode, registers a buffered confirmation listener and starts the goroutine
// that dispatches confirmations. If confirm mode cannot be enabled the
// producer is closed and a wrapped error is returned.
func NewConfirmProducer(config *RabbitMQConfig) (*ConfirmProducer, error) {
	producer, err := NewRabbitMQProducer(config)
	if err != nil {
		return nil, err
	}

	// Enable publisher confirms on the channel.
	if err := producer.channel.Confirm(false); err != nil {
		producer.Close()
		return nil, fmt.Errorf("启用确认模式失败: %w", err)
	}

	confirmChan := make(chan amqp.Confirmation, 1000)
	producer.channel.NotifyPublish(confirmChan)

	cp := &ConfirmProducer{
		RabbitMQProducer: producer,
		confirmChan:      confirmChan,
		deliveryTag:      0,
		pending:          make(map[uint64]chan amqp.Confirmation),
	}

	// Start the single reader of confirmChan.
	go cp.handleConfirmations()

	return cp, nil
}

// PublishWithConfirm publishes body using config and waits up to timeout for
// the broker to confirm that message.
//
// It returns nil on ack, and an error when the publish itself fails, the
// broker nacks the message, the confirmation stream has been closed, or the
// timeout elapses first.
func (cp *ConfirmProducer) PublishWithConfirm(body []byte, config *ProducerConfig, timeout time.Duration) error {
	// Buffered so the dispatcher never blocks, even if this caller has
	// already timed out.
	waiter := make(chan amqp.Confirmation, 1)

	cp.mu.Lock()
	if cp.confirmsClosed {
		cp.mu.Unlock()
		return fmt.Errorf("确认通道已关闭")
	}
	if err := cp.PublishMessage(body, config); err != nil {
		// Nothing was published, so the tag counter must not advance.
		cp.mu.Unlock()
		return err
	}
	cp.deliveryTag++
	currentTag := cp.deliveryTag
	// Registered under the same lock as the publish: the dispatcher also
	// takes the lock, so it cannot look up this tag before it is registered.
	cp.pending[currentTag] = waiter
	cp.mu.Unlock()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case confirm, ok := <-waiter:
		if !ok {
			// The confirmation stream closed before this message was
			// confirmed.
			return fmt.Errorf("确认通道已关闭: delivery_tag=%d", currentTag)
		}
		if !confirm.Ack {
			return fmt.Errorf("消息被拒绝: delivery_tag=%d", currentTag)
		}
		cp.logger.Printf("消息确认成功: delivery_tag=%d", currentTag)
		return nil
	case <-timer.C:
		// Stop tracking the tag so the map does not grow without bound.
		cp.mu.Lock()
		delete(cp.pending, currentTag)
		cp.mu.Unlock()
		return fmt.Errorf("确认超时: delivery_tag=%d", currentTag)
	}
}

// handleConfirmations is the only reader of confirmChan. It logs negative
// confirmations and forwards each confirmation to the caller waiting on its
// delivery tag. When confirmChan is closed it marks the confirmation stream
// closed and releases every caller that is still waiting.
func (cp *ConfirmProducer) handleConfirmations() {
	for confirm := range cp.confirmChan {
		if !confirm.Ack {
			cp.logger.Printf("消息发布失败: delivery_tag=%d", confirm.DeliveryTag)
		}

		cp.mu.Lock()
		waiter, ok := cp.pending[confirm.DeliveryTag]
		delete(cp.pending, confirm.DeliveryTag)
		cp.mu.Unlock()

		if ok {
			// Never blocks: waiter has capacity 1 and receives exactly one
			// value, because the tag was removed from pending above.
			waiter <- confirm
		}
	}

	// No more confirmations will arrive; wake all remaining waiters.
	cp.mu.Lock()
	cp.confirmsClosed = true
	for tag, waiter := range cp.pending {
		close(waiter)
		delete(cp.pending, tag)
	}
	cp.mu.Unlock()
}
RabbitMQ 消费者 #
基础消费者实现 #
// ConsumerConfig carries the options StartConsuming uses for one
// subscription: the arguments of the channel's Consume call plus optional QoS
// settings.
type ConsumerConfig struct {
	// Arguments of the Consume call, in call order.
	Queue     string                 `json:"queue"`
	Consumer  string                 `json:"consumer"`
	AutoAck   bool                   `json:"auto_ack"`
	Exclusive bool                   `json:"exclusive"`
	NoLocal   bool                   `json:"no_local"`
	NoWait    bool                   `json:"no_wait"`
	Arguments map[string]interface{} `json:"arguments,omitempty"`

	// QoS settings; StartConsuming applies them only when PrefetchCount or
	// PrefetchSize is greater than zero.
	PrefetchCount int  `json:"prefetch_count"`
	PrefetchSize  int  `json:"prefetch_size"`
	Global        bool `json:"global"`
}
// MessageHandler processes one delivery at a time. The consumer in this file
// treats a nil return as success and a non-nil return as a processing
// failure.
type MessageHandler interface {
	// HandleMessage is invoked once per received delivery.
	HandleMessage(ctx context.Context, delivery amqp.Delivery) error
}
// MessageHandlerFunc adapts a plain function to the MessageHandler interface.
type MessageHandlerFunc func(ctx context.Context, delivery amqp.Delivery) error

// HandleMessage calls the underlying function with the same arguments and
// returns its result.
func (fn MessageHandlerFunc) HandleMessage(ctx context.Context, delivery amqp.Delivery) error {
	return fn(ctx, delivery)
}
// RabbitMQConsumer receives deliveries over a single connection/channel pair
// and passes each one to a MessageHandler.
type RabbitMQConsumer struct {
	conn    *amqp.Connection
	channel *amqp.Channel
	config  *RabbitMQConfig
	handler MessageHandler // invoked for every delivery
	logger  *log.Logger

	// ctx is handed to the handler and ends the consume loops when cancel is
	// called by Stop.
	ctx    context.Context
	cancel context.CancelFunc

	// wg tracks the goroutines started by StartConsuming; Stop waits on it.
	wg sync.WaitGroup

	// closed is set by Stop; mu is locked by StartConsuming and Stop.
	closed bool
	mu     sync.RWMutex
}
// NewRabbitMQConsumer dials the broker described by config, opens a channel
// and returns a consumer that will pass deliveries to handler. Consumption
// does not start until StartConsuming is called.
//
// It returns a wrapped error if dialing or opening the channel fails; when the
// channel cannot be opened the connection is closed before returning.
func NewRabbitMQConsumer(config *RabbitMQConfig, handler MessageHandler) (*RabbitMQConsumer, error) {
	// Only the scheme differs between the TLS and the plain-text URL.
	scheme := "amqp"
	if config.SSL {
		scheme = "amqps"
	}
	url := fmt.Sprintf("%s://%s:%s@%s:%d/%s",
		scheme, config.Username, config.Password, config.Host, config.Port, config.VHost)

	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, fmt.Errorf("连接RabbitMQ失败: %w", err)
	}

	channel, err := conn.Channel()
	if err != nil {
		// Do not leak the connection when the channel cannot be opened.
		conn.Close()
		return nil, fmt.Errorf("创建通道失败: %w", err)
	}

	// The context is cancelled by Stop to end the consume loops.
	ctx, cancel := context.WithCancel(context.Background())

	consumer := &RabbitMQConsumer{
		conn:    conn,
		channel: channel,
		config:  config,
		handler: handler,
		logger:  log.New(os.Stdout, "[RabbitMQConsumer] ", log.LstdFlags),
		ctx:     ctx,
		cancel:  cancel,
	}

	return consumer, nil
}
// StartConsuming subscribes to config.Queue and starts a goroutine that feeds
// every delivery to the consumer's handler.
//
// QoS is applied first, but only when PrefetchCount or PrefetchSize is greater
// than zero. It returns an error when the consumer has already been stopped,
// or a wrapped error when setting QoS or subscribing fails.
func (rc *RabbitMQConsumer) StartConsuming(config *ConsumerConfig) error {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	if rc.closed {
		return fmt.Errorf("消费者已关闭")
	}

	// Apply QoS only when a prefetch limit was requested.
	if config.PrefetchCount > 0 || config.PrefetchSize > 0 {
		if err := rc.channel.Qos(config.PrefetchCount, config.PrefetchSize, config.Global); err != nil {
			return fmt.Errorf("设置QoS失败: %w", err)
		}
	}

	deliveries, err := rc.channel.Consume(
		config.Queue,
		config.Consumer,
		config.AutoAck,
		config.Exclusive,
		config.NoLocal,
		config.NoWait,
		config.Arguments,
	)
	if err != nil {
		return fmt.Errorf("开始消费失败: %w", err)
	}

	// Registered before the goroutine starts so Stop can wait for it.
	rc.wg.Add(1)
	go rc.consumeMessages(deliveries, config.AutoAck)

	rc.logger.Printf("开始消费队列: %s", config.Queue)
	return nil
}
// consumeMessages runs the receive loop for one subscription. It passes each
// delivery to the handler and, unless autoAck is set, acknowledges the
// delivery on success or rejects it with requeue on failure. The loop ends
// when the deliveries channel is closed or the consumer's context is
// cancelled.
//
// NOTE(review): a failed delivery is always requeued, so a message the handler
// can never process is redelivered indefinitely — confirm whether a
// dead-letter policy is expected.
func (rc *RabbitMQConsumer) consumeMessages(deliveries <-chan amqp.Delivery, autoAck bool) {
	defer rc.wg.Done()

	for {
		select {
		case delivery, ok := <-deliveries:
			if !ok {
				rc.logger.Printf("消费通道已关闭")
				return
			}

			err := rc.handler.HandleMessage(rc.ctx, delivery)
			if err != nil {
				rc.logger.Printf("消息处理失败: %v", err)
			}
			if autoAck {
				// With autoAck set this loop sends no acknowledgements.
				continue
			}

			if err != nil {
				// Reject this single delivery and ask for it to be requeued.
				if nackErr := delivery.Nack(false, true); nackErr != nil {
					rc.logger.Printf("拒绝消息失败: %v", nackErr)
				}
				continue
			}

			// Acknowledge this single delivery. A failed ack is logged rather
			// than silently dropped.
			if ackErr := delivery.Ack(false); ackErr != nil {
				rc.logger.Printf("确认消息失败: %v", ackErr)
			}

		case <-rc.ctx.Done():
			return
		}
	}
}
// Stop shuts the consumer down: it marks the consumer closed, cancels the
// context, waits for the consume goroutines to finish, then closes the
// channel (ignoring its error) and the connection. The returned error is the
// connection's close result, or nil when there is no connection.
func (rc *RabbitMQConsumer) Stop() error {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	rc.closed = true

	// Signal the consume loops to exit and wait until they have.
	rc.cancel()
	rc.wg.Wait()

	if ch := rc.channel; ch != nil {
		ch.Close()
	}
	if rc.conn == nil {
		return nil
	}
	return rc.conn.Close()
}
消息路由模式 #
直接路由 (Direct Exchange) #
// DirectRoutingExample sets up a direct-exchange topology through a
// RabbitMQManager (see Setup).
type DirectRoutingExample struct {
	// manager performs the declare and bind calls.
	manager *RabbitMQManager
}
// NewDirectRoutingExample creates a manager from config and wraps it in a
// DirectRoutingExample. The manager's construction error is returned
// unchanged.
func NewDirectRoutingExample(config *RabbitMQConfig) (*DirectRoutingExample, error) {
	mgr, err := NewRabbitMQManager(config)
	if err != nil {
		return nil, err
	}

	example := &DirectRoutingExample{manager: mgr}
	return example, nil
}
// Setup declares the durable direct exchange "direct_logs" and, for each of
// the severities info, warning and error, declares a durable queue named
// "logs_<severity>" and binds it to the exchange with the severity as routing
// key. It stops at, and returns, the first error.
func (dre *DirectRoutingExample) Setup() error {
	exchange := &ExchangeConfig{
		Name:    "direct_logs",
		Type:    "direct",
		Durable: true,
	}
	if err := dre.manager.DeclareExchange(exchange); err != nil {
		return err
	}

	for _, level := range []string{"info", "warning", "error"} {
		// One queue per severity.
		queue := &QueueConfig{
			Name:    fmt.Sprintf("logs_%s", level),
			Durable: true,
		}
		if _, err := dre.manager.DeclareQueue(queue); err != nil {
			return err
		}

		// The routing key equals the severity.
		binding := &BindingConfig{
			Queue:      queue.Name,
			Exchange:   exchange.Name,
			RoutingKey: level,
		}
		if err := dre.manager.BindQueue(binding); err != nil {
			return err
		}
	}

	return nil
}
主题路由 (Topic Exchange) #
// TopicRoutingExample sets up a topic-exchange topology through a
// RabbitMQManager (see Setup).
type TopicRoutingExample struct {
	// manager performs the declare and bind calls.
	manager *RabbitMQManager
}
// NewTopicRoutingExample creates a manager from config and wraps it in a
// TopicRoutingExample. The manager's construction error is returned
// unchanged.
func NewTopicRoutingExample(config *RabbitMQConfig) (*TopicRoutingExample, error) {
	mgr, err := NewRabbitMQManager(config)
	if err != nil {
		return nil, err
	}

	example := &TopicRoutingExample{manager: mgr}
	return example, nil
}
// Setup declares the durable topic exchange "topic_logs", then declares four
// durable queues and binds each to the exchange with its own pattern. Queues
// are processed in the fixed order listed below, so log output and the point
// of failure are reproducible between runs. It stops at, and returns, the
// first error.
//
// In a topic pattern "*" stands for exactly one dot-separated word and "#"
// for zero or more words.
func (tre *TopicRoutingExample) Setup() error {
	exchangeConfig := &ExchangeConfig{
		Name:    "topic_logs",
		Type:    "topic",
		Durable: true,
	}
	if err := tre.manager.DeclareExchange(exchangeConfig); err != nil {
		return err
	}

	// A slice rather than a map: map iteration order is random in Go.
	routes := []struct {
		queue   string
		pattern string
	}{
		{queue: "all_logs", pattern: "#"},                  // every message
		{queue: "kern_logs", pattern: "kern.*"},            // "kern." followed by exactly one word
		{queue: "critical", pattern: "*.critical"},         // exactly one word followed by ".critical"
		{queue: "kern_critical", pattern: "kern.critical"}, // only the key "kern.critical"
	}

	for _, route := range routes {
		queueConfig := &QueueConfig{
			Name:    route.queue,
			Durable: true,
		}
		if _, err := tre.manager.DeclareQueue(queueConfig); err != nil {
			return err
		}

		bindingConfig := &BindingConfig{
			Queue:      route.queue,
			Exchange:   exchangeConfig.Name,
			RoutingKey: route.pattern,
		}
		if err := tre.manager.BindQueue(bindingConfig); err != nil {
			return err
		}
	}

	return nil
}
头部路由 (Headers Exchange) #
// HeadersRoutingExample sets up a headers-exchange topology through a
// RabbitMQManager (see Setup).
type HeadersRoutingExample struct {
	// manager performs the declare and bind calls.
	manager *RabbitMQManager
}
// NewHeadersRoutingExample creates a manager from config and wraps it in a
// HeadersRoutingExample. The manager's construction error is returned
// unchanged.
func NewHeadersRoutingExample(config *RabbitMQConfig) (*HeadersRoutingExample, error) {
	mgr, err := NewRabbitMQManager(config)
	if err != nil {
		return nil, err
	}

	example := &HeadersRoutingExample{manager: mgr}
	return example, nil
}
// Setup declares the durable headers exchange "headers_exchange" and two
// durable queues bound by header values:
//
//   - pdf_queue receives messages whose headers contain both format=pdf and
//     type=report.
//   - image_queue receives messages whose "format" header is jpg or png.
//
// It stops at, and returns, the first error.
func (hre *HeadersRoutingExample) Setup() error {
	exchangeConfig := &ExchangeConfig{
		Name:    "headers_exchange",
		Type:    "headers",
		Durable: true,
	}
	if err := hre.manager.DeclareExchange(exchangeConfig); err != nil {
		return err
	}

	// Each queue lists one argument table per binding. A single table cannot
	// hold two values for the same header, so "format is jpg OR png" is
	// expressed as two bindings on the "format" header. Putting the second
	// value under another key (such as "format2") would match a header with
	// that name instead.
	queueConfigs := []struct {
		name     string
		bindings []map[string]interface{}
	}{
		{
			name: "pdf_queue",
			bindings: []map[string]interface{}{
				{
					"x-match": "all",
					"format":  "pdf",
					"type":    "report",
				},
			},
		},
		{
			name: "image_queue",
			bindings: []map[string]interface{}{
				{
					"x-match": "all",
					"format":  "jpg",
				},
				{
					"x-match": "all",
					"format":  "png",
				},
			},
		},
	}

	for _, qc := range queueConfigs {
		queueConfig := &QueueConfig{
			Name:    qc.name,
			Durable: true,
		}
		if _, err := hre.manager.DeclareQueue(queueConfig); err != nil {
			return err
		}

		for _, arguments := range qc.bindings {
			// Headers bindings carry their match rules in Arguments; no
			// routing key is set.
			bindingConfig := &BindingConfig{
				Queue:     qc.name,
				Exchange:  exchangeConfig.Name,
				Arguments: arguments,
			}
			if err := hre.manager.BindQueue(bindingConfig); err != nil {
				return err
			}
		}
	}

	return nil
}
RabbitMQ 提供了丰富的消息路由功能和可靠的消息传递机制，通过不同类型的交换器可以实现灵活的消息分发策略。掌握 RabbitMQ 的开发技巧将帮助我们构建健壮的消息驱动应用。在下一节中，我们将学习事件驱动架构的设计和实现。