5.1.4 微服务通信模式

5.1.4 微服务通信模式 #

微服务间的通信是分布式系统设计的核心问题。本节将深入探讨各种微服务通信模式,包括同步通信、异步消息传递、事件驱动架构等,并提供具体的 Go 语言实现。

通信模式分类 #

1. 同步 vs 异步通信 #

// 通信模式定义
type CommunicationPattern string

const (
    SynchronousPattern  CommunicationPattern = "synchronous"
    AsynchronousPattern CommunicationPattern = "asynchronous"
)

type CommunicationStyle struct {
    Pattern     CommunicationPattern
    Protocol    string
    Reliability string
    Latency     string
    Coupling    string
    UseCase     []string
}

// 通信模式对比
var communicationStyles = map[string]CommunicationStyle{
    "HTTP_REST": {
        Pattern:     SynchronousPattern,
        Protocol:    "HTTP/REST",
        Reliability: "Medium",
        Latency:     "Low",
        Coupling:    "High",
        UseCase:     []string{"查询操作", "实时交互", "简单CRUD"},
    },
    "gRPC": {
        Pattern:     SynchronousPattern,
        Protocol:    "gRPC/HTTP2",
        Reliability: "High",
        Latency:     "Very Low",
        Coupling:    "Medium",
        UseCase:     []string{"内部服务调用", "高性能通信", "类型安全"},
    },
    "Message_Queue": {
        Pattern:     AsynchronousPattern,
        Protocol:    "AMQP/Kafka",
        Reliability: "Very High",
        Latency:     "Medium",
        Coupling:    "Low",
        UseCase:     []string{"事件通知", "批量处理", "解耦服务"},
    },
    "Event_Streaming": {
        Pattern:     AsynchronousPattern,
        Protocol:    "Kafka/Pulsar",
        Reliability: "High",
        Latency:     "Low",
        Coupling:    "Very Low",
        UseCase:     []string{"实时数据流", "事件溯源", "CQRS"},
    },
}

同步通信模式 #

1. HTTP/REST 通信 #

// HTTP 客户端封装
type HTTPServiceClient struct {
    baseURL    string
    httpClient *http.Client
    timeout    time.Duration
    retryConfig RetryConfig
}

type RetryConfig struct {
    MaxAttempts int
    BaseDelay   time.Duration
    MaxDelay    time.Duration
    Multiplier  float64
}

func NewHTTPServiceClient(baseURL string, timeout time.Duration) *HTTPServiceClient {
    return &HTTPServiceClient{
        baseURL: baseURL,
        httpClient: &http.Client{
            Timeout: timeout,
            Transport: &http.Transport{
                MaxIdleConns:        100,
                MaxIdleConnsPerHost: 10,
                IdleConnTimeout:     90 * time.Second,
            },
        },
        timeout: timeout,
        retryConfig: RetryConfig{
            MaxAttempts: 3,
            BaseDelay:   100 * time.Millisecond,
            MaxDelay:    5 * time.Second,
            Multiplier:  2.0,
        },
    }
}

// 用户服务客户端
type UserServiceClient struct {
    *HTTPServiceClient
}

func NewUserServiceClient(baseURL string) *UserServiceClient {
    return &UserServiceClient{
        HTTPServiceClient: NewHTTPServiceClient(baseURL, 30*time.Second),
    }
}

func (c *UserServiceClient) GetUser(ctx context.Context, userID int64) (*User, error) {
    url := fmt.Sprintf("%s/users/%d", c.baseURL, userID)

    var user User
    err := c.doRequestWithRetry(ctx, "GET", url, nil, &user)
    if err != nil {
        return nil, fmt.Errorf("failed to get user %d: %w", userID, err)
    }

    return &user, nil
}

func (c *UserServiceClient) CreateUser(ctx context.Context, req CreateUserRequest) (*User, error) {
    url := fmt.Sprintf("%s/users", c.baseURL)

    var user User
    err := c.doRequestWithRetry(ctx, "POST", url, req, &user)
    if err != nil {
        return nil, fmt.Errorf("failed to create user: %w", err)
    }

    return &user, nil
}

