5.6.2 Kafka 使用与开发

5.6.2 Kafka 使用与开发 #

Apache Kafka 是一个高吞吐量的分布式消息系统,广泛应用于大数据处理、实时流处理和微服务架构中。本节将深入介绍 Kafka 的核心概念、架构设计,以及如何在 Go 语言中进行 Kafka 的开发和应用。

Kafka 核心概念 #

Kafka 架构组件 #

// KafkaClusterConfig holds broker addresses plus the security settings
// (SASL / SSL) used when connecting to a Kafka cluster.
type KafkaClusterConfig struct {
    Brokers          []string          `json:"brokers"`
    SecurityProtocol string            `json:"security_protocol"` // e.g. PLAINTEXT / SASL_SSL — interpreted by configureSecurity (defined elsewhere); confirm accepted values there
    SASLMechanism    string            `json:"sasl_mechanism"`
    SASLUsername     string            `json:"sasl_username"`
    SASLPassword     string            `json:"sasl_password"`
    SSLConfig        *SSLConfig        `json:"ssl_config,omitempty"`
    ClientID         string            `json:"client_id"`
    Metadata         map[string]string `json:"metadata"` // free-form labels; not read by any code in this file
}

// SSLConfig holds TLS certificate material for broker connections.
type SSLConfig struct {
    CertFile   string `json:"cert_file"`
    KeyFile    string `json:"key_file"`
    CAFile     string `json:"ca_file"`
    SkipVerify bool   `json:"skip_verify"` // true disables server certificate verification — unsafe outside tests
}

// TopicConfig describes a topic to be created through the admin client.
type TopicConfig struct {
    Name              string            `json:"name"`
    NumPartitions     int32             `json:"num_partitions"`
    ReplicationFactor int16             `json:"replication_factor"`
    ConfigEntries     map[string]string `json:"config_entries"` // per-topic overrides, e.g. retention settings
}

// PartitionInfo is a snapshot of a single partition's replication state.
type PartitionInfo struct {
    Topic     string  `json:"topic"`
    Partition int32   `json:"partition"`
    Leader    int32   `json:"leader"`   // broker ID of the current partition leader
    Replicas  []int32 `json:"replicas"` // broker IDs holding a replica
    ISR       []int32 `json:"isr"` // In-Sync Replicas
}

// KafkaAdmin wraps a sarama client and cluster admin for topic and
// partition management operations.
type KafkaAdmin struct {
    client sarama.Client       // low-level client, used for partition metadata lookups
    admin  sarama.ClusterAdmin // admin API for topic create/list/delete
    config *sarama.Config
}

// NewKafkaAdmin builds a Kafka admin client for the given brokers. The
// returned KafkaAdmin owns both the low-level client and the cluster admin.
func NewKafkaAdmin(brokers []string, config *KafkaClusterConfig) (*KafkaAdmin, error) {
    saramaConfig := sarama.NewConfig()
    saramaConfig.Version = sarama.V2_6_0_0
    saramaConfig.ClientID = config.ClientID

    // Apply SASL/SSL settings (configureSecurity is defined elsewhere in this file).
    if err := configureSecurity(saramaConfig, config); err != nil {
        return nil, fmt.Errorf("配置安全设置失败: %w", err)
    }

    client, err := sarama.NewClient(brokers, saramaConfig)
    if err != nil {
        return nil, fmt.Errorf("创建 Kafka 客户端失败: %w", err)
    }

    admin, err := sarama.NewClusterAdminFromClient(client)
    if err != nil {
        // Close the client so its broker connections do not leak.
        client.Close()
        return nil, fmt.Errorf("创建集群管理客户端失败: %w", err)
    }

    return &KafkaAdmin{
        client: client,
        admin:  admin,
        config: saramaConfig,
    }, nil
}

// CreateTopic creates a topic with the configured partition count,
// replication factor and per-topic config entries.
func (ka *KafkaAdmin) CreateTopic(topicConfig *TopicConfig) error {
    // sarama's TopicDetail.ConfigEntries is map[string]*string, so the
    // plain string values from TopicConfig must be converted here.
    var entries map[string]*string
    if len(topicConfig.ConfigEntries) > 0 {
        entries = make(map[string]*string, len(topicConfig.ConfigEntries))
        for k, v := range topicConfig.ConfigEntries {
            v := v // pin the loop value so each entry gets its own pointer
            entries[k] = &v
        }
    }

    topicDetail := &sarama.TopicDetail{
        NumPartitions:     topicConfig.NumPartitions,
        ReplicationFactor: topicConfig.ReplicationFactor,
        ConfigEntries:     entries,
    }

    // validateOnly=false: actually create the topic, not a dry run.
    if err := ka.admin.CreateTopic(topicConfig.Name, topicDetail, false); err != nil {
        // %w keeps the underlying sarama error inspectable via errors.Is/As.
        return fmt.Errorf("创建主题失败: %w", err)
    }

    return nil
}

