5.6.2 Kafka 使用与开发 #
Apache Kafka 是一个高吞吐量的分布式消息系统,广泛应用于大数据处理、实时流处理和微服务架构中。本节将深入介绍 Kafka 的核心概念、架构设计,以及如何在 Go 语言中进行 Kafka 的开发和应用。
Kafka 核心概念 #
Kafka 架构组件 #
// KafkaClusterConfig describes how to reach and authenticate against a
// Kafka cluster. It is consumed by NewKafkaAdmin and, via the
// SecurityConfig fields of the producer/consumer configs, by
// configureSecurity (defined elsewhere in this file).
type KafkaClusterConfig struct {
	Brokers          []string          `json:"brokers"`           // bootstrap broker addresses, e.g. "host:9092"
	SecurityProtocol string            `json:"security_protocol"` // e.g. PLAINTEXT / SASL_SSL — interpreted by configureSecurity
	SASLMechanism    string            `json:"sasl_mechanism"`    // e.g. PLAIN / SCRAM-SHA-256 — presumably; verify against configureSecurity
	SASLUsername     string            `json:"sasl_username"`
	SASLPassword     string            `json:"sasl_password"`
	SSLConfig        *SSLConfig        `json:"ssl_config,omitempty"` // nil when TLS is not used
	ClientID         string            `json:"client_id"`            // copied into sarama.Config.ClientID
	Metadata         map[string]string `json:"metadata"`             // free-form labels; not read by the code in this file
}
// SSLConfig holds TLS material for broker connections.
type SSLConfig struct {
	CertFile   string `json:"cert_file"` // client certificate (PEM)
	KeyFile    string `json:"key_file"`  // client private key (PEM)
	CAFile     string `json:"ca_file"`   // CA bundle used to verify the broker
	SkipVerify bool   `json:"skip_verify"` // disable server-cert verification — insecure; for dev only
}
// TopicConfig describes a topic to be created via KafkaAdmin.CreateTopic.
type TopicConfig struct {
	Name              string            `json:"name"`
	NumPartitions     int32             `json:"num_partitions"`
	ReplicationFactor int16             `json:"replication_factor"`
	ConfigEntries     map[string]string `json:"config_entries"` // per-topic broker configs, e.g. "retention.ms"
}
// PartitionInfo is a snapshot of one partition's replication state,
// as returned by KafkaAdmin.GetPartitionInfo.
type PartitionInfo struct {
	Topic     string  `json:"topic"`
	Partition int32   `json:"partition"`
	Leader    int32   `json:"leader"`   // broker ID of the current leader
	Replicas  []int32 `json:"replicas"` // broker IDs holding replicas
	ISR       []int32 `json:"isr"`      // In-Sync Replicas
}
// KafkaAdmin wraps a sarama client and cluster-admin pair for topic and
// partition management. Construct it with NewKafkaAdmin.
type KafkaAdmin struct {
	client sarama.Client       // low-level client; used for partition metadata queries
	admin  sarama.ClusterAdmin // admin API built on top of client
	config *sarama.Config      // the resolved sarama config (kept for reference)
}
// NewKafkaAdmin connects to the given brokers and returns an admin client.
//
// The brokers parameter supplies the bootstrap addresses; config supplies
// the client ID and security settings (config.Brokers is not read here).
func NewKafkaAdmin(brokers []string, config *KafkaClusterConfig) (*KafkaAdmin, error) {
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_6_0_0
	saramaConfig.ClientID = config.ClientID

	// Apply SASL/TLS settings before dialing.
	if err := configureSecurity(saramaConfig, config); err != nil {
		return nil, fmt.Errorf("configuring security: %w", err)
	}

	client, err := sarama.NewClient(brokers, saramaConfig)
	if err != nil {
		return nil, fmt.Errorf("connecting to brokers %v: %w", brokers, err)
	}

	admin, err := sarama.NewClusterAdminFromClient(client)
	if err != nil {
		// The admin wrapper only owns the client on success, so close it here.
		client.Close()
		return nil, fmt.Errorf("creating cluster admin: %w", err)
	}

	return &KafkaAdmin{
		client: client,
		admin:  admin,
		config: saramaConfig,
	}, nil
}
// CreateTopic creates the topic described by topicConfig.
// It is not idempotent: creating an existing topic returns an error.
func (ka *KafkaAdmin) CreateTopic(topicConfig *TopicConfig) error {
	topicDetail := &sarama.TopicDetail{
		NumPartitions:     topicConfig.NumPartitions,
		ReplicationFactor: topicConfig.ReplicationFactor,
		ConfigEntries:     topicConfig.ConfigEntries,
	}
	// validateOnly=false: actually create the topic.
	// %w (not %v) so callers can unwrap sarama errors with errors.Is/As.
	if err := ka.admin.CreateTopic(topicConfig.Name, topicDetail, false); err != nil {
		return fmt.Errorf("创建主题失败: %w", err)
	}
	return nil
}
// ListTopics returns every topic in the cluster keyed by name.
func (ka *KafkaAdmin) ListTopics() (map[string]sarama.TopicDetail, error) {
	return ka.admin.ListTopics()
}
// DeleteTopic deletes the named topic from the cluster.
func (ka *KafkaAdmin) DeleteTopic(topic string) error {
	return ka.admin.DeleteTopic(topic)
}
// GetPartitionInfo returns the leader/replica/ISR state of every partition
// of the given topic.
//
// The lookup is best-effort: a partition whose metadata cannot be fetched
// is silently skipped rather than failing the whole call, so the result
// may contain fewer entries than the topic has partitions.
func (ka *KafkaAdmin) GetPartitionInfo(topic string) ([]*PartitionInfo, error) {
	partitions, err := ka.client.Partitions(topic)
	if err != nil {
		return nil, fmt.Errorf("listing partitions for topic %q: %w", topic, err)
	}

	partitionInfos := make([]*PartitionInfo, 0, len(partitions))
	for _, partition := range partitions {
		leader, err := ka.client.Leader(topic, partition)
		if err != nil {
			continue // skip partitions with unreachable metadata
		}
		replicas, err := ka.client.Replicas(topic, partition)
		if err != nil {
			continue
		}
		isr, err := ka.client.InSyncReplicas(topic, partition)
		if err != nil {
			continue
		}
		partitionInfos = append(partitionInfos, &PartitionInfo{
			Topic:     topic,
			Partition: partition,
			Leader:    leader.ID(),
			Replicas:  replicas,
			ISR:       isr,
		})
	}
	return partitionInfos, nil
}
Kafka 生产者开发 #
同步生产者 #
// ProducerConfig collects the knobs applied to a sarama producer
// (sync or async) by NewKafkaProducer / NewAsyncKafkaProducer.
type ProducerConfig struct {
	Brokers         []string                `json:"brokers"`
	ClientID        string                  `json:"client_id"`
	Acks            sarama.RequiredAcks     `json:"acks"`              // 0 / 1 / all
	Compression     sarama.CompressionCodec `json:"compression"`       // none/gzip/snappy/lz4/zstd
	MaxMessageBytes int                     `json:"max_message_bytes"` // applied by the sync constructor only
	RetryMax        int                     `json:"retry_max"`
	RetryBackoff    time.Duration           `json:"retry_backoff"`
	FlushFrequency  time.Duration           `json:"flush_frequency"` // batching: flush at most this often
	FlushMessages   int                     `json:"flush_messages"`  // batching: flush after this many messages
	FlushBytes      int                     `json:"flush_bytes"`     // batching: flush after this many bytes
	Partitioner     string                  `json:"partitioner"`     // "hash" | "random" | "roundrobin"; default hash
	Idempotent      bool                    `json:"idempotent"`      // requires acks=all and MaxOpenRequests=1 in sarama — verify at call site
	SecurityConfig  *KafkaClusterConfig     `json:"security_config,omitempty"`
}
// KafkaProducer is a synchronous producer: SendMessage blocks until the
// broker acknowledges (or rejects) the message.
type KafkaProducer struct {
	producer sarama.SyncProducer
	config   *ProducerConfig
	// NOTE(review): partitioner is never assigned in the visible code —
	// the partitioner is configured on the sarama.Config instead; this
	// field looks vestigial.
	partitioner sarama.Partitioner
	logger      *log.Logger
}
// NewKafkaProducer builds a synchronous producer from config.
//
// Bug fix: sarama's NewSyncProducer refuses to start unless
// Producer.Return.Successes is true (and requires Return.Errors as well);
// the original code never set them, so construction always failed with
// "Producer.Return.Successes must be true to be used in a SyncProducer".
func NewKafkaProducer(config *ProducerConfig) (*KafkaProducer, error) {
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_6_0_0
	saramaConfig.ClientID = config.ClientID

	// Delivery/batching behavior.
	saramaConfig.Producer.RequiredAcks = config.Acks
	saramaConfig.Producer.Compression = config.Compression
	saramaConfig.Producer.MaxMessageBytes = config.MaxMessageBytes
	saramaConfig.Producer.Retry.Max = config.RetryMax
	saramaConfig.Producer.Retry.Backoff = config.RetryBackoff
	saramaConfig.Producer.Flush.Frequency = config.FlushFrequency
	saramaConfig.Producer.Flush.Messages = config.FlushMessages
	saramaConfig.Producer.Flush.Bytes = config.FlushBytes
	saramaConfig.Producer.Idempotent = config.Idempotent

	// Required for SyncProducer: it reads both channels internally.
	saramaConfig.Producer.Return.Successes = true
	saramaConfig.Producer.Return.Errors = true

	// Partitioning strategy; hash-by-key is the default.
	switch config.Partitioner {
	case "hash":
		saramaConfig.Producer.Partitioner = sarama.NewHashPartitioner
	case "random":
		saramaConfig.Producer.Partitioner = sarama.NewRandomPartitioner
	case "roundrobin":
		saramaConfig.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	default:
		saramaConfig.Producer.Partitioner = sarama.NewHashPartitioner
	}

	if config.SecurityConfig != nil {
		if err := configureSecurity(saramaConfig, config.SecurityConfig); err != nil {
			return nil, err
		}
	}

	producer, err := sarama.NewSyncProducer(config.Brokers, saramaConfig)
	if err != nil {
		return nil, err
	}

	return &KafkaProducer{
		producer: producer,
		config:   config,
		logger:   log.New(os.Stdout, "[KafkaProducer] ", log.LstdFlags),
	}, nil
}
// SendMessage publishes a single record synchronously and returns the
// message annotated with the partition/offset assigned by the broker.
// An empty key leaves the record keyless; headers may be nil.
func (kp *KafkaProducer) SendMessage(topic, key string, value []byte, headers map[string]string) (*sarama.ProducerMessage, error) {
	// Assemble the record, stamping it with the current wall-clock time.
	msg := &sarama.ProducerMessage{
		Topic:     topic,
		Value:     sarama.ByteEncoder(value),
		Timestamp: time.Now(),
	}
	if key != "" {
		msg.Key = sarama.StringEncoder(key)
	}
	for hk, hv := range headers {
		msg.Headers = append(msg.Headers, sarama.RecordHeader{
			Key:   []byte(hk),
			Value: []byte(hv),
		})
	}

	partition, offset, err := kp.producer.SendMessage(msg)
	if err != nil {
		kp.logger.Printf("发送消息失败: %v", err)
		return nil, err
	}

	// Record where the broker placed the message before handing it back.
	msg.Partition = partition
	msg.Offset = offset
	kp.logger.Printf("消息发送成功: topic=%s, partition=%d, offset=%d", topic, partition, offset)
	return msg, nil
}
// SendMessages publishes a batch of pre-built messages synchronously,
// delegating batching semantics to sarama.
func (kp *KafkaProducer) SendMessages(messages []*sarama.ProducerMessage) error {
	return kp.producer.SendMessages(messages)
}
// Close shuts down the underlying sync producer, flushing pending messages.
func (kp *KafkaProducer) Close() error {
	return kp.producer.Close()
}
异步生产者 #
// AsyncKafkaProducer wraps a sarama.AsyncProducer together with two
// background goroutines that log delivery successes and failures.
type AsyncKafkaProducer struct {
	producer sarama.AsyncProducer
	config   *ProducerConfig
	logger   *log.Logger
	wg       sync.WaitGroup // tracks the two handler goroutines
	closed   chan struct{}  // closed by Close() to stop the handlers
}
// NewAsyncKafkaProducer builds an asynchronous producer from config and
// starts two goroutines that consume the Successes and Errors channels
// (both of which are enabled below and must be drained).
func NewAsyncKafkaProducer(config *ProducerConfig) (*AsyncKafkaProducer, error) {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_6_0_0
	cfg.ClientID = config.ClientID

	// Delivery settings; both result channels are enabled so the handler
	// goroutines below can report outcomes.
	cfg.Producer.RequiredAcks = config.Acks
	cfg.Producer.Compression = config.Compression
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true

	if sec := config.SecurityConfig; sec != nil {
		if err := configureSecurity(cfg, sec); err != nil {
			return nil, err
		}
	}

	producer, err := sarama.NewAsyncProducer(config.Brokers, cfg)
	if err != nil {
		return nil, err
	}

	ap := &AsyncKafkaProducer{
		producer: producer,
		config:   config,
		logger:   log.New(os.Stdout, "[AsyncKafkaProducer] ", log.LstdFlags),
		closed:   make(chan struct{}),
	}

	// One goroutine per result channel.
	ap.wg.Add(2)
	go ap.handleSuccesses()
	go ap.handleErrors()

	return ap, nil
}
// SendMessageAsync enqueues a message for asynchronous delivery; results
// are reported on the Successes/Errors channels consumed by the handler
// goroutines started in NewAsyncKafkaProducer.
//
// Fix: the original sent unconditionally on Input(), which panics with
// "send on closed channel" if called after Close. The select below drops
// the message once shutdown has been signalled instead. (A small race
// window between Close and this select remains — callers should stop
// producing before calling Close.)
func (akp *AsyncKafkaProducer) SendMessageAsync(topic, key string, value []byte, headers map[string]string) {
	msg := &sarama.ProducerMessage{
		Topic:     topic,
		Value:     sarama.ByteEncoder(value),
		Timestamp: time.Now(),
	}
	if key != "" {
		msg.Key = sarama.StringEncoder(key)
	}
	if len(headers) > 0 {
		msg.Headers = make([]sarama.RecordHeader, 0, len(headers))
		for k, v := range headers {
			msg.Headers = append(msg.Headers, sarama.RecordHeader{
				Key:   []byte(k),
				Value: []byte(v),
			})
		}
	}

	select {
	case akp.producer.Input() <- msg:
	case <-akp.closed:
		// Producer is shutting down; drop the message rather than panic.
	}
}
// handleSuccesses logs every acknowledged message until shutdown.
//
// Fix: after AsyncClose the Successes channel is closed and a receive
// yields nil; the original dereferenced that nil message (msg.Topic) and
// panicked. We now exit cleanly when the channel closes.
func (akp *AsyncKafkaProducer) handleSuccesses() {
	defer akp.wg.Done()
	for {
		select {
		case msg := <-akp.producer.Successes():
			if msg == nil {
				return // channel closed by AsyncClose
			}
			akp.logger.Printf("消息发送成功: topic=%s, partition=%d, offset=%d",
				msg.Topic, msg.Partition, msg.Offset)
		case <-akp.closed:
			return
		}
	}
}
// handleErrors logs every failed delivery until shutdown.
//
// Fix: after AsyncClose the Errors channel is closed and yields nil; the
// original dereferenced err.Msg on that nil and panicked. Exit cleanly
// when the channel closes instead.
func (akp *AsyncKafkaProducer) handleErrors() {
	defer akp.wg.Done()
	for {
		select {
		case err := <-akp.producer.Errors():
			if err == nil {
				return // channel closed by AsyncClose
			}
			akp.logger.Printf("消息发送失败: topic=%s, partition=%d, error=%v",
				err.Msg.Topic, err.Msg.Partition, err.Err)
		case <-akp.closed:
			return
		}
	}
}
// Close shuts the async producer down: it signals both handler goroutines
// to stop, asks sarama to flush and close, then waits for the handlers.
//
// NOTE(review): because `closed` is closed before AsyncClose, the handler
// goroutines may exit before fully draining Successes()/Errors(). With
// Return.Successes/Errors enabled, sarama expects both channels to be
// drained until closed during AsyncClose — confirm no in-flight results
// are dropped (or shutdown stalls) on this path.
func (akp *AsyncKafkaProducer) Close() error {
	close(akp.closed)
	akp.producer.AsyncClose()
	akp.wg.Wait()
	return nil
}
Kafka 消费者开发 #
消费者组 #
// ConsumerConfig collects the knobs applied to consumer-group and
// manual-partition consumers in this file.
type ConsumerConfig struct {
	Brokers            []string            `json:"brokers"`
	GroupID            string              `json:"group_id"` // consumer-group id (unused by ManualPartitionConsumer)
	ClientID           string              `json:"client_id"`
	Topics             []string            `json:"topics"`
	AutoOffsetReset    string              `json:"auto_offset_reset"` // "earliest" | "latest"; anything else falls back to latest
	EnableAutoCommit   bool                `json:"enable_auto_commit"`
	AutoCommitInterval time.Duration       `json:"auto_commit_interval"`
	SessionTimeout     time.Duration       `json:"session_timeout"`
	HeartbeatInterval  time.Duration       `json:"heartbeat_interval"`
	MaxProcessingTime  time.Duration       `json:"max_processing_time"`
	SecurityConfig     *KafkaClusterConfig `json:"security_config,omitempty"`
}
// MessageHandler processes one consumed message. Returning a non-nil
// error tells the consumer the message was not handled successfully.
type MessageHandler interface {
	HandleMessage(ctx context.Context, message *sarama.ConsumerMessage) error
}

