5.6.3 RabbitMQ 使用与开发

5.6.3 RabbitMQ 使用与开发

RabbitMQ 是一个功能丰富的消息代理，基于 AMQP 协议实现，提供了灵活的消息路由、可靠的消息传递和丰富的管理功能。本节将深入介绍 RabbitMQ 的核心概念、消息路由机制，以及如何在 Go 语言中进行 RabbitMQ 的开发和应用。

RabbitMQ 核心概念

AMQP 模型组件

// RabbitMQConfig carries the broker connection settings. The constructors in
// this section interpolate these fields into an AMQP URL of the form
// scheme://Username:Password@Host:Port/VHost.
type RabbitMQConfig struct {
    // Host and Port locate the broker.
    Host string `json:"host"`
    Port int    `json:"port"`

    // Username and Password are the login credentials.
    Username string `json:"username"`
    Password string `json:"password"`

    // VHost is the virtual host appended as the URL path.
    VHost string `json:"vhost"`

    // SSL selects the "amqps" scheme instead of "amqp".
    SSL bool `json:"ssl"`

    // SSLConfig holds optional TLS settings.
    // NOTE(review): none of the constructors visible in this section read it.
    SSLConfig *SSLConfig `json:"ssl_config,omitempty"`
}

// ExchangeConfig describes an exchange declaration. Its fields are passed
// positionally to Channel.ExchangeDeclare by RabbitMQManager.DeclareExchange.
type ExchangeConfig struct {
    // Name is the exchange name.
    Name string `json:"name"`
    // Type is the exchange kind, e.g. "direct", "topic" or "headers".
    Type string `json:"type"`

    Durable    bool `json:"durable"`
    AutoDelete bool `json:"auto_delete"`
    Internal   bool `json:"internal"`
    NoWait     bool `json:"no_wait"`

    // Arguments are optional declaration arguments forwarded unchanged.
    Arguments map[string]interface{} `json:"arguments,omitempty"`
}

// QueueConfig describes a queue declaration. Its fields are passed
// positionally to Channel.QueueDeclare by RabbitMQManager.DeclareQueue.
type QueueConfig struct {
    // Name is the queue name.
    Name string `json:"name"`

    Durable    bool `json:"durable"`
    AutoDelete bool `json:"auto_delete"`
    Exclusive  bool `json:"exclusive"`
    NoWait     bool `json:"no_wait"`

    // Arguments are optional declaration arguments forwarded unchanged.
    Arguments map[string]interface{} `json:"arguments,omitempty"`
}

// BindingConfig describes a queue-to-exchange binding as consumed by
// RabbitMQManager.BindQueue.
type BindingConfig struct {
    // Queue is the queue being bound.
    Queue string `json:"queue"`
    // Exchange is the source exchange.
    Exchange string `json:"exchange"`
    // RoutingKey is the binding key.
    RoutingKey string `json:"routing_key"`

    NoWait bool `json:"no_wait"`

    // Arguments are optional binding arguments forwarded unchanged.
    Arguments map[string]interface{} `json:"arguments,omitempty"`
}

// RabbitMQ 管理客户端 type RabbitMQManager struct { conn *amqp.Connection channel *amqp.Channel config *RabbitMQConfig logger *log.Logger }

// NewRabbitMQManager dials the broker described by config and opens a single
// channel for topology management.
//
// The URL scheme is "amqps" when config.SSL is set and "amqp" otherwise.
// Username, password, host and vhost are interpolated into the URL verbatim;
// NOTE(review): values containing URL-reserved characters such as '@' or '/'
// are not escaped here — confirm whether callers pre-escape them.
//
// It returns an error when config is nil, when dialing fails, or when the
// channel cannot be opened (in which case the connection is closed again).
func NewRabbitMQManager(config *RabbitMQConfig) (*RabbitMQManager, error) {
    if config == nil {
        return nil, fmt.Errorf("RabbitMQ配置不能为空")
    }

    // Build the connection URL; only the scheme depends on the SSL flag.
    scheme := "amqp"
    if config.SSL {
        scheme = "amqps"
    }
    url := fmt.Sprintf("%s://%s:%s@%s:%d/%s",
        scheme, config.Username, config.Password, config.Host, config.Port, config.VHost)

    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, fmt.Errorf("连接RabbitMQ失败: %w", err)
    }

    channel, err := conn.Channel()
    if err != nil {
        // Do not leak the freshly opened connection.
        conn.Close()
        return nil, fmt.Errorf("创建通道失败: %w", err)
    }

    return &RabbitMQManager{
        conn:    conn,
        channel: channel,
        config:  config,
        logger:  log.New(os.Stdout, "[RabbitMQManager] ", log.LstdFlags),
    }, nil
}