// ListTopics returns every topic in the cluster together with its detail.
func (ka *KafkaAdmin) ListTopics() (map[string]sarama.TopicDetail, error) {
    return ka.admin.ListTopics()
}

// DeleteTopic deletes the named topic from the cluster.
func (ka *KafkaAdmin) DeleteTopic(topic string) error {
    return ka.admin.DeleteTopic(topic)
}

// GetPartitionInfo returns leader/replica/ISR metadata for every partition
// of the topic. Partitions whose metadata cannot be fetched are skipped
// rather than failing the whole call (best-effort snapshot).
func (ka *KafkaAdmin) GetPartitionInfo(topic string) ([]*PartitionInfo, error) {
    ids, err := ka.client.Partitions(topic)
    if err != nil {
        return nil, err
    }

    var infos []*PartitionInfo
    for _, id := range ids {
        leaderBroker, lerr := ka.client.Leader(topic, id)
        if lerr != nil {
            continue
        }

        replicaIDs, rerr := ka.client.Replicas(topic, id)
        if rerr != nil {
            continue
        }

        inSync, ierr := ka.client.InSyncReplicas(topic, id)
        if ierr != nil {
            continue
        }

        infos = append(infos, &PartitionInfo{
            Topic:     topic,
            Partition: id,
            Leader:    leaderBroker.ID(),
            Replicas:  replicaIDs,
            ISR:       inSync,
        })
    }

    return infos, nil
}

Kafka 生产者开发 #

同步生产者 #

// ProducerConfig collects settings shared by the sync and async producers.
type ProducerConfig struct {
    Brokers           []string          `json:"brokers"`
    ClientID          string            `json:"client_id"`
    Acks              sarama.RequiredAcks `json:"acks"` // required ack level (NoResponse/WaitForLocal/WaitForAll)
    Compression       sarama.CompressionCodec `json:"compression"`
    MaxMessageBytes   int               `json:"max_message_bytes"`
    RetryMax          int               `json:"retry_max"`
    RetryBackoff      time.Duration     `json:"retry_backoff"`
    FlushFrequency    time.Duration     `json:"flush_frequency"` // batching flush interval
    FlushMessages     int               `json:"flush_messages"`
    FlushBytes        int               `json:"flush_bytes"`
    Partitioner       string            `json:"partitioner"` // "hash" (default), "random" or "roundrobin"
    Idempotent        bool              `json:"idempotent"` // NOTE(review): sarama idempotence also requires Acks=WaitForAll and one in-flight request — confirm callers set those
    SecurityConfig    *KafkaClusterConfig `json:"security_config,omitempty"`
}

// KafkaProducer is a synchronous producer wrapper around sarama.SyncProducer.
type KafkaProducer struct {
    producer   sarama.SyncProducer
    config     *ProducerConfig
    partitioner sarama.Partitioner // NOTE(review): never assigned or read anywhere in this file — candidate for removal
    logger     *log.Logger
}

// NewKafkaProducer creates a synchronous Kafka producer from the given
// config, applying acks, compression, batching, retry, partitioner and
// optional security settings.
func NewKafkaProducer(config *ProducerConfig) (*KafkaProducer, error) {
    saramaConfig := sarama.NewConfig()
    saramaConfig.Version = sarama.V2_6_0_0
    saramaConfig.ClientID = config.ClientID

    // Producer settings.
    saramaConfig.Producer.RequiredAcks = config.Acks
    saramaConfig.Producer.Compression = config.Compression
    saramaConfig.Producer.MaxMessageBytes = config.MaxMessageBytes
    saramaConfig.Producer.Retry.Max = config.RetryMax
    saramaConfig.Producer.Retry.Backoff = config.RetryBackoff
    saramaConfig.Producer.Flush.Frequency = config.FlushFrequency
    saramaConfig.Producer.Flush.Messages = config.FlushMessages
    saramaConfig.Producer.Flush.Bytes = config.FlushBytes
    saramaConfig.Producer.Idempotent = config.Idempotent

    // A sarama SyncProducer requires Return.Successes and Return.Errors to
    // be true; without them NewSyncProducer rejects the configuration.
    saramaConfig.Producer.Return.Successes = true
    saramaConfig.Producer.Return.Errors = true

    // Partitioner selection; unknown values fall back to hash partitioning.
    switch config.Partitioner {
    case "hash":
        saramaConfig.Producer.Partitioner = sarama.NewHashPartitioner
    case "random":
        saramaConfig.Producer.Partitioner = sarama.NewRandomPartitioner
    case "roundrobin":
        saramaConfig.Producer.Partitioner = sarama.NewRoundRobinPartitioner
    default:
        saramaConfig.Producer.Partitioner = sarama.NewHashPartitioner
    }

    // Apply SASL/SSL settings when provided.
    if config.SecurityConfig != nil {
        if err := configureSecurity(saramaConfig, config.SecurityConfig); err != nil {
            return nil, err
        }
    }

    producer, err := sarama.NewSyncProducer(config.Brokers, saramaConfig)
    if err != nil {
        return nil, err
    }

    return &KafkaProducer{
        producer: producer,
        config:   config,
        logger:   log.New(os.Stdout, "[KafkaProducer] ", log.LstdFlags),
    }, nil
}