func (c *HTTPServiceClient) doRequestWithRetry(ctx context.Context, method, url string, body interface{}, result interface{}) error {
    var lastErr error

    for attempt := 1; attempt <= c.retryConfig.MaxAttempts; attempt++ {
        err := c.doRequest(ctx, method, url, body, result)
        if err == nil {
            return nil
        }

        lastErr = err

        // 检查是否应该重试
        if !c.shouldRetry(err) {
            return err
        }

        if attempt < c.retryConfig.MaxAttempts {
            delay := c.calculateDelay(attempt)
            select {
            case <-time.After(delay):
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    }

    return fmt.Errorf("request failed after %d attempts: %w", c.retryConfig.MaxAttempts, lastErr)
}

func (c *HTTPServiceClient) doRequest(ctx context.Context, method, url string, body interface{}, result interface{}) error {
    var reqBody io.Reader
    if body != nil {
        jsonData, err := json.Marshal(body)
        if err != nil {
            return fmt.Errorf("failed to marshal request body: %w", err)
        }
        reqBody = bytes.NewBuffer(jsonData)
    }

    req, err := http.NewRequestWithContext(ctx, method, url, reqBody)
    if err != nil {
        return fmt.Errorf("failed to create request: %w", err)
    }

    if body != nil {
        req.Header.Set("Content-Type", "application/json")
    }
    req.Header.Set("Accept", "application/json")

    resp, err := c.httpClient.Do(req)
    if err != nil {
        return fmt.Errorf("request failed: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode >= 400 {
        return &HTTPError{
            StatusCode: resp.StatusCode,
            Message:    resp.Status,
        }
    }

    if result != nil {
        if err := json.NewDecoder(resp.Body).Decode(result); err != nil {
            return fmt.Errorf("failed to decode response: %w", err)
        }
    }

    return nil
}

type HTTPError struct {
    StatusCode int
    Message    string
}

func (e *HTTPError) Error() string {
    return fmt.Sprintf("HTTP %d: %s", e.StatusCode, e.Message)
}

func (c *HTTPServiceClient) shouldRetry(err error) bool {
    var httpErr *HTTPError
    if errors.As(err, &httpErr) {
        // 5xx 错误可以重试
        return httpErr.StatusCode >= 500
    }

    // 网络错误可以重试
    var netErr net.Error
    if errors.As(err, &netErr) {
        return netErr.Timeout() || netErr.Temporary()
    }

    return false
}

func (c *HTTPServiceClient) calculateDelay(attempt int) time.Duration {
    delay := time.Duration(float64(c.retryConfig.BaseDelay) *
        math.Pow(c.retryConfig.Multiplier, float64(attempt-1)))

    if delay > c.retryConfig.MaxDelay {
        delay = c.retryConfig.MaxDelay
    }

    // 添加随机抖动
    jitter := time.Duration(rand.Float64() * float64(delay) * 0.1)
    return delay + jitter
}

2. gRPC 通信 #

// gRPC 服务定义 (user.proto)
/*
syntax = "proto3";

package user;

option go_package = "github.com/example/user/pb";

service UserService {
    rpc GetUser(GetUserRequest) returns (GetUserResponse);
    rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
    rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
}

message GetUserRequest {
    int64 user_id = 1;
}

message GetUserResponse {
    User user = 1;
}

message User {
    int64 id = 1;
    string username = 2;
    string email = 3;
    string status = 4;
    int64 created_at = 5;
}
*/

// gRPC 服务端实现
type UserGRPCServer struct {
    pb.UnimplementedUserServiceServer
    userService *UserService
}

func (s *UserGRPCServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    user, err := s.userService.GetUser(ctx, req.UserId)
    if err != nil {
        return nil, status.Errorf(codes.NotFound, "user not found: %v", err)
    }

    return &pb.GetUserResponse{
        User: &pb.User{
            Id:        user.ID,
            Username:  user.Username,
            Email:     user.Email,
            Status:    user.Status,
            CreatedAt: user.CreatedAt.Unix(),
        },
    }, nil
}

func (s *UserGRPCServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
    user, err := s.userService.CreateUser(ctx, CreateUserRequest{
        Username: req.Username,
        Email:    req.Email,
        Password: req.Password,
    })

    if err != nil {
        return nil, status.Errorf(codes.InvalidArgument, "failed to create user: %v", err)
    }

    return &pb.CreateUserResponse{
        User: &pb.User{
            Id:        user.ID,
            Username:  user.Username,
            Email:     user.Email,
            Status:    user.Status,
            CreatedAt: user.CreatedAt.Unix(),
        },
    }, nil
}

// gRPC 客户端封装
type UserGRPCClient struct {
    conn   *grpc.ClientConn
    client pb.UserServiceClient
}

func NewUserGRPCClient(address string) (*UserGRPCClient, error) {
    conn, err := grpc.Dial(address,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(
            grpc_retry.WithMax(3),
            grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
        )),
        grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
    )

    if err != nil {
        return nil, fmt.Errorf("failed to connect to user service: %w", err)
    }

    return &UserGRPCClient{
        conn:   conn,
        client: pb.NewUserServiceClient(conn),
    }, nil
}

func (c *UserGRPCClient) GetUser(ctx context.Context, userID int64) (*User, error) {
    resp, err := c.client.GetUser(ctx, &pb.GetUserRequest{
        UserId: userID,
    })

    if err != nil {
        return nil, fmt.Errorf("failed to get user: %w", err)
    }

    return &User{
        ID:        resp.User.Id,
        Username:  resp.User.Username,
        Email:     resp.User.Email,
        Status:    resp.User.Status,
        CreatedAt: time.Unix(resp.User.CreatedAt, 0),
    }, nil
}

func (c *UserGRPCClient) Close() error {
    return c.conn.Close()
}

3. 服务发现与负载均衡 #

// 服务发现接口
type ServiceDiscovery interface {
    Register(ctx context.Context, service ServiceInfo) error
    Deregister(ctx context.Context, serviceID string) error
    Discover(ctx context.Context, serviceName string) ([]ServiceInstance, error)
    Watch(ctx context.Context, serviceName string) (<-chan []ServiceInstance, error)
}

type ServiceInfo struct {
    ID       string            `json:"id"`
    Name     string            `json:"name"`
    Address  string            `json:"address"`
    Port     int               `json:"port"`
    Tags     []string          `json:"tags"`
    Metadata map[string]string `json:"metadata"`
}

type ServiceInstance struct {
    ID      string `json:"id"`
    Address string `json:"address"`
    Port    int    `json:"port"`
    Healthy bool   `json:"healthy"`
}

// 负载均衡器
type LoadBalancer interface {
    Select(instances []ServiceInstance) (*ServiceInstance, error)
}

// 轮询负载均衡
type RoundRobinLoadBalancer struct {
    counter uint64
}

func (lb *RoundRobinLoadBalancer) Select(instances []ServiceInstance) (*ServiceInstance, error) {
    if len(instances) == 0 {
        return nil, errors.New("no available instances")
    }

    // 过滤健康的实例
    healthy := make([]ServiceInstance, 0)
    for _, instance := range instances {
        if instance.Healthy {
            healthy = append(healthy, instance)
        }
    }

    if len(healthy) == 0 {
        return nil, errors.New("no healthy instances")
    }

    index := atomic.AddUint64(&lb.counter, 1) % uint64(len(healthy))
    return &healthy[index], nil
}

// 服务客户端工厂
type ServiceClientFactory struct {
    discovery    ServiceDiscovery
    loadBalancer LoadBalancer
    clients      sync.Map // serviceName -> *ServiceClient
}

type ServiceClient struct {
    serviceName  string
    discovery    ServiceDiscovery
    loadBalancer LoadBalancer
    httpClient   *http.Client
    instances    []ServiceInstance
    mu           sync.RWMutex
}

func NewServiceClientFactory(discovery ServiceDiscovery, loadBalancer LoadBalancer) *ServiceClientFactory {
    return &ServiceClientFactory{
        discovery:    discovery,
        loadBalancer: loadBalancer,
    }
}

func (f *ServiceClientFactory) GetClient(serviceName string) (*ServiceClient, error) {
    if client, ok := f.clients.Load(serviceName); ok {
        return client.(*ServiceClient), nil
    }

    client := &ServiceClient{
        serviceName:  serviceName,
        discovery:    f.discovery,
        loadBalancer: f.loadBalancer,
        httpClient: &http.Client{
            Timeout: 30 * time.Second,
        },
    }

    // 初始化服务实例
    if err := client.refreshInstances(); err != nil {
        return nil, err
    }

    // 启动实例监控
    go client.watchInstances()

    f.clients.Store(serviceName, client)
    return client, nil
}

func (c *ServiceClient) refreshInstances() error {
    instances, err := c.discovery.Discover(context.Background(), c.serviceName)
    if err != nil {
        return err
    }

    c.mu.Lock()
    c.instances = instances
    c.mu.Unlock()

    return nil
}

func (c *ServiceClient) watchInstances() {
    ctx := context.Background()
    watchCh, err := c.discovery.Watch(ctx, c.serviceName)
    if err != nil {
        log.Printf("Failed to watch service %s: %v", c.serviceName, err)
        return
    }

    for instances := range watchCh {
        c.mu.Lock()
        c.instances = instances
        c.mu.Unlock()
    }
}

func (c *ServiceClient) DoRequest(ctx context.Context, method, path string, body interface{}, result interface{}) error {
    c.mu.RLock()
    instances := c.instances
    c.mu.RUnlock()

    instance, err := c.loadBalancer.Select(instances)
    if err != nil {
        return fmt.Errorf("no available instance for service %s: %w", c.serviceName, err)
    }

    url := fmt.Sprintf("http://%s:%d%s", instance.Address, instance.Port, path)

    var reqBody io.Reader
    if body != nil {
        jsonData, err := json.Marshal(body)
        if err != nil {
            return err
        }
        reqBody = bytes.NewBuffer(jsonData)
    }

    req, err := http.NewRequestWithContext(ctx, method, url, reqBody)
    if err != nil {
        return err
    }

    if body != nil {
        req.Header.Set("Content-Type", "application/json")
    }

    resp, err := c.httpClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode >= 400 {
        return fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
    }

    if result != nil {
        return json.NewDecoder(resp.Body).Decode(result)
    }

    return nil
}

异步通信模式 #

1. 消息队列通信 #

// 消息队列接口
type MessageQueue interface {
    Publish(ctx context.Context, topic string, message Message) error
    Subscribe(ctx context.Context, topic string, handler MessageHandler) error
    Close() error
}

type Message struct {
    ID        string            `json:"id"`
    Type      string            `json:"type"`
    Data      interface{}       `json:"data"`
    Headers   map[string]string `json:"headers"`
    Timestamp time.Time         `json:"timestamp"`
}

type MessageHandler func(ctx context.Context, message Message) error

// Kafka 消息队列实现
type KafkaMessageQueue struct {
    producer sarama.SyncProducer
    consumer sarama.ConsumerGroup
    config   *sarama.Config
}

func NewKafkaMessageQueue(brokers []string, groupID string) (*KafkaMessageQueue, error) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
    config.Consumer.Offsets.Initial = sarama.OffsetNewest

    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        return nil, fmt.Errorf("failed to create producer: %w", err)
    }

    consumer, err := sarama.NewConsumerGroup(brokers, groupID, config)
    if err != nil {
        producer.Close()
        return nil, fmt.Errorf("failed to create consumer: %w", err)
    }

    return &KafkaMessageQueue{
        producer: producer,
        consumer: consumer,
        config:   config,
    }, nil
}

