5.6.1 消息队列基础

5.6.1 消息队列基础 #

消息队列是分布式系统中实现异步通信的重要中间件,它通过消息传递的方式实现系统组件之间的解耦。理解消息队列的基本概念和工作原理是构建可靠分布式系统的基础。

消息队列核心概念 #

基本组件 #

消息队列系统通常包含以下核心组件:

// 消息结构定义
type Message struct {
    ID          string            `json:"id"`
    Topic       string            `json:"topic"`
    Key         string            `json:"key,omitempty"`
    Value       []byte            `json:"value"`
    Headers     map[string]string `json:"headers,omitempty"`
    Timestamp   time.Time         `json:"timestamp"`
    Partition   int32             `json:"partition,omitempty"`
    Offset      int64             `json:"offset,omitempty"`
    Retry       int               `json:"retry"`
    MaxRetries  int               `json:"max_retries"`
    DeadLetter  bool              `json:"dead_letter"`
}

// 生产者接口
type Producer interface {
    Send(ctx context.Context, msg *Message) error
    SendAsync(ctx context.Context, msg *Message, callback func(error)) error
    SendBatch(ctx context.Context, messages []*Message) error
    Close() error
}

// 消费者接口
type Consumer interface {
    Subscribe(topics []string) error
    Consume(ctx context.Context) (<-chan *Message, error)
    Commit(ctx context.Context, msg *Message) error
    Close() error
}

// 消息队列管理器
type MessageQueue interface {
    CreateProducer(config *ProducerConfig) (Producer, error)
    CreateConsumer(config *ConsumerConfig) (Consumer, error)
    CreateTopic(topic string, config *TopicConfig) error
    DeleteTopic(topic string) error
    GetTopicInfo(topic string) (*TopicInfo, error)
}

消息传递模式 #

消息队列支持多种消息传递模式:

// 点对点模式 (Point-to-Point)
type P2PQueue struct {
    name     string
    messages chan *Message
    consumers []Consumer
    mu       sync.RWMutex
}

func NewP2PQueue(name string, capacity int) *P2PQueue {
    return &P2PQueue{
        name:     name,
        messages: make(chan *Message, capacity),
        consumers: make([]Consumer, 0),
    }
}

func (q *P2PQueue) Send(msg *Message) error {
    select {
    case q.messages <- msg:
        return nil
    default:
        return fmt.Errorf("队列已满")
    }
}

func (q *P2PQueue) Receive() (*Message, error) {
    select {
    case msg := <-q.messages:
        return msg, nil
    case <-time.After(5 * time.Second):
        return nil, fmt.Errorf("接收超时")
    }
}

// 发布订阅模式 (Publish-Subscribe)
type PubSubTopic struct {
    name        string
    subscribers map[string]chan *Message
    mu          sync.RWMutex
}

func NewPubSubTopic(name string) *PubSubTopic {
    return &PubSubTopic{
        name:        name,
        subscribers: make(map[string]chan *Message),
    }
}

func (t *PubSubTopic) Subscribe(subscriberID string, bufferSize int) <-chan *Message {
    t.mu.Lock()
    defer t.mu.Unlock()

    ch := make(chan *Message, bufferSize)
    t.subscribers[subscriberID] = ch
    return ch
}

func (t *PubSubTopic) Unsubscribe(subscriberID string) {
    t.mu.Lock()
    defer t.mu.Unlock()

    if ch, exists := t.subscribers[subscriberID]; exists {
        close(ch)
        delete(t.subscribers, subscriberID)
    }
}

func (t *PubSubTopic) Publish(msg *Message) error {
    t.mu.RLock()
    defer t.mu.RUnlock()

    for subscriberID, ch := range t.subscribers {
        select {
        case ch <- msg:
            // 消息发送成功
        default:
            // 订阅者缓冲区已满,记录日志
            log.Printf("订阅者 %s 缓冲区已满,丢弃消息", subscriberID)
        }
    }

    return nil
}

消息可靠性保证 #

消息持久化 #

// 消息存储接口
type MessageStore interface {
    Store(msg *Message) error
    Retrieve(id string) (*Message, error)
    Delete(id string) error
    List(topic string, offset, limit int) ([]*Message, error)
}

// 基于文件的消息存储实现
type FileMessageStore struct {
    dataDir    string
    indexFile  *os.File
    dataFile   *os.File
    mu         sync.RWMutex
    index      map[string]int64 // messageID -> file offset
}