// SendMessage publishes one message synchronously and returns the message
// annotated with the partition and offset assigned by the broker. An empty
// key leaves the message unkeyed; headers are optional.
func (kp *KafkaProducer) SendMessage(topic, key string, value []byte, headers map[string]string) (*sarama.ProducerMessage, error) {
    record := &sarama.ProducerMessage{
        Topic:     topic,
        Value:     sarama.ByteEncoder(value),
        Timestamp: time.Now(),
    }

    if key != "" {
        record.Key = sarama.StringEncoder(key)
    }

    // Attach any caller-supplied headers.
    for name, val := range headers {
        record.Headers = append(record.Headers, sarama.RecordHeader{
            Key:   []byte(name),
            Value: []byte(val),
        })
    }

    partition, offset, err := kp.producer.SendMessage(record)
    if err != nil {
        kp.logger.Printf("发送消息失败: %v", err)
        return nil, err
    }

    // Record the broker-assigned placement on the returned message.
    record.Partition = partition
    record.Offset = offset

    kp.logger.Printf("消息发送成功: topic=%s, partition=%d, offset=%d", topic, partition, offset)
    return record, nil
}

// SendMessages publishes a batch of messages synchronously in one call.
func (kp *KafkaProducer) SendMessages(messages []*sarama.ProducerMessage) error {
    return kp.producer.SendMessages(messages)
}

// Close shuts down the producer and releases its broker connections.
func (kp *KafkaProducer) Close() error {
    return kp.producer.Close()
}

异步生产者 #

// AsyncKafkaProducer wraps sarama.AsyncProducer; delivery results are
// drained by two background goroutines started in the constructor.
type AsyncKafkaProducer struct {
    producer sarama.AsyncProducer
    config   *ProducerConfig
    logger   *log.Logger
    wg       sync.WaitGroup // waits for the success/error handler goroutines
    closed   chan struct{}  // closed by Close() to stop the handlers
}

// NewAsyncKafkaProducer creates an asynchronous Kafka producer. Delivery
// results are consumed by two background goroutines that log successes and
// errors; Close must be called to flush buffers and stop them.
func NewAsyncKafkaProducer(config *ProducerConfig) (*AsyncKafkaProducer, error) {
    saramaConfig := sarama.NewConfig()
    saramaConfig.Version = sarama.V2_6_0_0
    saramaConfig.ClientID = config.ClientID

    // Async producer settings. Successes/Errors must be returned so the
    // handler goroutines below can drain them.
    saramaConfig.Producer.RequiredAcks = config.Acks
    saramaConfig.Producer.Compression = config.Compression
    saramaConfig.Producer.Return.Successes = true
    saramaConfig.Producer.Return.Errors = true

    // Honor the remaining ProducerConfig fields (previously these were
    // applied only by the sync producer). Zero values keep sarama defaults.
    if config.MaxMessageBytes > 0 {
        saramaConfig.Producer.MaxMessageBytes = config.MaxMessageBytes
    }
    if config.RetryMax > 0 {
        saramaConfig.Producer.Retry.Max = config.RetryMax
    }
    if config.RetryBackoff > 0 {
        saramaConfig.Producer.Retry.Backoff = config.RetryBackoff
    }
    if config.FlushFrequency > 0 {
        saramaConfig.Producer.Flush.Frequency = config.FlushFrequency
    }
    if config.FlushMessages > 0 {
        saramaConfig.Producer.Flush.Messages = config.FlushMessages
    }
    if config.FlushBytes > 0 {
        saramaConfig.Producer.Flush.Bytes = config.FlushBytes
    }

    // Same partitioner selection as the sync producer ("hash" is sarama's
    // default, so the fallback matches previous behavior).
    switch config.Partitioner {
    case "random":
        saramaConfig.Producer.Partitioner = sarama.NewRandomPartitioner
    case "roundrobin":
        saramaConfig.Producer.Partitioner = sarama.NewRoundRobinPartitioner
    default: // "hash" or unspecified
        saramaConfig.Producer.Partitioner = sarama.NewHashPartitioner
    }

    // Apply SASL/SSL settings when provided.
    if config.SecurityConfig != nil {
        if err := configureSecurity(saramaConfig, config.SecurityConfig); err != nil {
            return nil, err
        }
    }

    producer, err := sarama.NewAsyncProducer(config.Brokers, saramaConfig)
    if err != nil {
        return nil, err
    }

    asyncProducer := &AsyncKafkaProducer{
        producer: producer,
        config:   config,
        logger:   log.New(os.Stdout, "[AsyncKafkaProducer] ", log.LstdFlags),
        closed:   make(chan struct{}),
    }

    // Background goroutines that drain delivery results.
    asyncProducer.wg.Add(2)
    go asyncProducer.handleSuccesses()
    go asyncProducer.handleErrors()

    return asyncProducer, nil
}