func (k *KafkaMessageQueue) Publish(ctx context.Context, topic string, message Message) error {
    data, err := json.Marshal(message)
    if err != nil {
        return fmt.Errorf("failed to marshal message: %w", err)
    }

    msg := &sarama.ProducerMessage{
        Topic: topic,
        Key:   sarama.StringEncoder(message.ID),
        Value: sarama.ByteEncoder(data),
        Headers: []sarama.RecordHeader{
            {
                Key:   []byte("message-type"),
                Value: []byte(message.Type),
            },
        },
    }

    partition, offset, err := k.producer.SendMessage(msg)
    if err != nil {
        return fmt.Errorf("failed to send message: %w", err)
    }

    log.Printf("Message sent to partition %d at offset %d", partition, offset)
    return nil
}

func (k *KafkaMessageQueue) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
    consumerHandler := &kafkaConsumerHandler{
        handler: handler,
    }

    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            default:
                if err := k.consumer.Consume(ctx, []string{topic}, consumerHandler); err != nil {
                    log.Printf("Consumer error: %v", err)
                }
            }
        }
    }()

    return nil
}

type kafkaConsumerHandler struct {
    handler MessageHandler
}

func (h *kafkaConsumerHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *kafkaConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *kafkaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for {
        select {
        case message := <-claim.Messages():
            if message == nil {
                return nil
            }

            var msg Message
            if err := json.Unmarshal(message.Value, &msg); err != nil {
                log.Printf("Failed to unmarshal message: %v", err)
                session.MarkMessage(message, "")
                continue
            }

            ctx := context.Background()
            if err := h.handler(ctx, msg); err != nil {
                log.Printf("Failed to handle message: %v", err)
                // 这里可以实现重试逻辑或死信队列
            }

            session.MarkMessage(message, "")

        case <-session.Context().Done():
            return nil
        }
    }
}