func NewFileMessageStore(dataDir string) (*FileMessageStore, error) {
    if err := os.MkdirAll(dataDir, 0755); err != nil {
        return nil, err
    }

    indexPath := filepath.Join(dataDir, "index.dat")
    dataPath := filepath.Join(dataDir, "messages.dat")

    indexFile, err := os.OpenFile(indexPath, os.O_CREATE|os.O_RDWR, 0644)
    if err != nil {
        return nil, err
    }

    dataFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
    if err != nil {
        indexFile.Close()
        return nil, err
    }

    store := &FileMessageStore{
        dataDir:   dataDir,
        indexFile: indexFile,
        dataFile:  dataFile,
        index:     make(map[string]int64),
    }

    // 加载索引
    if err := store.loadIndex(); err != nil {
        return nil, err
    }

    return store, nil
}

func (s *FileMessageStore) Store(msg *Message) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    // 序列化消息
    data, err := json.Marshal(msg)
    if err != nil {
        return err
    }

    // 获取当前文件位置
    offset, err := s.dataFile.Seek(0, io.SeekEnd)
    if err != nil {
        return err
    }

    // 写入消息长度和数据
    length := uint32(len(data))
    if err := binary.Write(s.dataFile, binary.LittleEndian, length); err != nil {
        return err
    }

    if _, err := s.dataFile.Write(data); err != nil {
        return err
    }

    // 强制刷新到磁盘
    if err := s.dataFile.Sync(); err != nil {
        return err
    }

    // 更新索引
    s.index[msg.ID] = offset

    // 持久化索引
    return s.saveIndex()
}

func (s *FileMessageStore) Retrieve(id string) (*Message, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    offset, exists := s.index[id]
    if !exists {
        return nil, fmt.Errorf("消息不存在: %s", id)
    }

    // 定位到消息位置
    if _, err := s.dataFile.Seek(offset, io.SeekStart); err != nil {
        return nil, err
    }

    // 读取消息长度
    var length uint32
    if err := binary.Read(s.dataFile, binary.LittleEndian, &length); err != nil {
        return nil, err
    }

    // 读取消息数据
    data := make([]byte, length)
    if _, err := io.ReadFull(s.dataFile, data); err != nil {
        return nil, err
    }

    // 反序列化消息
    var msg Message
    if err := json.Unmarshal(data, &msg); err != nil {
        return nil, err
    }

    return &msg, nil
}

消息确认机制 #

// 消息确认管理器
type AckManager struct {
    pendingAcks map[string]*PendingAck
    mu          sync.RWMutex
    timeout     time.Duration
}

type PendingAck struct {
    Message   *Message
    Timestamp time.Time
    Retries   int
    Consumer  string
}

func NewAckManager(timeout time.Duration) *AckManager {
    manager := &AckManager{
        pendingAcks: make(map[string]*PendingAck),
        timeout:     timeout,
    }

    // 启动超时检查
    go manager.timeoutChecker()

    return manager
}

func (am *AckManager) AddPendingAck(msg *Message, consumer string) {
    am.mu.Lock()
    defer am.mu.Unlock()

    am.pendingAcks[msg.ID] = &PendingAck{
        Message:   msg,
        Timestamp: time.Now(),
        Retries:   0,
        Consumer:  consumer,
    }
}

func (am *AckManager) Acknowledge(messageID string) error {
    am.mu.Lock()
    defer am.mu.Unlock()

    if _, exists := am.pendingAcks[messageID]; exists {
        delete(am.pendingAcks, messageID)
        return nil
    }

    return fmt.Errorf("消息不存在或已确认: %s", messageID)
}

func (am *AckManager) timeoutChecker() {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for range ticker.C {
        am.checkTimeouts()
    }
}

func (am *AckManager) checkTimeouts() {
    am.mu.Lock()
    defer am.mu.Unlock()

    now := time.Now()
    for messageID, pending := range am.pendingAcks {
        if now.Sub(pending.Timestamp) > am.timeout {
            // 消息确认超时,需要重新投递
            pending.Retries++
            pending.Timestamp = now

            if pending.Retries > pending.Message.MaxRetries {
                // 超过最大重试次数,发送到死信队列
                pending.Message.DeadLetter = true
                log.Printf("消息 %s 超过最大重试次数,发送到死信队列", messageID)
                delete(am.pendingAcks, messageID)
            } else {
                // 重新投递消息
                log.Printf("消息 %s 确认超时,重新投递 (重试次数: %d)", messageID, pending.Retries)
            }
        }
    }
}

消息路由和分区 #

消息路由器 #

// 消息路由器
type MessageRouter struct {
    routes map[string][]RouteRule
    mu     sync.RWMutex
}