// SendMessageAsync enqueues a message on the producer's input channel and
// returns immediately; delivery results are reported through the background
// success/error handlers. Must not be called after Close (the input channel
// is closed during shutdown).
func (akp *AsyncKafkaProducer) SendMessageAsync(topic, key string, value []byte, headers map[string]string) {
    record := &sarama.ProducerMessage{
        Topic:     topic,
        Value:     sarama.ByteEncoder(value),
        Timestamp: time.Now(),
    }

    if key != "" {
        record.Key = sarama.StringEncoder(key)
    }

    // Attach any caller-supplied headers.
    for name, val := range headers {
        record.Headers = append(record.Headers, sarama.RecordHeader{
            Key:   []byte(name),
            Value: []byte(val),
        })
    }

    akp.producer.Input() <- record
}

// handleSuccesses drains the producer's success channel, logging each
// delivery, until the channel is closed or Close is requested.
func (akp *AsyncKafkaProducer) handleSuccesses() {
    defer akp.wg.Done()

    for {
        select {
        case msg, ok := <-akp.producer.Successes():
            // After AsyncClose the channel is closed and yields nil
            // messages; without this check msg.Topic would panic.
            if !ok {
                return
            }
            akp.logger.Printf("消息发送成功: topic=%s, partition=%d, offset=%d",
                msg.Topic, msg.Partition, msg.Offset)
        case <-akp.closed:
            return
        }
    }
}

// handleErrors drains the producer's error channel, logging each failed
// delivery, until the channel is closed or Close is requested.
func (akp *AsyncKafkaProducer) handleErrors() {
    defer akp.wg.Done()

    for {
        select {
        case prodErr, ok := <-akp.producer.Errors():
            // After AsyncClose the channel is closed and yields nil values;
            // without this check prodErr.Msg would be a nil dereference.
            if !ok {
                return
            }
            akp.logger.Printf("消息发送失败: topic=%s, partition=%d, error=%v",
                prodErr.Msg.Topic, prodErr.Msg.Partition, prodErr.Err)
        case <-akp.closed:
            return
        }
    }
}

// Close signals the handler goroutines to stop, triggers an asynchronous
// producer shutdown (flushing buffered messages), and waits for the
// handlers to exit before returning.
func (akp *AsyncKafkaProducer) Close() error {
    close(akp.closed)
    akp.producer.AsyncClose()
    akp.wg.Wait()
    return nil
}

Kafka 消费者开发 #

消费者组 #

// ConsumerConfig collects settings for both the consumer-group and the
// manual-partition consumers in this file.
type ConsumerConfig struct {
    Brokers        []string          `json:"brokers"`
    GroupID        string            `json:"group_id"` // used only by the consumer-group consumer
    ClientID       string            `json:"client_id"`
    Topics         []string          `json:"topics"`
    AutoOffsetReset string           `json:"auto_offset_reset"` // "earliest" or "latest"; anything else behaves as "latest"
    EnableAutoCommit bool            `json:"enable_auto_commit"`
    AutoCommitInterval time.Duration `json:"auto_commit_interval"`
    SessionTimeout   time.Duration   `json:"session_timeout"` // NOTE(review): passed to sarama unvalidated — confirm callers always set a positive value
    HeartbeatInterval time.Duration  `json:"heartbeat_interval"`
    MaxProcessingTime time.Duration  `json:"max_processing_time"`
    SecurityConfig   *KafkaClusterConfig `json:"security_config,omitempty"`
}