func (k *KafkaMessageQueue) Close() error {
    if err := k.producer.Close(); err != nil {
        return err
    }
    return k.consumer.Close()
}

2. 事件驱动架构 #

// 事件总线
type EventBus interface {
    Publish(ctx context.Context, event DomainEvent) error
    Subscribe(eventType string, handler EventHandler) error
    Unsubscribe(eventType string, handler EventHandler) error
}

type DomainEvent interface {
    EventID() string
    EventType() string
    AggregateID() string
    OccurredOn() time.Time
    EventData() interface{}
}

type EventHandler interface {
    Handle(ctx context.Context, event DomainEvent) error
    EventType() string
}

// 内存事件总线实现
type InMemoryEventBus struct {
    handlers map[string][]EventHandler
    mu       sync.RWMutex
}

func NewInMemoryEventBus() *InMemoryEventBus {
    return &InMemoryEventBus{
        handlers: make(map[string][]EventHandler),
    }
}

func (bus *InMemoryEventBus) Publish(ctx context.Context, event DomainEvent) error {
    bus.mu.RLock()
    handlers, exists := bus.handlers[event.EventType()]
    bus.mu.RUnlock()

    if !exists {
        return nil // 没有订阅者
    }

    // 并发处理事件
    var wg sync.WaitGroup
    for _, handler := range handlers {
        wg.Add(1)
        go func(h EventHandler) {
            defer wg.Done()
            if err := h.Handle(ctx, event); err != nil {
                log.Printf("Event handler error: %v", err)
            }
        }(handler)
    }

    wg.Wait()
    return nil
}

