5.6.1 消息队列基础 #
消息队列是分布式系统中实现异步通信的重要中间件,它通过消息传递的方式实现系统组件之间的解耦。理解消息队列的基本概念和工作原理是构建可靠分布式系统的基础。
消息队列核心概念 #
基本组件 #
消息队列系统通常包含以下核心组件:
// Message is the unit of data exchanged through the queue. It carries
// routing metadata (topic/key/partition/offset) alongside the payload
// plus retry bookkeeping used by the ack/redelivery machinery.
type Message struct {
	ID         string            `json:"id"`                  // unique message identifier
	Topic      string            `json:"topic"`               // destination topic
	Key        string            `json:"key,omitempty"`       // optional partitioning key
	Value      []byte            `json:"value"`               // payload bytes
	Headers    map[string]string `json:"headers,omitempty"`   // optional application headers
	Timestamp  time.Time         `json:"timestamp"`           // creation time
	Partition  int32             `json:"partition,omitempty"` // partition assigned on publish
	Offset     int64             `json:"offset,omitempty"`    // position within the partition
	Retry      int               `json:"retry"`               // redelivery attempts so far
	MaxRetries int               `json:"max_retries"`         // retry budget before dead-lettering
	DeadLetter bool              `json:"dead_letter"`         // set once routed to the dead-letter queue
}
// Producer is the client-side interface for publishing messages.
type Producer interface {
	// Send publishes msg synchronously and returns the delivery error.
	Send(ctx context.Context, msg *Message) error
	// SendAsync publishes msg without blocking; callback receives the
	// eventual delivery result.
	SendAsync(ctx context.Context, msg *Message, callback func(error)) error
	// SendBatch publishes several messages in one call.
	SendBatch(ctx context.Context, messages []*Message) error
	// Close releases producer resources.
	Close() error
}
// Consumer is the client-side interface for receiving messages.
type Consumer interface {
	// Subscribe registers interest in the given topics.
	Subscribe(topics []string) error
	// Consume returns a channel streaming received messages until ctx ends.
	Consume(ctx context.Context) (<-chan *Message, error)
	// Commit acknowledges msg as processed.
	Commit(ctx context.Context, msg *Message) error
	// Close releases consumer resources.
	Close() error
}
// MessageQueue is the administrative entry point: it builds producer and
// consumer clients and manages topic lifecycle.
// NOTE(review): ProducerConfig/ConsumerConfig/TopicConfig/TopicInfo are
// declared elsewhere in this file or project — not visible here.
type MessageQueue interface {
	CreateProducer(config *ProducerConfig) (Producer, error)
	CreateConsumer(config *ConsumerConfig) (Consumer, error)
	CreateTopic(topic string, config *TopicConfig) error
	DeleteTopic(topic string) error
	GetTopicInfo(topic string) (*TopicInfo, error)
}
消息传递模式 #
消息队列支持多种消息传递模式:
// P2PQueue implements point-to-point delivery: each message is taken from
// the shared channel by exactly one receiver.
type P2PQueue struct {
	name      string        // queue name
	messages  chan *Message // bounded message buffer
	consumers []Consumer    // registered consumers
	mu        sync.RWMutex  // NOTE(review): unused by the methods shown; presumably guards consumers
}
// NewP2PQueue creates a point-to-point queue named name whose internal
// buffer holds up to capacity messages.
func NewP2PQueue(name string, capacity int) *P2PQueue {
	q := &P2PQueue{name: name}
	q.messages = make(chan *Message, capacity)
	q.consumers = make([]Consumer, 0)
	return q
}
// Send enqueues msg without blocking. It returns an error when the
// buffer is already full instead of waiting for space.
func (q *P2PQueue) Send(msg *Message) error {
	select {
	case q.messages <- msg:
	default:
		// Queue at capacity: report rather than block the producer.
		return fmt.Errorf("队列已满")
	}
	return nil
}
// Receive waits up to five seconds for the next message, returning an
// error on timeout.
func (q *P2PQueue) Receive() (*Message, error) {
	deadline := time.After(5 * time.Second)
	select {
	case m := <-q.messages:
		return m, nil
	case <-deadline:
		return nil, fmt.Errorf("接收超时")
	}
}
// PubSubTopic implements publish-subscribe delivery: every active
// subscriber receives its own copy of each published message.
type PubSubTopic struct {
	name        string                   // topic name
	subscribers map[string]chan *Message // subscriberID -> delivery channel
	mu          sync.RWMutex             // guards subscribers
}
// NewPubSubTopic creates an empty pub/sub topic with the given name.
func NewPubSubTopic(name string) *PubSubTopic {
	t := &PubSubTopic{name: name}
	t.subscribers = make(map[string]chan *Message)
	return t
}
// Subscribe registers subscriberID and returns the channel on which it
// receives published messages; bufferSize bounds that channel. If the ID
// is already registered, the previous channel is closed before being
// replaced so its readers are released instead of leaked.
func (t *PubSubTopic) Subscribe(subscriberID string, bufferSize int) <-chan *Message {
	t.mu.Lock()
	defer t.mu.Unlock()
	// Fix: the old code silently overwrote an existing subscription,
	// leaking the previous channel and stranding anyone receiving on it.
	if old, ok := t.subscribers[subscriberID]; ok {
		close(old)
	}
	ch := make(chan *Message, bufferSize)
	t.subscribers[subscriberID] = ch
	return ch
}
// Unsubscribe removes subscriberID and closes its channel; unknown IDs
// are ignored.
func (t *PubSubTopic) Unsubscribe(subscriberID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	ch, ok := t.subscribers[subscriberID]
	if !ok {
		return
	}
	close(ch)
	delete(t.subscribers, subscriberID)
}
// Publish fans msg out to every subscriber. Delivery is best-effort: a
// subscriber whose buffer is full has this message dropped (and logged)
// rather than blocking the publisher.
func (t *PubSubTopic) Publish(msg *Message) error {
	t.mu.RLock()
	defer t.mu.RUnlock()
	for id, ch := range t.subscribers {
		select {
		case ch <- msg:
			// Delivered.
		default:
			// Buffer full — drop for this subscriber only.
			log.Printf("订阅者 %s 缓冲区已满,丢弃消息", id)
		}
	}
	return nil
}
消息可靠性保证 #
消息持久化 #
// MessageStore abstracts durable message persistence.
type MessageStore interface {
	// Store persists msg durably.
	Store(msg *Message) error
	// Retrieve loads the message with the given id.
	Retrieve(id string) (*Message, error)
	// Delete removes the message with the given id.
	Delete(id string) error
	// List returns up to limit messages of topic starting at offset.
	List(topic string, offset, limit int) ([]*Message, error)
}
// FileMessageStore persists messages in an append-only data file with an
// in-memory (and separately persisted) id -> offset index.
type FileMessageStore struct {
	dataDir   string           // storage directory
	indexFile *os.File         // backing file for the persisted index
	dataFile  *os.File         // append-only message log
	mu        sync.RWMutex     // guards index and file access
	index     map[string]int64 // messageID -> file offset
}
// NewFileMessageStore opens (creating if necessary) the index and data
// files under dataDir and loads the existing index into memory.
func NewFileMessageStore(dataDir string) (*FileMessageStore, error) {
	if err := os.MkdirAll(dataDir, 0755); err != nil {
		return nil, err
	}
	indexPath := filepath.Join(dataDir, "index.dat")
	dataPath := filepath.Join(dataDir, "messages.dat")
	indexFile, err := os.OpenFile(indexPath, os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		return nil, err
	}
	dataFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
	if err != nil {
		indexFile.Close()
		return nil, err
	}
	store := &FileMessageStore{
		dataDir:   dataDir,
		indexFile: indexFile,
		dataFile:  dataFile,
		index:     make(map[string]int64),
	}
	if err := store.loadIndex(); err != nil {
		// Fix: the original leaked both open file handles on this path.
		indexFile.Close()
		dataFile.Close()
		return nil, err
	}
	return store, nil
}
// Store appends msg to the data log as a length-prefixed JSON record,
// fsyncs it, and then records and persists its offset in the index.
func (s *FileMessageStore) Store(msg *Message) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	payload, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	// Current end of file is where this record will begin.
	offset, err := s.dataFile.Seek(0, io.SeekEnd)
	if err != nil {
		return err
	}
	// Frame layout: 4-byte little-endian length prefix + JSON payload,
	// written in a single call.
	record := make([]byte, 4+len(payload))
	binary.LittleEndian.PutUint32(record, uint32(len(payload)))
	copy(record[4:], payload)
	if _, err := s.dataFile.Write(record); err != nil {
		return err
	}
	// Force the record to disk before indexing it.
	if err := s.dataFile.Sync(); err != nil {
		return err
	}
	s.index[msg.ID] = offset
	return s.saveIndex()
}
// Retrieve loads and decodes the message stored under id.
//
// Fix: the original Seek'ed + Read the shared *os.File while holding only
// the read lock, so two concurrent Retrieve calls raced on the file
// position (and could also collide with Store's end-of-file Seek).
// ReadAt is positionless, making it safe under RLock.
func (s *FileMessageStore) Retrieve(id string) (*Message, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	offset, exists := s.index[id]
	if !exists {
		return nil, fmt.Errorf("消息不存在: %s", id)
	}
	// Read the 4-byte little-endian length prefix at the record's offset.
	var lenBuf [4]byte
	if _, err := s.dataFile.ReadAt(lenBuf[:], offset); err != nil {
		return nil, err
	}
	length := binary.LittleEndian.Uint32(lenBuf[:])
	// Read the JSON payload that follows the prefix.
	data := make([]byte, length)
	if _, err := s.dataFile.ReadAt(data, offset+4); err != nil {
		return nil, err
	}
	var msg Message
	if err := json.Unmarshal(data, &msg); err != nil {
		return nil, err
	}
	return &msg, nil
}
消息确认机制 #
// AckManager tracks delivered-but-unacknowledged messages and retries
// (or dead-letters) those whose acknowledgement times out.
type AckManager struct {
	pendingAcks map[string]*PendingAck // messageID -> in-flight delivery
	mu          sync.RWMutex           // guards pendingAcks
	timeout     time.Duration          // ack deadline per delivery attempt
}
// PendingAck records one in-flight delivery awaiting acknowledgement.
type PendingAck struct {
	Message   *Message  // the delivered message
	Timestamp time.Time // time of the latest (re)delivery
	Retries   int       // redelivery attempts so far
	Consumer  string    // consumer the message was delivered to
}
// NewAckManager creates a manager that redelivers (or dead-letters)
// messages not acknowledged within timeout.
func NewAckManager(timeout time.Duration) *AckManager {
	am := &AckManager{timeout: timeout}
	am.pendingAcks = make(map[string]*PendingAck)
	// NOTE(review): this goroutine has no stop signal and runs for the
	// life of the process — fine for a singleton, a leak otherwise.
	go am.timeoutChecker()
	return am
}
// AddPendingAck records that msg was delivered to consumer and now
// awaits acknowledgement.
func (am *AckManager) AddPendingAck(msg *Message, consumer string) {
	entry := &PendingAck{
		Message:   msg,
		Consumer:  consumer,
		Retries:   0,
		Timestamp: time.Now(),
	}
	am.mu.Lock()
	am.pendingAcks[msg.ID] = entry
	am.mu.Unlock()
}
// Acknowledge marks messageID as processed, removing it from the pending
// set; it errors if the message is unknown or already acknowledged.
func (am *AckManager) Acknowledge(messageID string) error {
	am.mu.Lock()
	defer am.mu.Unlock()
	if _, ok := am.pendingAcks[messageID]; !ok {
		return fmt.Errorf("消息不存在或已确认: %s", messageID)
	}
	delete(am.pendingAcks, messageID)
	return nil
}
// timeoutChecker scans the pending set once per second for expired
// deliveries. It runs until the process exits.
func (am *AckManager) timeoutChecker() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		<-ticker.C
		am.checkTimeouts()
	}
}
// checkTimeouts bumps the retry count of every delivery past its ack
// deadline; entries over their retry budget are marked dead-letter and
// dropped, the rest are logged for redelivery.
func (am *AckManager) checkTimeouts() {
	am.mu.Lock()
	defer am.mu.Unlock()
	now := time.Now()
	for id, entry := range am.pendingAcks {
		if now.Sub(entry.Timestamp) <= am.timeout {
			continue
		}
		// Deadline missed: count this attempt and restart the clock.
		entry.Retries++
		entry.Timestamp = now
		if entry.Retries > entry.Message.MaxRetries {
			// Retry budget exhausted — route to the dead-letter queue.
			entry.Message.DeadLetter = true
			log.Printf("消息 %s 超过最大重试次数,发送到死信队列", id)
			delete(am.pendingAcks, id) // deleting during range is safe in Go
			continue
		}
		log.Printf("消息 %s 确认超时,重新投递 (重试次数: %d)", id, entry.Retries)
	}
}
消息路由和分区 #
消息路由器 #
// MessageRouter matches messages against per-topic rules to compute
// forwarding targets.
type MessageRouter struct {
	routes map[string][]RouteRule // topic -> rules, kept sorted by priority desc
	mu     sync.RWMutex           // guards routes
}
// RouteRule forwards matching messages to Target; rules with a higher
// Priority are evaluated first.
type RouteRule struct {
	Condition RouteCondition // predicate a message must satisfy
	Target    string         // destination for matching messages
	Priority  int            // evaluation order (descending)
}
// RouteCondition is a conjunction (AND) of optional criteria; zero-valued
// fields are ignored by the matcher.
type RouteCondition struct {
	HeaderMatch  map[string]string // required header key/value pairs
	ContentMatch string            // substring the payload must contain
	TopicPattern string            // glob pattern the topic must match
}
// NewMessageRouter creates a router with no rules configured.
func NewMessageRouter() *MessageRouter {
	r := &MessageRouter{}
	r.routes = make(map[string][]RouteRule)
	return r
}
// AddRoute registers rule for topic and keeps the topic's rules ordered
// by descending priority so Route evaluates higher-priority rules first.
func (r *MessageRouter) AddRoute(topic string, rule RouteRule) {
	r.mu.Lock()
	defer r.mu.Unlock()
	// append on a missing map entry yields a nil slice, which append
	// handles fine — the original's explicit make() was redundant.
	rules := append(r.routes[topic], rule)
	sort.Slice(rules, func(i, j int) bool {
		return rules[i].Priority > rules[j].Priority
	})
	r.routes[topic] = rules
}
// Route returns the targets of every rule for msg.Topic whose condition
// msg satisfies, in priority order; it returns nil when nothing matches.
func (r *MessageRouter) Route(msg *Message) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	var targets []string
	for _, rule := range r.routes[msg.Topic] {
		if r.matchRule(msg, rule.Condition) {
			targets = append(targets, rule.Target)
		}
	}
	return targets
}
// matchRule reports whether msg satisfies every configured criterion of
// condition (logical AND); unset criteria are skipped.
func (r *MessageRouter) matchRule(msg *Message, condition RouteCondition) bool {
	// Every required header must be present with the expected value.
	for key, want := range condition.HeaderMatch {
		if got, ok := msg.Headers[key]; !ok || got != want {
			return false
		}
	}
	// Payload must contain the required substring, if any.
	if condition.ContentMatch != "" {
		if !strings.Contains(string(msg.Value), condition.ContentMatch) {
			return false
		}
	}
	// Topic must match the glob pattern, if any.
	if condition.TopicPattern != "" {
		matched, err := filepath.Match(condition.TopicPattern, msg.Topic)
		// Fix: the Match error was silently discarded with `_`; a
		// malformed pattern now explicitly counts as "no match".
		if err != nil || !matched {
			return false
		}
	}
	return true
}
消息分区 #
// Partitioner selects the partition a message is written to.
type Partitioner interface {
	// Partition returns an index in [0, numPartitions) for msg.
	Partition(msg *Message, numPartitions int) int
}
// HashPartitioner routes messages with the same key to the same partition
// via an FNV-1a hash of the key; keyless messages are spread randomly.
type HashPartitioner struct{}