// MessageHandlerFunc adapts a plain function to the MessageHandler
// interface (the usual http.HandlerFunc pattern).
type MessageHandlerFunc func(ctx context.Context, message *sarama.ConsumerMessage) error

// HandleMessage calls f itself.
func (f MessageHandlerFunc) HandleMessage(ctx context.Context, message *sarama.ConsumerMessage) error {
	return f(ctx, message)
}
// KafkaConsumerGroup runs a sarama consumer group and dispatches every
// message to a MessageHandler. It also implements
// sarama.ConsumerGroupHandler (Setup/Cleanup/ConsumeClaim below).
type KafkaConsumerGroup struct {
	consumerGroup sarama.ConsumerGroup
	config        *ConsumerConfig
	handler       MessageHandler
	logger        *log.Logger
	ctx           context.Context    // cancelled by Stop; also passed to the handler
	cancel        context.CancelFunc
	wg            sync.WaitGroup // tracks the consume + error goroutines
}
// NewKafkaConsumerGroup builds (but does not start) a consumer group that
// feeds every message to handler. Call Start to begin consuming.
//
// Bug fix: sarama enables offset auto-commit by default, so when
// config.EnableAutoCommit was false the original still auto-committed;
// it must be disabled explicitly for manual offset management to work.
func NewKafkaConsumerGroup(config *ConsumerConfig, handler MessageHandler) (*KafkaConsumerGroup, error) {
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_6_0_0
	saramaConfig.ClientID = config.ClientID
	saramaConfig.Consumer.Group.Session.Timeout = config.SessionTimeout
	saramaConfig.Consumer.Group.Heartbeat.Interval = config.HeartbeatInterval
	saramaConfig.Consumer.MaxProcessingTime = config.MaxProcessingTime
	saramaConfig.Consumer.Return.Errors = true

	// Where to start when the group has no committed offset yet.
	switch config.AutoOffsetReset {
	case "earliest":
		saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
	case "latest":
		saramaConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
	default:
		saramaConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
	}

	// Offset commit strategy.
	if config.EnableAutoCommit {
		saramaConfig.Consumer.Offsets.AutoCommit.Enable = true
		saramaConfig.Consumer.Offsets.AutoCommit.Interval = config.AutoCommitInterval
	} else {
		// sarama's default is true — turn it off so ConsumeClaim's manual
		// commits are the only commits.
		saramaConfig.Consumer.Offsets.AutoCommit.Enable = false
	}

	if config.SecurityConfig != nil {
		if err := configureSecurity(saramaConfig, config.SecurityConfig); err != nil {
			return nil, err
		}
	}

	consumerGroup, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, saramaConfig)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	kcg := &KafkaConsumerGroup{
		consumerGroup: consumerGroup,
		config:        config,
		handler:       handler,
		logger:        log.New(os.Stdout, "[KafkaConsumerGroup] ", log.LstdFlags),
		ctx:           ctx,
		cancel:        cancel,
	}
	return kcg, nil
}
// Start launches the consume loop and the group-error logger. It returns
// immediately; consumption continues until Stop is called.
func (kcg *KafkaConsumerGroup) Start() error {
	kcg.wg.Add(2)

	// Consume loop. sarama's Consume returns on every rebalance, so it
	// must be called again in a loop until the context is cancelled.
	go func() {
		defer kcg.wg.Done()
		for {
			err := kcg.consumerGroup.Consume(kcg.ctx, kcg.config.Topics, kcg)
			if err != nil {
				kcg.logger.Printf("消费错误: %v", err)
				return
			}
			if kcg.ctx.Err() != nil {
				return // Stop was called
			}
		}
	}()

	// Group-level error logger (enabled via Consumer.Return.Errors).
	go func() {
		defer kcg.wg.Done()
		for {
			select {
			case <-kcg.ctx.Done():
				return
			case err := <-kcg.consumerGroup.Errors():
				kcg.logger.Printf("消费者组错误: %v", err)
			}
		}
	}()

	return nil
}
// Stop cancels the consume loop, waits for both goroutines to exit, then
// closes the underlying consumer group (triggering a final rebalance).
func (kcg *KafkaConsumerGroup) Stop() error {
	kcg.cancel()
	kcg.wg.Wait()
	return kcg.consumerGroup.Close()
}
// Setup implements sarama.ConsumerGroupHandler; it runs at the start of a
// new group session, before any ConsumeClaim goroutine is started.
func (kcg *KafkaConsumerGroup) Setup(sarama.ConsumerGroupSession) error {
	kcg.logger.Printf("消费者组会话开始")
	return nil
}