func (bus *InMemoryEventBus) Subscribe(eventType string, handler EventHandler) error {
    bus.mu.Lock()
    defer bus.mu.Unlock()

    bus.handlers[eventType] = append(bus.handlers[eventType], handler)
    return nil
}

// 分布式事件总线实现
type DistributedEventBus struct {
    messageQueue MessageQueue
    handlers     map[string][]EventHandler
    mu           sync.RWMutex
}

func NewDistributedEventBus(messageQueue MessageQueue) *DistributedEventBus {
    return &DistributedEventBus{
        messageQueue: messageQueue,
        handlers:     make(map[string][]EventHandler),
    }
}

func (bus *DistributedEventBus) Publish(ctx context.Context, event DomainEvent) error {
    message := Message{
        ID:        event.EventID(),
        Type:      event.EventType(),
        Data:      event.EventData(),
        Timestamp: event.OccurredOn(),
        Headers: map[string]string{
            "aggregate-id": event.AggregateID(),
        },
    }

    topic := fmt.Sprintf("events.%s", event.EventType())
    return bus.messageQueue.Publish(ctx, topic, message)
}

func (bus *DistributedEventBus) Subscribe(eventType string, handler EventHandler) error {
    bus.mu.Lock()
    bus.handlers[eventType] = append(bus.handlers[eventType], handler)
    bus.mu.Unlock()

    topic := fmt.Sprintf("events.%s", eventType)
    return bus.messageQueue.Subscribe(context.Background(), topic, func(ctx context.Context, message Message) error {
        // 重构消息为领域事件
        event := &GenericDomainEvent{
            id:          message.ID,
            eventType:   message.Type,
            aggregateID: message.Headers["aggregate-id"],
            occurredOn:  message.Timestamp,
            data:        message.Data,
        }

        bus.mu.RLock()
        handlers := bus.handlers[eventType]
        bus.mu.RUnlock()

        for _, h := range handlers {
            if err := h.Handle(ctx, event); err != nil {
                return err
            }
        }

        return nil
    })
}