// MessageHandler processes one consumed message; returning an error signals
// the caller that handling failed (the message is then not marked/committed).
type MessageHandler interface {
    HandleMessage(ctx context.Context, message *sarama.ConsumerMessage) error
}

// MessageHandlerFunc adapts a plain function to the MessageHandler interface.
type MessageHandlerFunc func(ctx context.Context, message *sarama.ConsumerMessage) error

// HandleMessage implements MessageHandler by invoking f itself.
func (f MessageHandlerFunc) HandleMessage(ctx context.Context, message *sarama.ConsumerMessage) error {
    return f(ctx, message)
}

// KafkaConsumerGroup consumes topics as part of a Kafka consumer group and
// dispatches every message to the configured MessageHandler. It also
// implements sarama.ConsumerGroupHandler (Setup/Cleanup/ConsumeClaim).
type KafkaConsumerGroup struct {
    consumerGroup sarama.ConsumerGroup
    config        *ConsumerConfig
    handler       MessageHandler
    logger        *log.Logger
    ctx           context.Context // NOTE(review): storing a ctx in a struct is discouraged in Go; kept here to drive the Start/Stop lifecycle
    cancel        context.CancelFunc
    wg            sync.WaitGroup // tracks the consume and error-drain goroutines
}

// NewKafkaConsumerGroup builds a consumer group that dispatches messages to
// the given handler. Call Start to begin consuming and Stop to shut down.
func NewKafkaConsumerGroup(config *ConsumerConfig, handler MessageHandler) (*KafkaConsumerGroup, error) {
    saramaConfig := sarama.NewConfig()
    saramaConfig.Version = sarama.V2_6_0_0
    saramaConfig.ClientID = config.ClientID
    saramaConfig.Consumer.Group.Session.Timeout = config.SessionTimeout
    saramaConfig.Consumer.Group.Heartbeat.Interval = config.HeartbeatInterval
    saramaConfig.Consumer.MaxProcessingTime = config.MaxProcessingTime
    saramaConfig.Consumer.Return.Errors = true

    // Initial offset used when the group has no committed offset yet.
    switch config.AutoOffsetReset {
    case "earliest":
        saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
    case "latest":
        saramaConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
    default:
        saramaConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
    }

    // sarama enables offset auto-commit by default, so it must be switched
    // off explicitly when the caller requested manual commits; previously
    // EnableAutoCommit=false was silently ignored. The interval is applied
    // only when positive to avoid overriding sarama's default with zero.
    saramaConfig.Consumer.Offsets.AutoCommit.Enable = config.EnableAutoCommit
    if config.EnableAutoCommit && config.AutoCommitInterval > 0 {
        saramaConfig.Consumer.Offsets.AutoCommit.Interval = config.AutoCommitInterval
    }

    // Apply SASL/SSL settings when provided.
    if config.SecurityConfig != nil {
        if err := configureSecurity(saramaConfig, config.SecurityConfig); err != nil {
            return nil, err
        }
    }

    consumerGroup, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, saramaConfig)
    if err != nil {
        return nil, err
    }

    ctx, cancel := context.WithCancel(context.Background())

    kcg := &KafkaConsumerGroup{
        consumerGroup: consumerGroup,
        config:        config,
        handler:       handler,
        logger:        log.New(os.Stdout, "[KafkaConsumerGroup] ", log.LstdFlags),
        ctx:           ctx,
        cancel:        cancel,
    }

    return kcg, nil
}

// Start launches two background goroutines: one that repeatedly joins the
// group and consumes (Consume returns on every rebalance, so it is called
// in a loop), and one that drains the group's error channel. It returns
// immediately; use Stop to shut down.
func (kcg *KafkaConsumerGroup) Start() error {
    kcg.wg.Add(2)

    // Consume loop.
    go func() {
        defer kcg.wg.Done()
        for kcg.ctx.Err() == nil {
            if err := kcg.consumerGroup.Consume(kcg.ctx, kcg.config.Topics, kcg); err != nil {
                kcg.logger.Printf("消费错误: %v", err)
                return
            }
        }
    }()

    // Error-drain loop.
    go func() {
        defer kcg.wg.Done()
        for {
            select {
            case groupErr := <-kcg.consumerGroup.Errors():
                kcg.logger.Printf("消费者组错误: %v", groupErr)
            case <-kcg.ctx.Done():
                return
            }
        }
    }()

    return nil
}