// DeclareExchange declares the exchange described by config on the manager's
// channel and logs the exchange name on success.
//
// The underlying client error is wrapped with %w so callers can inspect it
// with errors.As / errors.Is.
func (rm *RabbitMQManager) DeclareExchange(config *ExchangeConfig) error {
    err := rm.channel.ExchangeDeclare(
        config.Name,
        config.Type,
        config.Durable,
        config.AutoDelete,
        config.Internal,
        config.NoWait,
        config.Arguments,
    )
    if err != nil {
        return fmt.Errorf("声明交换器失败: %w", err)
    }

    rm.logger.Printf("交换器声明成功: %s", config.Name)
    return nil
}

// DeclareQueue declares the queue described by config on the manager's channel
// and returns a pointer to the amqp.Queue value returned by QueueDeclare.
//
// On success the name reported by the broker (queue.Name) is logged. The
// underlying client error is wrapped with %w so callers can inspect it.
func (rm *RabbitMQManager) DeclareQueue(config *QueueConfig) (*amqp.Queue, error) {
    queue, err := rm.channel.QueueDeclare(
        config.Name,
        config.Durable,
        config.AutoDelete,
        config.Exclusive,
        config.NoWait,
        config.Arguments,
    )
    if err != nil {
        return nil, fmt.Errorf("声明队列失败: %w", err)
    }

    rm.logger.Printf("队列声明成功: %s", queue.Name)
    return &queue, nil
}

// BindQueue binds config.Queue to config.Exchange using config.RoutingKey and
// the optional binding arguments, then logs the binding.
//
// The underlying client error is wrapped with %w so callers can inspect it.
func (rm *RabbitMQManager) BindQueue(config *BindingConfig) error {
    // QueueBind takes the routing key before the exchange name.
    err := rm.channel.QueueBind(
        config.Queue,
        config.RoutingKey,
        config.Exchange,
        config.NoWait,
        config.Arguments,
    )
    if err != nil {
        return fmt.Errorf("绑定队列失败: %w", err)
    }

    rm.logger.Printf("队列绑定成功: queue=%s, exchange=%s, routing_key=%s",
        config.Queue, config.Exchange, config.RoutingKey)
    return nil
}

func (rm *RabbitMQManager) Close() error { if rm.channel != nil { rm.channel.Close() } if rm.conn != nil { return rm.conn.Close() } return nil }

RabbitMQ 生产者

基础生产者实现

// ProducerConfig holds the per-message publish options. PublishMessage passes
// the routing fields to Channel.Publish and copies the remaining fields into
// the amqp.Publishing properties.
type ProducerConfig struct {
    // Routing: target exchange, routing key and the publish flags.
    Exchange   string `json:"exchange"`
    RoutingKey string `json:"routing_key"`
    Mandatory  bool   `json:"mandatory"`
    Immediate  bool   `json:"immediate"`

    // Message properties.
    ContentType   string    `json:"content_type"`
    DeliveryMode  uint8     `json:"delivery_mode"` // 1 = transient, 2 = persistent
    Priority      uint8     `json:"priority"`
    CorrelationID string    `json:"correlation_id"`
    ReplyTo       string    `json:"reply_to"`
    Expiration    string    `json:"expiration"`
    MessageID     string    `json:"message_id"`
    Timestamp     time.Time `json:"timestamp"`
    Type          string    `json:"type"`
    UserID        string    `json:"user_id"`
    AppID         string    `json:"app_id"`

    // Headers are application headers attached to the message.
    Headers map[string]interface{} `json:"headers,omitempty"`
}

// RabbitMQProducer publishes messages over a single connection/channel pair
// and re-establishes both when the connection reports a close error.
type RabbitMQProducer struct {
    // mu guards conn, channel and closed: publishers take the read lock,
    // reconnect and Close take the write lock.
    mu sync.RWMutex

    conn    *amqp.Connection
    channel *amqp.Channel

    // closed is set by Close and by reconnect once every attempt has failed.
    closed bool

    config *RabbitMQConfig
    logger *log.Logger
}