// Partition returns a partition index in [0, numPartitions) for msg.
func (p *HashPartitioner) Partition(msg *Message, numPartitions int) int {
	if msg.Key == "" {
		// No key: random partition for load spreading.
		return rand.Intn(numPartitions)
	}
	h := fnv.New32a()
	h.Write([]byte(msg.Key))
	// Fix: reduce in uint32 space before converting to int. The original
	// int(h.Sum32()) % numPartitions can produce a negative index on
	// platforms where int is 32 bits (Sum32 values above MaxInt32).
	return int(h.Sum32() % uint32(numPartitions))
}
// RoundRobinPartitioner cycles through partitions in order, ignoring the
// message key entirely.
type RoundRobinPartitioner struct {
	counter int64 // atomically incremented per message
}

// Partition returns the next partition index in round-robin order.
func (p *RoundRobinPartitioner) Partition(msg *Message, numPartitions int) int {
	n := atomic.AddInt64(&p.counter, 1)
	// Fix: reduce in unsigned space. int(n) % numPartitions can yield a
	// negative index once the counter wraps, or when int64 is truncated
	// to a 32-bit int on small platforms.
	return int(uint64(n) % uint64(numPartitions))
}
// CustomPartitioner delegates partition selection to a caller-supplied
// function.
type CustomPartitioner struct {
	partitionFunc func(*Message, int) int
}