// Stop cancels the consume loop, waits for both background goroutines and
// closes the underlying consumer group.
func (kcg *KafkaConsumerGroup) Stop() error {
    kcg.cancel()
    kcg.wg.Wait()
    return kcg.consumerGroup.Close()
}

// Setup implements sarama.ConsumerGroupHandler; it runs at the start of a
// new group session, before any ConsumeClaim call.
func (kcg *KafkaConsumerGroup) Setup(sarama.ConsumerGroupSession) error {
    kcg.logger.Printf("消费者组会话开始")
    return nil
}

// Cleanup implements sarama.ConsumerGroupHandler; it runs once all
// ConsumeClaim goroutines of the session have exited.
func (kcg *KafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {
    kcg.logger.Printf("消费者组会话结束")
    return nil
}

// ConsumeClaim processes messages from one partition claim. Every
// successfully handled message is marked; with auto-commit enabled sarama
// then flushes marked offsets periodically, and with auto-commit disabled
// the offset is committed explicitly here. (Marking only — as the previous
// version did in manual mode — never actually commits, and auto-commit only
// commits offsets that have been marked.)
func (kcg *KafkaConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for {
        select {
        case message, ok := <-claim.Messages():
            // A closed channel means the claim ended (rebalance/shutdown).
            if !ok || message == nil {
                return nil
            }

            // Process the message.
            if err := kcg.handler.HandleMessage(kcg.ctx, message); err != nil {
                kcg.logger.Printf("消息处理失败: %v", err)
                // Skip marking so the offset is not advanced past a failed
                // message; continue with the next one.
                continue
            }

            // Record the processed offset in the session.
            session.MarkMessage(message, "")
            if !kcg.config.EnableAutoCommit {
                // Manual mode: flush the marked offset immediately.
                session.Commit()
            }

        case <-kcg.ctx.Done():
            return nil
        }
    }
}

手动分区消费 #

// ManualPartitionConsumer consumes the configured topics by subscribing to
// every partition directly, without consumer-group coordination. No offsets
// are committed anywhere in this type.
type ManualPartitionConsumer struct {
    consumer   sarama.Consumer
    partitions map[string][]sarama.PartitionConsumer // topic -> per-partition consumers
    config     *ConsumerConfig
    handler    MessageHandler
    logger     *log.Logger
    ctx        context.Context
    cancel     context.CancelFunc
    wg         sync.WaitGroup // one entry per partition goroutine
}

// NewManualPartitionConsumer creates a consumer that manages partition
// subscriptions itself instead of joining a consumer group. Call Start to
// begin consuming and Stop to shut down.
func NewManualPartitionConsumer(config *ConsumerConfig, handler MessageHandler) (*ManualPartitionConsumer, error) {
    cfg := sarama.NewConfig()
    cfg.Version = sarama.V2_6_0_0
    cfg.ClientID = config.ClientID

    // Apply SASL/SSL settings when provided.
    if sec := config.SecurityConfig; sec != nil {
        if err := configureSecurity(cfg, sec); err != nil {
            return nil, err
        }
    }

    consumer, err := sarama.NewConsumer(config.Brokers, cfg)
    if err != nil {
        return nil, err
    }

    ctx, cancel := context.WithCancel(context.Background())

    return &ManualPartitionConsumer{
        consumer:   consumer,
        partitions: map[string][]sarama.PartitionConsumer{},
        config:     config,
        handler:    handler,
        logger:     log.New(os.Stdout, "[ManualPartitionConsumer] ", log.LstdFlags),
        ctx:        ctx,
        cancel:     cancel,
    }, nil
}

// Start opens a PartitionConsumer for every partition of each configured
// topic and launches one consuming goroutine per partition. On failure it
// rolls back the goroutines already started (each closes its own partition
// consumer on exit) so connections do not leak; the consumer cannot be
// restarted after a failed Start.
func (mpc *ManualPartitionConsumer) Start() error {
    // The same starting-offset policy applies to every partition.
    var startOffset int64
    switch mpc.config.AutoOffsetReset {
    case "earliest":
        startOffset = sarama.OffsetOldest
    default: // "latest" or unspecified
        startOffset = sarama.OffsetNewest
    }

    for _, topic := range mpc.config.Topics {
        partitions, err := mpc.consumer.Partitions(topic)
        if err != nil {
            mpc.abort()
            return fmt.Errorf("获取分区失败: topic=%s: %w", topic, err)
        }

        mpc.partitions[topic] = make([]sarama.PartitionConsumer, len(partitions))

        for i, partition := range partitions {
            pc, err := mpc.consumer.ConsumePartition(topic, partition, startOffset)
            if err != nil {
                mpc.abort()
                return fmt.Errorf("消费分区失败: topic=%s, partition=%d: %w", topic, partition, err)
            }

            mpc.partitions[topic][i] = pc

            // One consuming goroutine per partition.
            mpc.wg.Add(1)
            go mpc.consumePartition(topic, partition, pc)
        }
    }

    return nil
}

// abort stops the goroutines started by a partially-successful Start; each
// goroutine closes its own PartitionConsumer when it exits.
func (mpc *ManualPartitionConsumer) abort() {
    mpc.cancel()
    mpc.wg.Wait()
}

// consumePartition pumps one partition's messages into the handler until the
// context is cancelled or the partition consumer shuts down. Handler errors
// are logged and the next message is consumed (no retry, no offset state).
func (mpc *ManualPartitionConsumer) consumePartition(topic string, partition int32, pc sarama.PartitionConsumer) {
    defer mpc.wg.Done()
    defer pc.Close()

    for {
        select {
        case message, ok := <-pc.Messages():
            // A closed channel means the partition consumer is shutting down.
            if !ok || message == nil {
                return
            }

            if err := mpc.handler.HandleMessage(mpc.ctx, message); err != nil {
                mpc.logger.Printf("消息处理失败: topic=%s, partition=%d, offset=%d, error=%v",
                    topic, partition, message.Offset, err)
            }

        case err, ok := <-pc.Errors():
            // Without the ok check a closed error channel would yield nil
            // errors in a tight logging loop until shutdown completed.
            if !ok {
                return
            }
            mpc.logger.Printf("分区消费错误: topic=%s, partition=%d, error=%v",
                topic, partition, err)

        case <-mpc.ctx.Done():
            return
        }
    }
}

// Stop cancels every partition goroutine (each closes its own
// PartitionConsumer on exit), waits for them, then closes the parent
// consumer.
func (mpc *ManualPartitionConsumer) Stop() error {
    mpc.cancel()
    mpc.wg.Wait()
    return mpc.consumer.Close()
}

Kafka 流处理 #

简单流处理器 #

// StreamProcessor consumes messages from input and emits transformed
// messages on output until the context is cancelled or input is drained.
type StreamProcessor interface {
    Process(ctx context.Context, input <-chan *sarama.ConsumerMessage, output chan<- *sarama.ProducerMessage) error
}

// MessageTransformer maps one consumed message to one produced message.
// Returning (nil, nil) drops the message; returning an error skips it.
type MessageTransformer func(*sarama.ConsumerMessage) (*sarama.ProducerMessage, error)

// SimpleStreamProcessor is a StreamProcessor that applies a single
// MessageTransformer to every message.
type SimpleStreamProcessor struct {
    transformer MessageTransformer
    logger      *log.Logger
}

// NewSimpleStreamProcessor wraps the given transformer in a processor that
// logs to stdout.
func NewSimpleStreamProcessor(transformer MessageTransformer) *SimpleStreamProcessor {
    return &SimpleStreamProcessor{
        transformer: transformer,
        logger:      log.New(os.Stdout, "[StreamProcessor] ", log.LstdFlags),
    }
}

// Process reads messages from input, applies the transformer, and forwards
// non-nil results to output. It returns nil when input yields nil (treated
// as end-of-stream) and ctx.Err() when the context is cancelled;
// transformation failures are logged and the message is skipped.
func (ssp *SimpleStreamProcessor) Process(ctx context.Context, input <-chan *sarama.ConsumerMessage, output chan<- *sarama.ProducerMessage) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()

        case in := <-input:
            if in == nil {
                return nil
            }

            out, err := ssp.transformer(in)
            if err != nil {
                ssp.logger.Printf("消息转换失败: %v", err)
                continue
            }
            if out == nil {
                // Transformer chose to drop this message.
                continue
            }

            // Forward, but stay responsive to cancellation while blocked.
            select {
            case output <- out:
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    }
}