// NewRabbitMQProducer dials the broker described by config, opens a channel
// and starts a background goroutine that watches the connection for close
// notifications.
//
// The URL scheme is "amqps" when config.SSL is set and "amqp" otherwise.
// NOTE(review): credentials and vhost are interpolated into the URL verbatim,
// without escaping — confirm whether callers pre-escape them.
//
// It returns an error when config is nil, when dialing fails, or when the
// channel cannot be opened (the connection is closed again in that case).
// Errors from the client are wrapped with %w.
func NewRabbitMQProducer(config *RabbitMQConfig) (*RabbitMQProducer, error) {
    if config == nil {
        return nil, fmt.Errorf("RabbitMQ配置不能为空")
    }

    scheme := "amqp"
    if config.SSL {
        scheme = "amqps"
    }
    url := fmt.Sprintf("%s://%s:%s@%s:%d/%s",
        scheme, config.Username, config.Password, config.Host, config.Port, config.VHost)

    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, fmt.Errorf("连接RabbitMQ失败: %w", err)
    }

    channel, err := conn.Channel()
    if err != nil {
        // Do not leak the freshly opened connection.
        conn.Close()
        return nil, fmt.Errorf("创建通道失败: %w", err)
    }

    producer := &RabbitMQProducer{
        conn:    conn,
        channel: channel,
        config:  config,
        logger:  log.New(os.Stdout, "[RabbitMQProducer] ", log.LstdFlags),
    }

    // Watch the connection so a broker-side close triggers a reconnect.
    go producer.monitorConnection()

    return producer, nil
}

// PublishMessage publishes body to config.Exchange with config.RoutingKey,
// copying the message properties from config.
//
// The read lock is held for the whole call, so a publish never overlaps a
// reconnect or Close. It returns an error if the producer is closed or if the
// channel rejects the publish; a failed publish is also logged.
func (rp *RabbitMQProducer) PublishMessage(body []byte, config *ProducerConfig) error {
    rp.mu.RLock()
    defer rp.mu.RUnlock()

    if rp.closed {
        return fmt.Errorf("生产者已关闭")
    }

    var msg amqp.Publishing
    msg.ContentType = config.ContentType
    msg.DeliveryMode = config.DeliveryMode
    msg.Priority = config.Priority
    msg.CorrelationId = config.CorrelationID
    msg.ReplyTo = config.ReplyTo
    msg.Expiration = config.Expiration
    msg.MessageId = config.MessageID
    msg.Timestamp = config.Timestamp
    msg.Type = config.Type
    msg.UserId = config.UserID
    msg.AppId = config.AppID
    msg.Headers = config.Headers
    msg.Body = body

    if pubErr := rp.channel.Publish(
        config.Exchange,
        config.RoutingKey,
        config.Mandatory,
        config.Immediate,
        msg,
    ); pubErr != nil {
        rp.logger.Printf("发布消息失败: %v", pubErr)
        return pubErr
    }

    rp.logger.Printf("消息发布成功: exchange=%s, routing_key=%s",
        config.Exchange, config.RoutingKey)
    return nil
}

// monitorConnection registers a close listener on the current connection and
// triggers reconnect for every non-nil close error it receives. The goroutine
// ends when the notification channel is closed.
func (rp *RabbitMQProducer) monitorConnection() {
    notifications := make(chan *amqp.Error)
    rp.conn.NotifyClose(notifications)

    for {
        closeErr, open := <-notifications
        if !open {
            return
        }
        if closeErr == nil {
            continue
        }

        rp.logger.Printf("连接断开: %v", closeErr)
        // Try to re-establish the connection and channel.
        rp.reconnect()
    }
}