// 通用领域事件实现
type GenericDomainEvent struct {
    id          string
    eventType   string
    aggregateID string
    occurredOn  time.Time
    data        interface{}
}

func (e *GenericDomainEvent) EventID() string        { return e.id }
func (e *GenericDomainEvent) EventType() string      { return e.eventType }
func (e *GenericDomainEvent) AggregateID() string    { return e.aggregateID }
func (e *GenericDomainEvent) OccurredOn() time.Time  { return e.occurredOn }
func (e *GenericDomainEvent) EventData() interface{} { return e.data }

3. 事件处理器实现 #

// 订单创建事件处理器
type OrderCreatedEventHandler struct {
    inventoryService InventoryService
    emailService     EmailService
    logger          *log.Logger
}

func (h *OrderCreatedEventHandler) Handle(ctx context.Context, event DomainEvent) error {
    orderData, ok := event.EventData().(OrderCreatedEventData)
    if !ok {
        return errors.New("invalid event data type")
    }

    // 预留库存
    for _, item := range orderData.Items {
        if err := h.inventoryService.ReserveInventory(ctx, ReserveInventoryRequest{
            ProductID: item.ProductID,
            Quantity:  item.Quantity,
            OrderID:   orderData.OrderID,
        }); err != nil {
            h.logger.Errorf("Failed to reserve inventory for product %s: %v", item.ProductID, err)
            // 这里可以发布库存预留失败事件
            return err
        }
    }

    // 发送订单确认邮件
    if err := h.emailService.SendOrderConfirmation(ctx, SendEmailRequest{
        To:      orderData.CustomerEmail,
        OrderID: orderData.OrderID,
        Items:   orderData.Items,
    }); err != nil {
        h.logger.Errorf("Failed to send order confirmation email: %v", err)
        // 邮件发送失败不影响主流程
    }

    return nil
}

func (h *OrderCreatedEventHandler) EventType() string {
    return "OrderCreated"
}

type OrderCreatedEventData struct {
    OrderID       string      `json:"order_id"`
    CustomerID    string      `json:"customer_id"`
    CustomerEmail string      `json:"customer_email"`
    Items         []OrderItem `json:"items"`
    TotalAmount   float64     `json:"total_amount"`
}

// 库存变更事件处理器
type InventoryChangedEventHandler struct {
    productService ProductService
    logger        *log.Logger
}

func (h *InventoryChangedEventHandler) Handle(ctx context.Context, event DomainEvent) error {
    inventoryData, ok := event.EventData().(InventoryChangedEventData)
    if !ok {
        return errors.New("invalid event data type")
    }

    // 更新商品可用性状态
    if inventoryData.NewQuantity <= 0 {
        if err := h.productService.UpdateProductStatus(ctx, inventoryData.ProductID, "out_of_stock"); err != nil {
            h.logger.Errorf("Failed to update product status: %v", err)
            return err
        }
    } else if inventoryData.PreviousQuantity <= 0 && inventoryData.NewQuantity > 0 {
        if err := h.productService.UpdateProductStatus(ctx, inventoryData.ProductID, "available"); err != nil {
            h.logger.Errorf("Failed to update product status: %v", err)
            return err
        }
    }

    return nil
}

func (h *InventoryChangedEventHandler) EventType() string {
    return "InventoryChanged"
}

type InventoryChangedEventData struct {
    ProductID        string `json:"product_id"`
    WarehouseID      string `json:"warehouse_id"`
    PreviousQuantity int    `json:"previous_quantity"`
    NewQuantity      int    `json:"new_quantity"`
    ChangeReason     string `json:"change_reason"`
}

通信模式选择指南 #

1. 模式选择矩阵 #

// 通信模式选择器
type CommunicationPatternSelector struct {
    patterns map[string]PatternScore
}

type PatternScore struct {
    Pattern     string
    Score       int
    Reasons     []string
    Limitations []string
}

type SelectionCriteria struct {
    Consistency    string // "strong", "eventual"
    Latency        string // "low", "medium", "high"
    Reliability    string // "low", "medium", "high"
    Coupling       string // "loose", "tight"
    DataVolume     string // "low", "medium", "high"
    TransactionScope string // "single", "distributed"
}