// KafkaStreamApp wires a consumer group, a stream processor and an async
// producer into a consume -> transform -> produce pipeline connected by two
// buffered channels.
type KafkaStreamApp struct {
    consumer  *KafkaConsumerGroup
    producer  *AsyncKafkaProducer
    processor StreamProcessor
    inputChan chan *sarama.ConsumerMessage // consumer -> processor
    outputChan chan *sarama.ProducerMessage // processor -> producer
    logger    *log.Logger
    ctx       context.Context
    cancel    context.CancelFunc
    wg        sync.WaitGroup // tracks the processor and forwarder goroutines
}

// NewKafkaStreamApp wires up the consumer group, async producer and stream
// processor. The bridge handler feeds consumed messages into the bounded
// input channel, failing fast when the pipeline cannot keep up.
func NewKafkaStreamApp(consumerConfig *ConsumerConfig, producerConfig *ProducerConfig, processor StreamProcessor) (*KafkaStreamApp, error) {
    inputChan := make(chan *sarama.ConsumerMessage, 1000)
    outputChan := make(chan *sarama.ProducerMessage, 1000)

    // Bridge handler: non-blocking send so a stalled processor surfaces as
    // a handler error instead of blocking the consumer group indefinitely.
    handler := MessageHandlerFunc(func(ctx context.Context, message *sarama.ConsumerMessage) error {
        select {
        case inputChan <- message:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        default:
            return fmt.Errorf("输入通道已满")
        }
    })

    consumer, err := NewKafkaConsumerGroup(consumerConfig, handler)
    if err != nil {
        return nil, err
    }

    producer, err := NewAsyncKafkaProducer(producerConfig)
    if err != nil {
        // Release the consumer group created above so its broker
        // connections do not leak.
        consumer.Stop()
        return nil, err
    }

    ctx, cancel := context.WithCancel(context.Background())

    return &KafkaStreamApp{
        consumer:   consumer,
        producer:   producer,
        processor:  processor,
        inputChan:  inputChan,
        outputChan: outputChan,
        logger:     log.New(os.Stdout, "[KafkaStreamApp] ", log.LstdFlags),
        ctx:        ctx,
        cancel:     cancel,
    }, nil
}