// reconnect tears down the current connection and channel and tries up to
// five times to establish new ones, waiting 1s, 2s, 3s and 4s between
// consecutive attempts.
//
// The write lock is held for the whole procedure, so PublishMessage and Close
// block until reconnect finishes. On success a new monitorConnection goroutine
// is started for the new connection. If every attempt fails the producer is
// marked closed; no extra delay is added after the final attempt.
func (rp *RabbitMQProducer) reconnect() {
    const maxAttempts = 5

    rp.mu.Lock()
    defer rp.mu.Unlock()

    if rp.closed {
        return
    }

    // Release the old resources; their close errors are not actionable here.
    if rp.channel != nil {
        rp.channel.Close()
    }
    if rp.conn != nil {
        rp.conn.Close()
    }

    // The URL does not change between attempts, so build it once.
    scheme := "amqp"
    if rp.config.SSL {
        scheme = "amqps"
    }
    url := fmt.Sprintf("%s://%s:%s@%s:%d/%s",
        scheme, rp.config.Username, rp.config.Password,
        rp.config.Host, rp.config.Port, rp.config.VHost)

    for attempt := 1; attempt <= maxAttempts; attempt++ {
        // Linear back-off before every retry, never after the last failure.
        if attempt > 1 {
            time.Sleep(time.Duration(attempt-1) * time.Second)
        }

        conn, err := amqp.Dial(url)
        if err != nil {
            rp.logger.Printf("重连失败 (尝试 %d/%d): %v", attempt, maxAttempts, err)
            continue
        }

        channel, err := conn.Channel()
        if err != nil {
            conn.Close()
            rp.logger.Printf("创建通道失败 (尝试 %d/%d): %v", attempt, maxAttempts, err)
            continue
        }

        rp.conn = conn
        rp.channel = channel
        rp.logger.Printf("重连成功")

        // Watch the new connection for close notifications.
        go rp.monitorConnection()
        return
    }

    rp.logger.Printf("重连失败,生产者将关闭")
    rp.closed = true
}

// Close marks the producer as closed and releases the channel and the
// connection. The channel's close error is discarded; the connection's close
// result is returned.
func (rp *RabbitMQProducer) Close() error {
    rp.mu.Lock()
    defer rp.mu.Unlock()

    rp.closed = true

    if ch := rp.channel; ch != nil {
        ch.Close()
    }

    if rp.conn == nil {
        return nil
    }
    return rp.conn.Close()
}

确认模式生产者

// ConfirmProducer is a producer that puts its channel into publisher-confirm
// mode and lets callers wait for the broker's ack/nack of each message.
//
// A single goroutine (handleConfirmations) is the only reader of confirmChan;
// it routes every confirmation to the caller waiting for that delivery tag.
// This avoids several readers competing for the same confirmation.
//
// NOTE(review): publishing through the promoted PublishMessage directly is not
// counted in deliveryTag — presumably that desynchronises tag tracking; use
// PublishWithConfirm for every message on this producer.
type ConfirmProducer struct {
    *RabbitMQProducer
    confirmChan chan amqp.Confirmation

    // publishMu serialises publish + tag assignment so that the local
    // deliveryTag counter advances in the same order as publishes.
    publishMu   sync.Mutex
    deliveryTag uint64

    // mu guards pending and confirmClosed.
    mu sync.Mutex
    // pending maps a delivery tag to the buffered channel its publisher waits on.
    pending map[uint64]chan amqp.Confirmation
    // confirmClosed is set once confirmChan has been closed.
    confirmClosed bool
}

// NewConfirmProducer creates a producer, enables confirm mode on its channel,
// registers the confirmation listener and starts the dispatcher goroutine.
//
// If confirm mode cannot be enabled the producer is closed and the error is
// returned wrapped with %w.
func NewConfirmProducer(config *RabbitMQConfig) (*ConfirmProducer, error) {
    producer, err := NewRabbitMQProducer(config)
    if err != nil {
        return nil, err
    }

    // Enable publisher confirms.
    if err := producer.channel.Confirm(false); err != nil {
        producer.Close()
        return nil, fmt.Errorf("启用确认模式失败: %w", err)
    }

    confirmChan := make(chan amqp.Confirmation, 1000)
    producer.channel.NotifyPublish(confirmChan)

    cp := &ConfirmProducer{
        RabbitMQProducer: producer,
        confirmChan:      confirmChan,
        pending:          make(map[uint64]chan amqp.Confirmation),
    }

    // Start the single confirmation dispatcher.
    go cp.handleConfirmations()

    return cp, nil
}