func (s *CommunicationPatternSelector) SelectPattern(criteria SelectionCriteria) string {
    scores := make(map[string]int)

    // 一致性要求评分
    if criteria.Consistency == "strong" {
        scores["HTTP_REST"] += 3
        scores["gRPC"] += 4
        scores["Message_Queue"] += 1
    } else {
        scores["Message_Queue"] += 4
        scores["Event_Streaming"] += 4
        scores["HTTP_REST"] += 2
    }

    // 延迟要求评分
    if criteria.Latency == "low" {
        scores["gRPC"] += 4
        scores["HTTP_REST"] += 3
        scores["Message_Queue"] += 2
    }

    // 可靠性要求评分
    if criteria.Reliability == "high" {
        scores["Message_Queue"] += 4
        scores["Event_Streaming"] += 4
        scores["gRPC"] += 3
    }

    // 耦合度要求评分
    if criteria.Coupling == "loose" {
        scores["Message_Queue"] += 4
        scores["Event_Streaming"] += 4
        scores["HTTP_REST"] += 1
    }

    // 找出最高分的模式
    maxScore := 0
    selectedPattern := ""
    for pattern, score := range scores {
        if score > maxScore {
            maxScore = score
            selectedPattern = pattern
        }
    }

    return selectedPattern
}

// 使用示例
func selectCommunicationPattern() {
    selector := &CommunicationPatternSelector{}

    // 查询用户信息的场景
    userQueryCriteria := SelectionCriteria{
        Consistency:      "strong",
        Latency:         "low",
        Reliability:     "medium",
        Coupling:        "tight",
        DataVolume:      "low",
        TransactionScope: "single",
    }

    pattern := selector.SelectPattern(userQueryCriteria)
    fmt.Printf("用户查询推荐模式: %s\n", pattern) // 输出: gRPC

    // 订单状态变更通知的场景
    orderNotificationCriteria := SelectionCriteria{
        Consistency:      "eventual",
        Latency:         "medium",
        Reliability:     "high",
        Coupling:        "loose",
        DataVolume:      "medium",
        TransactionScope: "distributed",
    }

    pattern = selector.SelectPattern(orderNotificationCriteria)
    fmt.Printf("订单通知推荐模式: %s\n", pattern) // 输出: Message_Queue
}

2. 混合通信架构 #

// 混合通信架构实现
type HybridCommunicationManager struct {
    httpClients  map[string]*HTTPServiceClient
    grpcClients  map[string]*UserGRPCClient
    eventBus     EventBus
    messageQueue MessageQueue
}

func NewHybridCommunicationManager() *HybridCommunicationManager {
    return &HybridCommunicationManager{
        httpClients: make(map[string]*HTTPServiceClient),
        grpcClients: make(map[string]*UserGRPCClient),
    }
}

// 同步查询操作使用 gRPC
func (m *HybridCommunicationManager) QueryUser(ctx context.Context, userID int64) (*User, error) {
    client := m.grpcClients["user-service"]
    return client.GetUser(ctx, userID)
}

// 异步事件通知使用消息队列
func (m *HybridCommunicationManager) NotifyOrderCreated(ctx context.Context, order *Order) error {
    event := OrderCreatedEvent{
        OrderID:    order.ID,
        CustomerID: order.CustomerID,
        Items:      order.Items,
        Timestamp:  time.Now(),
    }

    return m.eventBus.Publish(ctx, event)
}

// 批量数据同步使用 HTTP REST
func (m *HybridCommunicationManager) SyncProducts(ctx context.Context, products []Product) error {
    client := m.httpClients["product-service"]

    return client.doRequestWithRetry(ctx, "POST", "/products/batch", products, nil)
}

通过本节的学习,我们深入了解了微服务间的各种通信模式和实现方法。选择合适的通信模式需要综合考虑一致性、延迟、可靠性、耦合度等多个因素。在实际项目中,往往需要采用混合的通信架构来满足不同场景的需求。

至此,我们完成了第 5.1 节微服务架构设计的全部内容,涵盖了微服务的核心原理、拆分策略、领域驱动设计和通信模式。这些知识为后续学习容器化、服务发现、配置管理等云原生技术奠定了坚实的基础。