// Start launches the consumer group, the stream processor and the output
// forwarder. It returns once everything is running; call Stop to shut down.
func (ksa *KafkaStreamApp) Start() error {
    // 1. Consumer: feeds inputChan through the bridge handler.
    if err := ksa.consumer.Start(); err != nil {
        return err
    }

    // 2. Processor: inputChan -> outputChan.
    ksa.wg.Add(1)
    go func() {
        defer ksa.wg.Done()
        if err := ksa.processor.Process(ksa.ctx, ksa.inputChan, ksa.outputChan); err != nil {
            ksa.logger.Printf("流处理错误: %v", err)
        }
    }()

    // 3. Forwarder: outputChan -> async producer.
    ksa.wg.Add(1)
    go func() {
        defer ksa.wg.Done()
        for {
            select {
            case msg := <-ksa.outputChan:
                if msg == nil {
                    return
                }
                // Extract key/value through the Encoder interface. The
                // previous type assertions (StringEncoder / ByteEncoder)
                // panicked whenever a transformer used a different encoder
                // type or left the key unset.
                var key string
                if msg.Key != nil {
                    if kb, err := msg.Key.Encode(); err == nil {
                        key = string(kb)
                    }
                }
                var value []byte
                if msg.Value != nil {
                    if vb, err := msg.Value.Encode(); err == nil {
                        value = vb
                    }
                }
                ksa.producer.SendMessageAsync(msg.Topic, key, value, nil)

            case <-ksa.ctx.Done():
                return
            }
        }
    }()

    ksa.logger.Printf("Kafka流处理应用启动成功")
    return nil
}

// Stop shuts the pipeline down: cancel the processor and forwarder
// goroutines, wait for them, stop the consumer and producer, then close the
// channels. Shutdown errors are logged rather than returned.
func (ksa *KafkaStreamApp) Stop() error {
    ksa.cancel()
    ksa.wg.Wait()

    if err := ksa.consumer.Stop(); err != nil {
        ksa.logger.Printf("停止消费者失败: %v", err)
    }

    if err := ksa.producer.Close(); err != nil {
        ksa.logger.Printf("停止生产者失败: %v", err)
    }

    // The channels are closed only after the consumer (the inputChan sender)
    // and the processor (the outputChan sender) have stopped above.
    close(ksa.inputChan)
    close(ksa.outputChan)

    ksa.logger.Printf("Kafka流处理应用停止成功")
    return nil
}

Kafka 作为高性能的分布式消息系统,为微服务架构提供了可靠的消息传递能力。通过掌握 Kafka 的生产者、消费者和流处理开发,我们可以构建高吞吐量、低延迟的实时数据处理系统。在下一节中,我们将学习 RabbitMQ 的使用和开发。