// PublishWithConfirm publishes body and waits up to timeout for the broker to
// confirm it.
//
// It returns nil on ack, and an error when the publish itself fails, the
// message is nacked, the confirmation channel is closed, or the timeout
// expires first.
func (cp *ConfirmProducer) PublishWithConfirm(body []byte, config *ProducerConfig, timeout time.Duration) error {
    currentTag, waiter, err := cp.publishTracked(body, config)
    if err != nil {
        return err
    }

    timer := time.NewTimer(timeout)
    defer timer.Stop()

    select {
    case confirm, ok := <-waiter:
        if !ok {
            // The dispatcher closed the waiter: no confirmation will arrive.
            return fmt.Errorf("确认通道已关闭: delivery_tag=%d", currentTag)
        }
        if !confirm.Ack {
            return fmt.Errorf("消息被拒绝: delivery_tag=%d", currentTag)
        }
        cp.logger.Printf("消息确认成功: delivery_tag=%d", currentTag)
        return nil
    case <-timer.C:
        cp.forgetPending(currentTag)
        return fmt.Errorf("确认超时: delivery_tag=%d", currentTag)
    }
}

// publishTracked registers a waiter for the next delivery tag, publishes the
// message and, only if the publish succeeded, advances the tag counter.
func (cp *ConfirmProducer) publishTracked(body []byte, config *ProducerConfig) (uint64, <-chan amqp.Confirmation, error) {
    cp.publishMu.Lock()
    defer cp.publishMu.Unlock()

    tag := cp.deliveryTag + 1
    // Buffered so the dispatcher never blocks on a waiter that timed out.
    waiter := make(chan amqp.Confirmation, 1)

    // Register before publishing so a fast confirmation cannot be missed.
    cp.mu.Lock()
    if cp.confirmClosed {
        cp.mu.Unlock()
        return 0, nil, fmt.Errorf("确认通道已关闭")
    }
    cp.pending[tag] = waiter
    cp.mu.Unlock()

    if err := cp.PublishMessage(body, config); err != nil {
        // A failed publish must not consume a tag.
        cp.forgetPending(tag)
        return 0, nil, err
    }

    cp.deliveryTag = tag
    return tag, waiter, nil
}

// forgetPending drops the waiter registered for tag, if any.
func (cp *ConfirmProducer) forgetPending(tag uint64) {
    cp.mu.Lock()
    delete(cp.pending, tag)
    cp.mu.Unlock()
}

// handleConfirmations is the sole reader of confirmChan. It logs nacks and
// hands each confirmation to the waiter registered for its delivery tag. When
// confirmChan is closed it marks the producer's confirmations as closed and
// releases every remaining waiter.
func (cp *ConfirmProducer) handleConfirmations() {
    for confirm := range cp.confirmChan {
        if !confirm.Ack {
            cp.logger.Printf("消息发布失败: delivery_tag=%d", confirm.DeliveryTag)
        }

        cp.mu.Lock()
        waiter, ok := cp.pending[confirm.DeliveryTag]
        delete(cp.pending, confirm.DeliveryTag)
        cp.mu.Unlock()

        if ok {
            waiter <- confirm
        }
    }

    cp.mu.Lock()
    cp.confirmClosed = true
    for tag, waiter := range cp.pending {
        close(waiter)
        delete(cp.pending, tag)
    }
    cp.mu.Unlock()
}

RabbitMQ 消费者

基础消费者实现

// ConsumerConfig holds the options for RabbitMQConsumer.StartConsuming: the
// arguments forwarded to Channel.Consume plus the QoS (prefetch) settings.
type ConsumerConfig struct {
    // Consume arguments.
    Queue     string                 `json:"queue"`
    Consumer  string                 `json:"consumer"`
    AutoAck   bool                   `json:"auto_ack"`
    Exclusive bool                   `json:"exclusive"`
    NoLocal   bool                   `json:"no_local"`
    NoWait    bool                   `json:"no_wait"`
    Arguments map[string]interface{} `json:"arguments,omitempty"`

    // QoS settings; Channel.Qos is only called when PrefetchCount or
    // PrefetchSize is greater than zero.
    PrefetchCount int  `json:"prefetch_count"`
    PrefetchSize  int  `json:"prefetch_size"`
    Global        bool `json:"global"`
}

// MessageHandler processes a single delivery. The consumer acks the delivery
// when HandleMessage returns nil and nacks it (with requeue) otherwise, unless
// auto-ack is enabled.
type MessageHandler interface {
    HandleMessage(ctx context.Context, delivery amqp.Delivery) error
}