type RouteRule struct {
    Condition RouteCondition
    Target    string
    Priority  int
}

type RouteCondition struct {
    HeaderMatch map[string]string
    ContentMatch string
    TopicPattern string
}

func NewMessageRouter() *MessageRouter {
    return &MessageRouter{
        routes: make(map[string][]RouteRule),
    }
}

func (r *MessageRouter) AddRoute(topic string, rule RouteRule) {
    r.mu.Lock()
    defer r.mu.Unlock()

    if _, exists := r.routes[topic]; !exists {
        r.routes[topic] = make([]RouteRule, 0)
    }

    r.routes[topic] = append(r.routes[topic], rule)

    // 按优先级排序
    sort.Slice(r.routes[topic], func(i, j int) bool {
        return r.routes[topic][i].Priority > r.routes[topic][j].Priority
    })
}

func (r *MessageRouter) Route(msg *Message) []string {
    r.mu.RLock()
    defer r.mu.RUnlock()

    var targets []string

    if rules, exists := r.routes[msg.Topic]; exists {
        for _, rule := range rules {
            if r.matchRule(msg, rule.Condition) {
                targets = append(targets, rule.Target)
            }
        }
    }

    return targets
}

func (r *MessageRouter) matchRule(msg *Message, condition RouteCondition) bool {
    // 检查头部匹配
    for key, expectedValue := range condition.HeaderMatch {
        if actualValue, exists := msg.Headers[key]; !exists || actualValue != expectedValue {
            return false
        }
    }

    // 检查内容匹配
    if condition.ContentMatch != "" {
        if !strings.Contains(string(msg.Value), condition.ContentMatch) {
            return false
        }
    }

    // 检查主题模式匹配
    if condition.TopicPattern != "" {
        matched, _ := filepath.Match(condition.TopicPattern, msg.Topic)
        if !matched {
            return false
        }
    }

    return true
}

消息分区 #

// 分区器接口
type Partitioner interface {
    Partition(msg *Message, numPartitions int) int
}

// 哈希分区器
type HashPartitioner struct{}

func (p *HashPartitioner) Partition(msg *Message, numPartitions int) int {
    if msg.Key == "" {
        // 如果没有键,使用随机分区
        return rand.Intn(numPartitions)
    }

    // 使用键的哈希值进行分区
    h := fnv.New32a()
    h.Write([]byte(msg.Key))
    return int(h.Sum32()) % numPartitions
}

// 轮询分区器
type RoundRobinPartitioner struct {
    counter int64
}

func (p *RoundRobinPartitioner) Partition(msg *Message, numPartitions int) int {
    partition := int(atomic.AddInt64(&p.counter, 1)) % numPartitions
    return partition
}

// 自定义分区器
type CustomPartitioner struct {
    partitionFunc func(*Message, int) int
}

func NewCustomPartitioner(fn func(*Message, int) int) *CustomPartitioner {
    return &CustomPartitioner{
        partitionFunc: fn,
    }
}

func (p *CustomPartitioner) Partition(msg *Message, numPartitions int) int {
    return p.partitionFunc(msg, numPartitions)
}

// 分区管理器
type PartitionManager struct {
    partitioner   Partitioner
    partitions    []*Partition
    numPartitions int
}

type Partition struct {
    ID       int
    Messages chan *Message
    Offset   int64
    mu       sync.RWMutex
}

func NewPartitionManager(numPartitions int, partitioner Partitioner) *PartitionManager {
    partitions := make([]*Partition, numPartitions)
    for i := 0; i < numPartitions; i++ {
        partitions[i] = &Partition{
            ID:       i,
            Messages: make(chan *Message, 1000),
            Offset:   0,
        }
    }

    return &PartitionManager{
        partitioner:   partitioner,
        partitions:    partitions,
        numPartitions: numPartitions,
    }
}

func (pm *PartitionManager) Send(msg *Message) error {
    partitionID := pm.partitioner.Partition(msg, pm.numPartitions)
    partition := pm.partitions[partitionID]

    partition.mu.Lock()
    msg.Partition = int32(partitionID)
    msg.Offset = partition.Offset
    partition.Offset++
    partition.mu.Unlock()

    select {
    case partition.Messages <- msg:
        return nil
    default:
        return fmt.Errorf("分区 %d 队列已满", partitionID)
    }
}

func (pm *PartitionManager) Consume(partitionID int) (<-chan *Message, error) {
    if partitionID < 0 || partitionID >= pm.numPartitions {
        return nil, fmt.Errorf("无效的分区ID: %d", partitionID)
    }

    return pm.partitions[partitionID].Messages, nil
}