// NewCustomPartitioner wraps fn as a Partitioner. fn must return an
// index in [0, numPartitions).
func NewCustomPartitioner(fn func(*Message, int) int) *CustomPartitioner {
	return &CustomPartitioner{partitionFunc: fn}
}

// Partition applies the wrapped selection function.
func (p *CustomPartitioner) Partition(msg *Message, numPartitions int) int {
	return p.partitionFunc(msg, numPartitions)
}
// PartitionManager fans messages out across a fixed set of partitions
// using a pluggable Partitioner.
type PartitionManager struct {
	partitioner   Partitioner  // partition-selection strategy
	partitions    []*Partition // fixed partition set
	numPartitions int          // cached len(partitions)
}
// Partition is one ordered shard with its own offset counter and
// buffered delivery channel.
type Partition struct {
	ID       int           // partition index
	Messages chan *Message // buffered delivery channel
	Offset   int64         // next offset to assign
	mu       sync.RWMutex  // guards Offset
}
// NewPartitionManager builds numPartitions partitions (each buffering up
// to 1000 messages) routed by the given partitioner.
func NewPartitionManager(numPartitions int, partitioner Partitioner) *PartitionManager {
	pm := &PartitionManager{
		partitioner:   partitioner,
		numPartitions: numPartitions,
		partitions:    make([]*Partition, numPartitions),
	}
	for i := range pm.partitions {
		pm.partitions[i] = &Partition{
			ID:       i,
			Offset:   0,
			Messages: make(chan *Message, 1000),
		}
	}
	return pm
}
// Send routes msg to a partition chosen by the partitioner, stamps its
// partition/offset, and enqueues it without blocking. The offset counter
// only advances when the message is actually accepted.
func (pm *PartitionManager) Send(msg *Message) error {
	partitionID := pm.partitioner.Partition(msg, pm.numPartitions)
	partition := pm.partitions[partitionID]
	partition.mu.Lock()
	defer partition.mu.Unlock()
	msg.Partition = int32(partitionID)
	msg.Offset = partition.Offset
	// Fix: the original incremented Offset before attempting the send,
	// so a full channel consumed an offset and left a permanent gap in
	// the partition's offset sequence. Commit the offset only on success.
	select {
	case partition.Messages <- msg:
		partition.Offset++
		return nil
	default:
		return fmt.Errorf("分区 %d 队列已满", partitionID)
	}
}
// Consume returns the receive channel of the given partition, or an
// error for an out-of-range partition ID.
func (pm *PartitionManager) Consume(partitionID int) (<-chan *Message, error) {
	if partitionID >= 0 && partitionID < pm.numPartitions {
		return pm.partitions[partitionID].Messages, nil
	}
	return nil, fmt.Errorf("无效的分区ID: %d", partitionID)
}
消息队列实现示例 #
简单内存消息队列 #
// InMemoryMessageQueue is a toy broker holding topics, partitions and
// subscribers entirely in memory.
type InMemoryMessageQueue struct {
	topics    map[string]*Topic // topic name -> topic
	mu        sync.RWMutex      // guards topics
	closed    bool              // NOTE(review): never written by the methods shown here
	closeChan chan struct{}     // NOTE(review): created in the constructor but unused in this view
}
// Topic groups a fixed set of partitions with its subscriber registry.
type Topic struct {
	Name        string
	Partitions  []*TopicPartition      // fixed at creation time
	Subscribers map[string]*Subscriber // subscriberID -> subscriber
	mu          sync.RWMutex           // guards Subscribers
}
// TopicPartition stores an ordered, append-only log of messages.
type TopicPartition struct {
	ID       int
	Messages []*Message   // retained log of published messages
	Offset   int64        // next offset to assign
	mu       sync.RWMutex // guards Messages and Offset
}
// Subscriber receives published messages on Channel, optionally filtered.
type Subscriber struct {
	ID      string
	Channel chan *Message       // buffered delivery channel
	Filter  func(*Message) bool // nil means "accept all"
	Active  bool                // false once unsubscribed
}
// NewInMemoryMessageQueue creates an empty in-memory broker.
func NewInMemoryMessageQueue() *InMemoryMessageQueue {
	return &InMemoryMessageQueue{
		closeChan: make(chan struct{}),
		topics:    make(map[string]*Topic),
	}
}
// CreateTopic registers a new topic with the given number of partitions.
// It errors if the topic already exists or partitions is not positive.
func (mq *InMemoryMessageQueue) CreateTopic(name string, partitions int) error {
	// Fix: the original accepted partitions <= 0, creating a topic that
	// Publish could never write to (indexing/rand.Intn would panic).
	if partitions < 1 {
		return fmt.Errorf("分区数必须大于0: %d", partitions)
	}
	mq.mu.Lock()
	defer mq.mu.Unlock()
	if _, exists := mq.topics[name]; exists {
		return fmt.Errorf("主题已存在: %s", name)
	}
	topic := &Topic{
		Name:        name,
		Partitions:  make([]*TopicPartition, partitions),
		Subscribers: make(map[string]*Subscriber),
	}
	for i := 0; i < partitions; i++ {
		topic.Partitions[i] = &TopicPartition{
			ID:       i,
			Messages: make([]*Message, 0),
			Offset:   0,
		}
	}
	mq.topics[name] = topic
	return nil
}
// Publish appends msg to one partition of its topic (keyed messages hash
// to a stable partition, keyless ones are placed randomly) and fans it
// out to matching active subscribers. A subscriber with a full buffer
// has the message dropped for it rather than blocking the publisher.
func (mq *InMemoryMessageQueue) Publish(msg *Message) error {
	mq.mu.RLock()
	topic, exists := mq.topics[msg.Topic]
	mq.mu.RUnlock()
	if !exists {
		return fmt.Errorf("主题不存在: %s", msg.Topic)
	}
	// Fix: guard against a topic with no partitions — the original would
	// panic indexing Partitions[0] below.
	if len(topic.Partitions) == 0 {
		return fmt.Errorf("主题没有可用分区: %s", msg.Topic)
	}
	// Choose a partition.
	partitionID := 0
	if len(topic.Partitions) > 1 {
		if msg.Key != "" {
			h := fnv.New32a()
			h.Write([]byte(msg.Key))
			// Fix: reduce in uint32 space so the index can never be
			// negative on platforms where int is 32 bits.
			partitionID = int(h.Sum32() % uint32(len(topic.Partitions)))
		} else {
			partitionID = rand.Intn(len(topic.Partitions))
		}
	}
	partition := topic.Partitions[partitionID]
	partition.mu.Lock()
	msg.Partition = int32(partitionID)
	msg.Offset = partition.Offset
	partition.Messages = append(partition.Messages, msg)
	partition.Offset++
	partition.mu.Unlock()
	// Fan out to active subscribers whose filter (if any) accepts msg.
	topic.mu.RLock()
	for _, subscriber := range topic.Subscribers {
		if subscriber.Active && (subscriber.Filter == nil || subscriber.Filter(msg)) {
			select {
			case subscriber.Channel <- msg:
			default:
				// Subscriber buffer full — drop for this subscriber only.
			}
		}
	}
	topic.mu.RUnlock()
	return nil
}
// Subscribe registers subscriberID on topicName and returns the channel
// (buffered to bufferSize) on which matching messages will arrive; a nil
// filter accepts every message. Duplicate IDs and unknown topics error.
func (mq *InMemoryMessageQueue) Subscribe(topicName, subscriberID string, bufferSize int, filter func(*Message) bool) (<-chan *Message, error) {
	mq.mu.RLock()
	topic, ok := mq.topics[topicName]
	mq.mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("主题不存在: %s", topicName)
	}
	topic.mu.Lock()
	defer topic.mu.Unlock()
	if _, dup := topic.Subscribers[subscriberID]; dup {
		return nil, fmt.Errorf("订阅者已存在: %s", subscriberID)
	}
	sub := &Subscriber{
		ID:      subscriberID,
		Filter:  filter,
		Active:  true,
		Channel: make(chan *Message, bufferSize),
	}
	topic.Subscribers[subscriberID] = sub
	return sub.Channel, nil
}
// Unsubscribe deactivates and removes subscriberID from topicName,
// closing its channel. Removing an unknown subscriber is a no-op; only
// an unknown topic is an error.
func (mq *InMemoryMessageQueue) Unsubscribe(topicName, subscriberID string) error {
	mq.mu.RLock()
	topic, ok := mq.topics[topicName]
	mq.mu.RUnlock()
	if !ok {
		return fmt.Errorf("主题不存在: %s", topicName)
	}
	topic.mu.Lock()
	defer topic.mu.Unlock()
	sub, found := topic.Subscribers[subscriberID]
	if !found {
		return nil // idempotent: already gone
	}
	sub.Active = false
	close(sub.Channel)
	delete(topic.Subscribers, subscriberID)
	return nil
}
消息队列基础为分布式系统提供了可靠的异步通信机制。通过理解消息传递模式、可靠性保证和分区策略,我们可以构建高性能、高可用的消息系统。在下一节中,我们将深入学习 Kafka 的使用和开发。