// MessageHandlerFunc adapts an ordinary function to the MessageHandler
// interface.
type MessageHandlerFunc func(ctx context.Context, delivery amqp.Delivery) error

// Compile-time check that the adapter satisfies the interface.
var _ MessageHandler = MessageHandlerFunc(nil)

// HandleMessage invokes f with the given context and delivery.
func (f MessageHandlerFunc) HandleMessage(ctx context.Context, delivery amqp.Delivery) error {
    result := f(ctx, delivery)
    return result
}

// RabbitMQConsumer consumes deliveries from a queue over its own connection
// and channel and dispatches each one to a MessageHandler.
type RabbitMQConsumer struct {
    // mu guards closed and serialises StartConsuming against Stop.
    mu     sync.RWMutex
    closed bool

    conn    *amqp.Connection
    channel *amqp.Channel
    config  *RabbitMQConfig

    handler MessageHandler
    logger  *log.Logger

    // ctx is passed to the handler and cancelled by Stop; wg tracks the
    // consume goroutines so Stop can wait for them.
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
}

// NewRabbitMQConsumer dials the broker described by config, opens a channel
// and prepares a consumer that will dispatch deliveries to handler.
//
// The URL scheme is "amqps" when config.SSL is set and "amqp" otherwise.
// NOTE(review): credentials and vhost are interpolated into the URL verbatim,
// without escaping — confirm whether callers pre-escape them.
//
// It returns an error when config or handler is nil, when dialing fails, or
// when the channel cannot be opened (the connection is closed again in that
// case). Errors from the client are wrapped with %w.
func NewRabbitMQConsumer(config *RabbitMQConfig, handler MessageHandler) (*RabbitMQConsumer, error) {
    if config == nil {
        return nil, fmt.Errorf("RabbitMQ配置不能为空")
    }
    if handler == nil {
        // consumeMessages would otherwise panic on the first delivery.
        return nil, fmt.Errorf("消息处理器不能为空")
    }

    scheme := "amqp"
    if config.SSL {
        scheme = "amqps"
    }
    url := fmt.Sprintf("%s://%s:%s@%s:%d/%s",
        scheme, config.Username, config.Password, config.Host, config.Port, config.VHost)

    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, fmt.Errorf("连接RabbitMQ失败: %w", err)
    }

    channel, err := conn.Channel()
    if err != nil {
        // Do not leak the freshly opened connection.
        conn.Close()
        return nil, fmt.Errorf("创建通道失败: %w", err)
    }

    ctx, cancel := context.WithCancel(context.Background())

    return &RabbitMQConsumer{
        conn:    conn,
        channel: channel,
        config:  config,
        handler: handler,
        logger:  log.New(os.Stdout, "[RabbitMQConsumer] ", log.LstdFlags),
        ctx:     ctx,
        cancel:  cancel,
    }, nil
}

// StartConsuming applies the QoS settings from config (when a prefetch value
// is set), registers a consumer on config.Queue and starts a goroutine that
// dispatches deliveries to the handler.
//
// It returns an error if the consumer is already stopped, or if setting QoS or
// registering the consumer fails; client errors are wrapped with %w.
func (rc *RabbitMQConsumer) StartConsuming(config *ConsumerConfig) error {
    rc.mu.Lock()
    defer rc.mu.Unlock()

    if rc.closed {
        return fmt.Errorf("消费者已关闭")
    }

    // Only touch QoS when the caller asked for a prefetch limit.
    if config.PrefetchCount > 0 || config.PrefetchSize > 0 {
        if err := rc.channel.Qos(config.PrefetchCount, config.PrefetchSize, config.Global); err != nil {
            return fmt.Errorf("设置QoS失败: %w", err)
        }
    }

    deliveries, err := rc.channel.Consume(
        config.Queue,
        config.Consumer,
        config.AutoAck,
        config.Exclusive,
        config.NoLocal,
        config.NoWait,
        config.Arguments,
    )
    if err != nil {
        return fmt.Errorf("开始消费失败: %w", err)
    }

    // Track the goroutine so Stop can wait for it.
    rc.wg.Add(1)
    go rc.consumeMessages(deliveries, config.AutoAck)

    rc.logger.Printf("开始消费队列: %s", config.Queue)
    return nil
}