消息队列实现示例 #

简单内存消息队列 #

// 内存消息队列实现
type InMemoryMessageQueue struct {
    topics    map[string]*Topic
    mu        sync.RWMutex
    closed    bool
    closeChan chan struct{}
}

type Topic struct {
    Name         string
    Partitions   []*TopicPartition
    Subscribers  map[string]*Subscriber
    mu           sync.RWMutex
}

type TopicPartition struct {
    ID       int
    Messages []*Message
    Offset   int64
    mu       sync.RWMutex
}

type Subscriber struct {
    ID       string
    Channel  chan *Message
    Filter   func(*Message) bool
    Active   bool
}

func NewInMemoryMessageQueue() *InMemoryMessageQueue {
    mq := &InMemoryMessageQueue{
        topics:    make(map[string]*Topic),
        closeChan: make(chan struct{}),
    }

    return mq
}

func (mq *InMemoryMessageQueue) CreateTopic(name string, partitions int) error {
    mq.mu.Lock()
    defer mq.mu.Unlock()

    if _, exists := mq.topics[name]; exists {
        return fmt.Errorf("主题已存在: %s", name)
    }

    topic := &Topic{
        Name:        name,
        Partitions:  make([]*TopicPartition, partitions),
        Subscribers: make(map[string]*Subscriber),
    }

    for i := 0; i < partitions; i++ {
        topic.Partitions[i] = &TopicPartition{
            ID:       i,
            Messages: make([]*Message, 0),
            Offset:   0,
        }
    }

    mq.topics[name] = topic
    return nil
}

func (mq *InMemoryMessageQueue) Publish(msg *Message) error {
    mq.mu.RLock()
    topic, exists := mq.topics[msg.Topic]
    mq.mu.RUnlock()

    if !exists {
        return fmt.Errorf("主题不存在: %s", msg.Topic)
    }

    // 选择分区
    partitionID := 0
    if len(topic.Partitions) > 1 {
        if msg.Key != "" {
            h := fnv.New32a()
            h.Write([]byte(msg.Key))
            partitionID = int(h.Sum32()) % len(topic.Partitions)
        } else {
            partitionID = rand.Intn(len(topic.Partitions))
        }
    }

    partition := topic.Partitions[partitionID]
    partition.mu.Lock()
    msg.Partition = int32(partitionID)
    msg.Offset = partition.Offset
    partition.Messages = append(partition.Messages, msg)
    partition.Offset++
    partition.mu.Unlock()

    // 通知订阅者
    topic.mu.RLock()
    for _, subscriber := range topic.Subscribers {
        if subscriber.Active && (subscriber.Filter == nil || subscriber.Filter(msg)) {
            select {
            case subscriber.Channel <- msg:
            default:
                // 订阅者缓冲区已满,跳过
            }
        }
    }
    topic.mu.RUnlock()

    return nil
}

func (mq *InMemoryMessageQueue) Subscribe(topicName, subscriberID string, bufferSize int, filter func(*Message) bool) (<-chan *Message, error) {
    mq.mu.RLock()
    topic, exists := mq.topics[topicName]
    mq.mu.RUnlock()

    if !exists {
        return nil, fmt.Errorf("主题不存在: %s", topicName)
    }

    topic.mu.Lock()
    defer topic.mu.Unlock()

    if _, exists := topic.Subscribers[subscriberID]; exists {
        return nil, fmt.Errorf("订阅者已存在: %s", subscriberID)
    }

    subscriber := &Subscriber{
        ID:      subscriberID,
        Channel: make(chan *Message, bufferSize),
        Filter:  filter,
        Active:  true,
    }

    topic.Subscribers[subscriberID] = subscriber
    return subscriber.Channel, nil
}

func (mq *InMemoryMessageQueue) Unsubscribe(topicName, subscriberID string) error {
    mq.mu.RLock()
    topic, exists := mq.topics[topicName]
    mq.mu.RUnlock()

    if !exists {
        return fmt.Errorf("主题不存在: %s", topicName)
    }

    topic.mu.Lock()
    defer topic.mu.Unlock()

    if subscriber, exists := topic.Subscribers[subscriberID]; exists {
        subscriber.Active = false
        close(subscriber.Channel)
        delete(topic.Subscribers, subscriberID)
    }

    return nil
}

消息队列基础为分布式系统提供了可靠的异步通信机制。通过理解消息传递模式、可靠性保证和分区策略,我们可以构建高性能、高可用的消息系统。在下一节中,我们将深入学习 Kafka 的使用和开发。