// Cleanup implements sarama.ConsumerGroupHandler; it runs once all
// ConsumeClaim goroutines of the session have exited.
func (kcg *KafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {
	kcg.logger.Printf("消费者组会话结束")
	return nil
}
// ConsumeClaim implements sarama.ConsumerGroupHandler: it drains one
// partition claim, dispatching each message to the handler.
//
// Bug fix: sarama commits only offsets that have been *marked*. The
// original marked messages only when auto-commit was disabled — so with
// auto-commit enabled nothing was ever marked (and thus never committed),
// and with it disabled marking alone never resulted in a commit either.
// We now mark every successfully handled message and, in manual mode,
// flush the marks with an explicit Commit.
func (kcg *KafkaConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message := <-claim.Messages():
			// The channel closes (yielding nil) when the session ends.
			if message == nil {
				return nil
			}
			if err := kcg.handler.HandleMessage(kcg.ctx, message); err != nil {
				kcg.logger.Printf("消息处理失败: %v", err)
				// Leave the message unmarked so it can be redelivered.
				continue
			}
			// Record progress; the auto-committer (if enabled) or the
			// explicit Commit below persists it.
			session.MarkMessage(message, "")
			if !kcg.config.EnableAutoCommit {
				session.Commit()
			}
		case <-kcg.ctx.Done():
			return nil
		}
	}
}
手动分区消费 #
// 手动分区消费者
type ManualPartitionConsumer struct {
consumer sarama.Consumer
partitions map[string][]sarama.PartitionConsumer
config *ConsumerConfig
handler MessageHandler
logger *log.Logger
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// NewManualPartitionConsumer builds (but does not start) a consumer that
// reads all partitions of config.Topics directly. Call Start to begin.
func NewManualPartitionConsumer(config *ConsumerConfig, handler MessageHandler) (*ManualPartitionConsumer, error) {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_6_0_0
	cfg.ClientID = config.ClientID

	if sec := config.SecurityConfig; sec != nil {
		if err := configureSecurity(cfg, sec); err != nil {
			return nil, err
		}
	}

	consumer, err := sarama.NewConsumer(config.Brokers, cfg)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &ManualPartitionConsumer{
		consumer:   consumer,
		partitions: map[string][]sarama.PartitionConsumer{},
		config:     config,
		handler:    handler,
		logger:     log.New(os.Stdout, "[ManualPartitionConsumer] ", log.LstdFlags),
		ctx:        ctx,
		cancel:     cancel,
	}, nil
}
// Start discovers the partitions of every configured topic and launches
// one consuming goroutine per partition.
//
// Improvements: the starting-offset switch was loop-invariant and is now
// computed once, and errors carry topic/partition context. On a mid-way
// failure, goroutines already launched keep running; the caller should
// invoke Stop() to shut them down.
func (mpc *ManualPartitionConsumer) Start() error {
	// Resolve the starting offset once; it does not vary per partition.
	offset := sarama.OffsetNewest
	if mpc.config.AutoOffsetReset == "earliest" {
		offset = sarama.OffsetOldest
	}

	for _, topic := range mpc.config.Topics {
		partitions, err := mpc.consumer.Partitions(topic)
		if err != nil {
			return fmt.Errorf("listing partitions for topic %q: %w", topic, err)
		}
		mpc.partitions[topic] = make([]sarama.PartitionConsumer, len(partitions))
		for i, partition := range partitions {
			pc, err := mpc.consumer.ConsumePartition(topic, partition, offset)
			if err != nil {
				return fmt.Errorf("consuming %s/%d: %w", topic, partition, err)
			}
			mpc.partitions[topic][i] = pc

			// One goroutine per partition.
			mpc.wg.Add(1)
			go mpc.consumePartition(topic, partition, pc)
		}
	}
	return nil
}
// consumePartition drains a single partition consumer until shutdown,
// dispatching messages to the handler and logging consume errors.
//
// Fix: after pc.Close the Errors channel is closed and yields nil; the
// original would spin logging nil errors. Both channels now terminate the
// loop on nil.
func (mpc *ManualPartitionConsumer) consumePartition(topic string, partition int32, pc sarama.PartitionConsumer) {
	defer mpc.wg.Done()
	defer pc.Close()
	for {
		select {
		case message := <-pc.Messages():
			if message == nil {
				return // channel closed
			}
			if err := mpc.handler.HandleMessage(mpc.ctx, message); err != nil {
				mpc.logger.Printf("消息处理失败: topic=%s, partition=%d, offset=%d, error=%v",
					topic, partition, message.Offset, err)
			}
		case err := <-pc.Errors():
			if err == nil {
				return // channel closed
			}
			mpc.logger.Printf("分区消费错误: topic=%s, partition=%d, error=%v",
				topic, partition, err)
		case <-mpc.ctx.Done():
			return
		}
	}
}
// Stop cancels all partition goroutines, waits for them to exit (each
// closes its own PartitionConsumer via defer), then closes the parent
// consumer.
func (mpc *ManualPartitionConsumer) Stop() error {
	mpc.cancel()
	mpc.wg.Wait()
	return mpc.consumer.Close()
}
Kafka 流处理 #
简单流处理器 #
// StreamProcessor consumes messages from input and emits transformed
// messages to output until the context is cancelled or input closes.
type StreamProcessor interface {
	Process(ctx context.Context, input <-chan *sarama.ConsumerMessage, output chan<- *sarama.ProducerMessage) error
}

// MessageTransformer maps one consumed message to one producer message.
// Returning (nil, nil) drops the message; returning an error skips it.
type MessageTransformer func(*sarama.ConsumerMessage) (*sarama.ProducerMessage, error)

// SimpleStreamProcessor is a 1-in/≤1-out StreamProcessor driven by a
// MessageTransformer.
type SimpleStreamProcessor struct {
	transformer MessageTransformer
	logger      *log.Logger
}
// NewSimpleStreamProcessor wraps transformer in a stream processor with
// its own stdout logger.
func NewSimpleStreamProcessor(transformer MessageTransformer) *SimpleStreamProcessor {
	p := &SimpleStreamProcessor{transformer: transformer}
	p.logger = log.New(os.Stdout, "[StreamProcessor] ", log.LstdFlags)
	return p
}
// Process pumps messages from input through the transformer into output.
// It returns nil when input yields nil (closed), and ctx.Err() on
// cancellation. Transform failures are logged and the message is dropped;
// a nil transform result is dropped silently.
func (ssp *SimpleStreamProcessor) Process(ctx context.Context, input <-chan *sarama.ConsumerMessage, output chan<- *sarama.ProducerMessage) error {
	for {
		// Receive the next message, honoring cancellation.
		var msg *sarama.ConsumerMessage
		select {
		case msg = <-input:
		case <-ctx.Done():
			return ctx.Err()
		}
		if msg == nil {
			return nil // input closed
		}

		out, err := ssp.transformer(msg)
		if err != nil {
			ssp.logger.Printf("消息转换失败: %v", err)
			continue
		}
		if out == nil {
			continue // transformer chose to drop the message
		}

		// Forward the result, again honoring cancellation.
		select {
		case output <- out:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// KafkaStreamApp wires a consumer group, a StreamProcessor, and an async
// producer into a consume → transform → produce pipeline connected by two
// buffered channels.
type KafkaStreamApp struct {
	consumer   *KafkaConsumerGroup
	producer   *AsyncKafkaProducer
	processor  StreamProcessor
	inputChan  chan *sarama.ConsumerMessage // consumer -> processor
	outputChan chan *sarama.ProducerMessage // processor -> producer
	logger     *log.Logger
	ctx        context.Context    // cancelled by Stop
	cancel     context.CancelFunc
	wg         sync.WaitGroup // tracks the processor + output-pump goroutines
}
// NewKafkaStreamApp builds (but does not start) the consume → transform →
// produce pipeline. Call Start to begin processing.
//
// Fix: if the producer failed to construct, the already-created consumer
// group was leaked; it is now shut down on that error path.
func NewKafkaStreamApp(consumerConfig *ConsumerConfig, producerConfig *ProducerConfig, processor StreamProcessor) (*KafkaStreamApp, error) {
	inputChan := make(chan *sarama.ConsumerMessage, 1000)
	outputChan := make(chan *sarama.ProducerMessage, 1000)

	// Bridge consumed messages into the pipeline. The non-blocking send
	// (default branch) rejects messages when the pipeline is backed up,
	// surfacing the overload to the consumer instead of stalling it.
	handler := MessageHandlerFunc(func(ctx context.Context, message *sarama.ConsumerMessage) error {
		select {
		case inputChan <- message:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		default:
			return fmt.Errorf("输入通道已满")
		}
	})

	consumer, err := NewKafkaConsumerGroup(consumerConfig, handler)
	if err != nil {
		return nil, err
	}

	producer, err := NewAsyncKafkaProducer(producerConfig)
	if err != nil {
		// Don't leak the consumer group created above.
		consumer.Stop()
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &KafkaStreamApp{
		consumer:   consumer,
		producer:   producer,
		processor:  processor,
		inputChan:  inputChan,
		outputChan: outputChan,
		logger:     log.New(os.Stdout, "[KafkaStreamApp] ", log.LstdFlags),
		ctx:        ctx,
		cancel:     cancel,
	}, nil
}
// Start launches the consumer, the stream processor, and the output pump
// that forwards processed messages to the async producer.
//
// Fix: the original forwarded output messages with hard type assertions
// (msg.Key.(sarama.StringEncoder), msg.Value.(sarama.ByteEncoder)), which
// panic whenever a transformer leaves the key nil or uses a different
// encoder type. Key and value are now extracted via the Encoder interface
// with nil checks. Message headers are still not forwarded — a limitation
// inherited from SendMessageAsync's signature.
func (ksa *KafkaStreamApp) Start() error {
	// Consumer feeds inputChan via the handler installed in NewKafkaStreamApp.
	if err := ksa.consumer.Start(); err != nil {
		return err
	}

	// Stream processor: inputChan -> outputChan.
	ksa.wg.Add(1)
	go func() {
		defer ksa.wg.Done()
		if err := ksa.processor.Process(ksa.ctx, ksa.inputChan, ksa.outputChan); err != nil {
			ksa.logger.Printf("流处理错误: %v", err)
		}
	}()

	// Output pump: outputChan -> async producer.
	ksa.wg.Add(1)
	go func() {
		defer ksa.wg.Done()
		for {
			select {
			case msg := <-ksa.outputChan:
				if msg == nil {
					return // channel closed during shutdown
				}
				var key string
				if msg.Key != nil {
					if b, err := msg.Key.Encode(); err == nil {
						key = string(b)
					}
				}
				var value []byte
				if msg.Value != nil {
					if b, err := msg.Value.Encode(); err == nil {
						value = b
					}
				}
				ksa.producer.SendMessageAsync(msg.Topic, key, value, nil)
			case <-ksa.ctx.Done():
				return
			}
		}
	}()

	ksa.logger.Printf("Kafka流处理应用启动成功")
	return nil
}
// Stop tears the pipeline down: cancel the pipeline goroutines, wait for
// them, stop the consumer, close the producer, then close the channels.
//
// NOTE(review): the channels are closed only after the consumer has been
// stopped and its handler goroutines have exited, so no send on a closed
// channel should occur — but confirm Stop is never called concurrently
// with Start, and errors from consumer/producer shutdown are logged, not
// returned.
func (ksa *KafkaStreamApp) Stop() error {
	ksa.cancel()
	ksa.wg.Wait()
	if err := ksa.consumer.Stop(); err != nil {
		ksa.logger.Printf("停止消费者失败: %v", err)
	}
	if err := ksa.producer.Close(); err != nil {
		ksa.logger.Printf("停止生产者失败: %v", err)
	}
	close(ksa.inputChan)
	close(ksa.outputChan)
	ksa.logger.Printf("Kafka流处理应用停止成功")
	return nil
}
Kafka 作为高性能的分布式消息系统,为微服务架构提供了可靠的消息传递能力。通过掌握 Kafka 的生产者、消费者和流处理开发,我们可以构建高吞吐量、低延迟的实时数据处理系统。在下一节中,我们将学习 RabbitMQ 的使用和开发。