// consumeMessages reads deliveries until the channel is closed or the
// consumer's context is cancelled, passing each delivery to the handler.
//
// With manual acknowledgement (autoAck == false) a delivery is acked when the
// handler succeeds and nacked with requeue when it fails. Failures of the
// ack/nack call itself are logged rather than silently dropped.
//
// NOTE(review): a message the handler always rejects is requeued every time,
// so it may be redelivered indefinitely — confirm a dead-letter policy exists.
func (rc *RabbitMQConsumer) consumeMessages(deliveries <-chan amqp.Delivery, autoAck bool) {
    defer rc.wg.Done()

    for {
        select {
        case <-rc.ctx.Done():
            return

        case delivery, ok := <-deliveries:
            if !ok {
                rc.logger.Printf("消费通道已关闭")
                return
            }

            handleErr := rc.handler.HandleMessage(rc.ctx, delivery)
            if handleErr != nil {
                rc.logger.Printf("消息处理失败: %v", handleErr)
            }

            // In auto-ack mode the broker has already settled the delivery.
            if autoAck {
                continue
            }

            if handleErr != nil {
                // Reject this single delivery and put it back on the queue.
                if err := delivery.Nack(false, true); err != nil {
                    rc.logger.Printf("拒绝消息失败: %v", err)
                }
                continue
            }

            if err := delivery.Ack(false); err != nil {
                rc.logger.Printf("确认消息失败: %v", err)
            }
        }
    }
}

// Stop marks the consumer as closed, cancels its context, waits for the
// consume goroutines to finish and then releases the channel and connection.
// The channel's close error is discarded; the connection's close result is
// returned.
func (rc *RabbitMQConsumer) Stop() error {
    rc.mu.Lock()
    defer rc.mu.Unlock()

    rc.closed = true

    // Signal the consume goroutines and wait until they have returned.
    rc.cancel()
    rc.wg.Wait()

    if ch := rc.channel; ch != nil {
        ch.Close()
    }

    if rc.conn == nil {
        return nil
    }
    return rc.conn.Close()
}

消息路由模式

直接路由 (Direct Exchange)

// DirectRoutingExample demonstrates routing through a direct exchange.
type DirectRoutingExample struct {
    manager *RabbitMQManager
}

// NewDirectRoutingExample creates the example together with its own
// RabbitMQManager; the manager's construction error is returned unchanged.
func NewDirectRoutingExample(config *RabbitMQConfig) (*DirectRoutingExample, error) {
    mgr, err := NewRabbitMQManager(config)
    if err != nil {
        return nil, err
    }

    example := &DirectRoutingExample{manager: mgr}
    return example, nil
}

// Setup declares the durable direct exchange "direct_logs" and, for each of
// the severities info, warning and error, a durable queue "logs_<severity>"
// bound to the exchange with the severity as routing key. It stops at the
// first error.
func (dre *DirectRoutingExample) Setup() error {
    exchange := &ExchangeConfig{
        Name:    "direct_logs",
        Type:    "direct",
        Durable: true,
    }
    if err := dre.manager.DeclareExchange(exchange); err != nil {
        return err
    }

    for _, level := range []string{"info", "warning", "error"} {
        // One queue per severity.
        queue := &QueueConfig{
            Name:    fmt.Sprintf("logs_%s", level),
            Durable: true,
        }
        if _, err := dre.manager.DeclareQueue(queue); err != nil {
            return err
        }

        // The routing key equals the severity.
        binding := &BindingConfig{
            Queue:      queue.Name,
            Exchange:   exchange.Name,
            RoutingKey: level,
        }
        if err := dre.manager.BindQueue(binding); err != nil {
            return err
        }
    }

    return nil
}

主题路由 (Topic Exchange)

// TopicRoutingExample demonstrates pattern-based routing through a topic
// exchange.
type TopicRoutingExample struct {
    manager *RabbitMQManager
}

// NewTopicRoutingExample creates the example together with its own
// RabbitMQManager; the manager's construction error is returned unchanged.
func NewTopicRoutingExample(config *RabbitMQConfig) (*TopicRoutingExample, error) {
    mgr, err := NewRabbitMQManager(config)
    if err != nil {
        return nil, err
    }

    example := &TopicRoutingExample{manager: mgr}
    return example, nil
}

// Setup declares the durable topic exchange "topic_logs" and four durable
// queues, each bound with its own routing pattern. It stops at the first
// error.
//
// The queues are declared in a fixed order (a slice rather than a map) so
// that logs and partial failures are reproducible between runs.
func (tre *TopicRoutingExample) Setup() error {
    exchangeConfig := &ExchangeConfig{
        Name:    "topic_logs",
        Type:    "topic",
        Durable: true,
    }

    if err := tre.manager.DeclareExchange(exchangeConfig); err != nil {
        return err
    }

    // In topic patterns "*" matches exactly one word and "#" matches zero or
    // more words.
    routingPatterns := []struct {
        queue   string
        pattern string
    }{
        {queue: "all_logs", pattern: "#"},                  // every message
        {queue: "kern_logs", pattern: "kern.*"},            // "kern." followed by exactly one word
        {queue: "critical", pattern: "*.critical"},         // one word followed by ".critical"
        {queue: "kern_critical", pattern: "kern.critical"}, // exactly "kern.critical"
    }

    for _, rp := range routingPatterns {
        queueConfig := &QueueConfig{
            Name:    rp.queue,
            Durable: true,
        }

        if _, err := tre.manager.DeclareQueue(queueConfig); err != nil {
            return err
        }

        bindingConfig := &BindingConfig{
            Queue:      rp.queue,
            Exchange:   exchangeConfig.Name,
            RoutingKey: rp.pattern,
        }

        if err := tre.manager.BindQueue(bindingConfig); err != nil {
            return err
        }
    }

    return nil
}

头部路由 (Headers Exchange)

// HeadersRoutingExample demonstrates routing through a headers exchange,
// where bindings match on message headers.
type HeadersRoutingExample struct {
    manager *RabbitMQManager
}

// NewHeadersRoutingExample creates the example together with its own
// RabbitMQManager; the manager's construction error is returned unchanged.
func NewHeadersRoutingExample(config *RabbitMQConfig) (*HeadersRoutingExample, error) {
    mgr, err := NewRabbitMQManager(config)
    if err != nil {
        return nil, err
    }

    example := &HeadersRoutingExample{manager: mgr}
    return example, nil
}

// Setup declares the durable headers exchange "headers_exchange" and two
// durable queues bound by header values:
//
//   - pdf_queue receives messages whose headers contain both format=pdf and
//     type=report (x-match=all);
//   - image_queue receives messages whose format header is jpg or png.
//
// A single binding cannot list two values for the same header key, so
// image_queue gets one binding per accepted format instead of a made-up
// "format2" key that no publisher would set. Setup stops at the first error.
func (hre *HeadersRoutingExample) Setup() error {
    exchangeConfig := &ExchangeConfig{
        Name:    "headers_exchange",
        Type:    "headers",
        Durable: true,
    }

    if err := hre.manager.DeclareExchange(exchangeConfig); err != nil {
        return err
    }

    // Each queue carries one or more header-match bindings.
    queueConfigs := []struct {
        name     string
        bindings []map[string]interface{}
    }{
        {
            name: "pdf_queue",
            bindings: []map[string]interface{}{
                {
                    "x-match": "all",
                    "format":  "pdf",
                    "type":    "report",
                },
            },
        },
        {
            name: "image_queue",
            bindings: []map[string]interface{}{
                {"x-match": "all", "format": "jpg"},
                {"x-match": "all", "format": "png"},
            },
        },
    }

    for _, qc := range queueConfigs {
        queueConfig := &QueueConfig{
            Name:    qc.name,
            Durable: true,
        }

        if _, err := hre.manager.DeclareQueue(queueConfig); err != nil {
            return err
        }

        for _, arguments := range qc.bindings {
            // Headers exchanges route on Arguments; the routing key stays empty.
            bindingConfig := &BindingConfig{
                Queue:     qc.name,
                Exchange:  exchangeConfig.Name,
                Arguments: arguments,
            }

            if err := hre.manager.BindQueue(bindingConfig); err != nil {
                return err
            }
        }
    }

    return nil
}

RabbitMQ 提供了丰富的消息路由功能和可靠的消息传递机制，通过不同类型的交换器可以实现灵活的消息分发策略。掌握 RabbitMQ 的开发技巧将帮助我们构建健壮的消息驱动应用。在下一节中，我们将学习事件驱动架构的设计和实现。