5.9.5 综合练习:电商微服务系统

5.9.5 综合练习:电商微服务系统 #

本节将把前面开发的用户服务、商品服务、订单服务等组合成一个完整的电商微服务系统,包括服务集成、部署、监控、测试等各个方面。

系统集成架构 #

整体架构图 #

                    ┌─────────────────────────────────────┐
                    │            Load Balancer            │
                    └─────────────────┬───────────────────┘
                                      │
                    ┌─────────────────┴───────────────────┐
                    │            API Gateway              │
                    │         (Kong/Envoy)                │
                    └─────────────────┬───────────────────┘
                                      │
        ┌─────────────────────────────┼─────────────────────────────┐
        │                             │                             │
┌───────▼────────┐         ┌─────────▼────────┐         ┌─────────▼────────┐
│  User Service  │         │ Product Service  │         │ Order Service    │
│   (Port 8001)  │         │   (Port 8002)    │         │   (Port 8003)    │
└───────┬────────┘         └─────────┬────────┘         └─────────┬────────┘
        │                            │                            │
        │                            │                            │
┌───────▼────────┐         ┌─────────▼────────┐         ┌─────────▼────────┐
│   PostgreSQL   │         │   PostgreSQL     │         │   PostgreSQL     │
│   (User DB)    │         │  (Product DB)    │         │   (Order DB)     │
└────────────────┘         └──────────────────┘         └──────────────────┘

                    ┌─────────────────────────────────────┐
                    │         Infrastructure              │
                    │                                     │
                    │  ┌─────────┐  ┌─────────┐  ┌──────┐ │
                    │  │  Redis  │  │  Kafka  │  │ Consul│ │
                    │  │ (Cache) │  │(Message)│  │(Disc.)│ │
                    │  └─────────┘  └─────────┘  └──────┘ │
                    │                                     │
                    │  ┌─────────┐  ┌─────────┐  ┌──────┐ │
                    │  │Prometheus│ │ Jaeger  │  │ ELK  │ │
                    │  │(Metrics)│  │(Tracing)│  │(Logs)│ │
                    │  └─────────┘  └─────────┘  └──────┘ │
                    └─────────────────────────────────────┘

服务通信模式 #

// pkg/communication/service_client.go
package communication

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "time"

    "github.com/go-resty/resty/v2"
    "github.com/hashicorp/consul/api"
)

// 服务客户端接口
type ServiceClient interface {
    Call(ctx context.Context, service, method string, request interface{}, response interface{}) error
}

// HTTP 服务客户端
type HTTPServiceClient struct {
    client         *resty.Client
    consulClient   *api.Client
    circuitBreaker CircuitBreaker
}

func NewHTTPServiceClient(consulClient *api.Client) *HTTPServiceClient {
    client := resty.New().
        SetTimeout(30 * time.Second).
        SetRetryCount(3).
        SetRetryWaitTime(1 * time.Second)

    return &HTTPServiceClient{
        client:         client,
        consulClient:   consulClient,
        circuitBreaker: NewCircuitBreaker(),
    }
}

// 调用服务
func (c *HTTPServiceClient) Call(ctx context.Context, service, method string, request interface{}, response interface{}) error {
    // 服务发现
    serviceURL, err := c.discoverService(service)
    if err != nil {
        return fmt.Errorf("service discovery failed: %w", err)
    }

    // 构建请求 URL
    url := fmt.Sprintf("%s%s", serviceURL, method)

    // 使用熔断器
    return c.circuitBreaker.Execute(func() error {
        resp, err := c.client.R().
            SetContext(ctx).
            SetHeader("Content-Type", "application/json").
            SetBody(request).
            Post(url)

        if err != nil {
            return err
        }

        if resp.StatusCode() >= 400 {
            return fmt.Errorf("service call failed with status: %d", resp.StatusCode())
        }

        if response != nil {
            return json.Unmarshal(resp.Body(), response)
        }

        return nil
    })
}

// 服务发现
func (c *HTTPServiceClient) discoverService(serviceName string) (string, error) {
    services, _, err := c.consulClient.Health().Service(serviceName, "", true, nil)
    if err != nil {
        return "", err
    }

    if len(services) == 0 {
        return "", fmt.Errorf("no healthy instances found for service: %s", serviceName)
    }

    // 简单的负载均衡:随机选择一个实例
    service := services[rand.Intn(len(services))]
    return fmt.Sprintf("http://%s:%d", service.Service.Address, service.Service.Port), nil
}

// 熔断器实现
type CircuitBreaker interface {
    Execute(fn func() error) error
}

type circuitBreaker struct {
    failureThreshold int
    resetTimeout     time.Duration
    state            CircuitState
    failures         int
    lastFailureTime  time.Time
    mutex            sync.RWMutex
}

type CircuitState int

const (
    CircuitStateClosed CircuitState = iota
    CircuitStateOpen
    CircuitStateHalfOpen
)

func NewCircuitBreaker() CircuitBreaker {
    return &circuitBreaker{
        failureThreshold: 5,
        resetTimeout:     60 * time.Second,
        state:            CircuitStateClosed,
    }
}

func (cb *circuitBreaker) Execute(fn func() error) error {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()

    // 检查熔断器状态
    if cb.state == CircuitStateOpen {
        if time.Since(cb.lastFailureTime) > cb.resetTimeout {
            cb.state = CircuitStateHalfOpen
            cb.failures = 0
        } else {
            return errors.New("circuit breaker is open")
        }
    }

    // 执行函数
    err := fn()

    if err != nil {
        cb.failures++
        cb.lastFailureTime = time.Now()

        if cb.failures >= cb.failureThreshold {
            cb.state = CircuitStateOpen
        }

        return err
    }

    // 成功执行,重置状态
    if cb.state == CircuitStateHalfOpen {
        cb.state = CircuitStateClosed
    }
    cb.failures = 0

    return nil
}

API 网关配置 #

Kong 网关配置 #

# deployments/kong/kong.yml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kong-config
data:
  kong.conf: |
    database = off
    declarative_config = /kong/declarative/kong.yml
    proxy_listen = 0.0.0.0:8000
    admin_listen = 0.0.0.0:8001

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: kong-declarative-config
data:
  kong.yml: |
    _format_version: "2.1"

    services:
    - name: user-service
      url: http://user-service:8001
      routes:
      - name: user-routes
        paths:
        - /api/v1/users
        methods:
        - GET
        - POST
        - PUT
        - DELETE
        plugins:
        - name: rate-limiting
          config:
            minute: 100
            hour: 1000
        - name: jwt
          config:
            secret_is_base64: false

    - name: product-service
      url: http://product-service:8002
      routes:
      - name: product-routes
        paths:
        - /api/v1/products
        methods:
        - GET
        - POST
        - PUT
        - DELETE
        plugins:
        - name: rate-limiting
          config:
            minute: 200
            hour: 2000

    - name: order-service
      url: http://order-service:8003
      routes:
      - name: order-routes
        paths:
        - /api/v1/orders
        methods:
        - GET
        - POST
        - PUT
        - DELETE
        plugins:
        - name: rate-limiting
          config:
            minute: 150
            hour: 1500
        - name: jwt
          config:
            secret_is_base64: false

    consumers:
    - username: ecommerce-app
      jwt_secrets:
      - key: ecommerce-jwt-key
        secret: your-jwt-secret-key

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kong
spec:
  replicas: 2
  selector:
    matchLabels:
      app: kong
  template:
    metadata:
      labels:
        app: kong
    spec:
      containers:
        - name: kong
          image: kong:2.8
          ports:
            - containerPort: 8000
            - containerPort: 8001
          env:
            - name: KONG_DATABASE
              value: "off"
            - name: KONG_DECLARATIVE_CONFIG
              value: "/kong/declarative/kong.yml"
            - name: KONG_PROXY_LISTEN
              value: "0.0.0.0:8000"
            - name: KONG_ADMIN_LISTEN
              value: "0.0.0.0:8001"
          volumeMounts:
            - name: kong-config
              mountPath: /kong/declarative
      volumes:
        - name: kong-config
          configMap:
            name: kong-declarative-config

---
apiVersion: v1
kind: Service
metadata:
  name: kong
spec:
  selector:
    app: kong
  ports:
    - name: proxy
      port: 8000
      targetPort: 8000
    - name: admin
      port: 8001
      targetPort: 8001
  type: LoadBalancer

Docker 容器化 #

用户服务 Dockerfile #

# deployments/user-service/Dockerfile
FROM golang:1.21-alpine AS builder

WORKDIR /app

# 复制 go mod 文件
COPY go.mod go.sum ./
RUN go mod download

# 复制源代码
COPY . .

# 构建应用
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o main ./cmd/server

# 运行阶段
FROM alpine:latest

RUN apk --no-cache add ca-certificates tzdata
WORKDIR /root/

# 复制二进制文件
COPY --from=builder /app/main .
COPY --from=builder /app/configs ./configs

# 设置时区
ENV TZ=Asia/Shanghai

EXPOSE 8001

CMD ["./main"]

Docker Compose 配置 #

# docker-compose.yml
version: "3.8"

services:
  # 基础设施服务
  consul:
    image: consul:1.15
    ports:
      - "8500:8500"
    command: agent -server -bootstrap -ui -client=0.0.0.0

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data

  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  # 数据库服务
  user-db:
    image: postgres:15
    environment:
      POSTGRES_DB: user_service
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - user_db_data:/var/lib/postgresql/data

  product-db:
    image: postgres:15
    environment:
      POSTGRES_DB: product_service
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    ports:
      - "5433:5432"
    volumes:
      - product_db_data:/var/lib/postgresql/data

  order-db:
    image: postgres:15
    environment:
      POSTGRES_DB: order_service
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    ports:
      - "5434:5432"
    volumes:
      - order_db_data:/var/lib/postgresql/data

  # 搜索服务
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data

  # 微服务
  user-service:
    build:
      context: ./user-service
      dockerfile: deployments/Dockerfile
    ports:
      - "8001:8001"
    environment:
      - DB_HOST=user-db
      - DB_PORT=5432
      - DB_NAME=user_service
      - DB_USER=postgres
      - DB_PASSWORD=password
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - CONSUL_HOST=consul
      - CONSUL_PORT=8500
      - KAFKA_BROKERS=kafka:9092
    depends_on:
      - user-db
      - redis
      - consul
      - kafka

  product-service:
    build:
      context: ./product-service
      dockerfile: deployments/Dockerfile
    ports:
      - "8002:8002"
    environment:
      - DB_HOST=product-db
      - DB_PORT=5432
      - DB_NAME=product_service
      - DB_USER=postgres
      - DB_PASSWORD=password
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - CONSUL_HOST=consul
      - CONSUL_PORT=8500
      - KAFKA_BROKERS=kafka:9092
      - ELASTICSEARCH_URL=http://elasticsearch:9200
    depends_on:
      - product-db
      - redis
      - consul
      - kafka
      - elasticsearch

  order-service:
    build:
      context: ./order-service
      dockerfile: deployments/Dockerfile
    ports:
      - "8003:8003"
    environment:
      - DB_HOST=order-db
      - DB_PORT=5432
      - DB_NAME=order_service
      - DB_USER=postgres
      - DB_PASSWORD=password
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - CONSUL_HOST=consul
      - CONSUL_PORT=8500
      - KAFKA_BROKERS=kafka:9092
      - USER_SERVICE_URL=http://user-service:8001
      - PRODUCT_SERVICE_URL=http://product-service:8002
    depends_on:
      - order-db
      - redis
      - consul
      - kafka
      - user-service
      - product-service

  # API 网关
  kong:
    image: kong:2.8
    ports:
      - "8000:8000"
      - "8001:8001"
    environment:
      - KONG_DATABASE=off
      - KONG_DECLARATIVE_CONFIG=/kong/declarative/kong.yml
      - KONG_PROXY_LISTEN=0.0.0.0:8000
      - KONG_ADMIN_LISTEN=0.0.0.0:8001
    volumes:
      - ./deployments/kong/kong.yml:/kong/declarative/kong.yml
    depends_on:
      - user-service
      - product-service
      - order-service

volumes:
  redis_data:
  user_db_data:
  product_db_data:
  order_db_data:
  es_data:

Kubernetes 部署 #

用户服务 K8s 配置 #

# deployments/k8s/user-service.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
  labels:
    app: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
        - name: user-service
          image: ecommerce/user-service:latest
          ports:
            - containerPort: 8001
          env:
            - name: DB_HOST
              value: "user-db"
            - name: DB_PORT
              value: "5432"
            - name: DB_NAME
              value: "user_service"
            - name: DB_USER
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: username
            - name: DB_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: password
            - name: REDIS_HOST
              value: "redis"
            - name: REDIS_PORT
              value: "6379"
            - name: CONSUL_HOST
              value: "consul"
            - name: CONSUL_PORT
              value: "8500"
          livenessProbe:
            httpGet:
              path: /health
              port: 8001
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8001
            initialDelaySeconds: 5
            periodSeconds: 5
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"

---
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
    - port: 8001
      targetPort: 8001
  type: ClusterIP

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: user-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: user-service
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

配置管理 #

# deployments/k8s/configmap.yml
apiVersion: v1
kind: ConfigMap
metadata:
  name: app-config
data:
  app.yml: |
    server:
      port: 8001
      read_timeout: 30s
      write_timeout: 30s
      
    database:
      max_open_conns: 25
      max_idle_conns: 5
      conn_max_lifetime: 300s
      
    redis:
      pool_size: 10
      min_idle_conns: 5
      
    consul:
      health_check_interval: 10s
      deregister_critical_service_after: 30s
      
    kafka:
      consumer_group: ecommerce-group
      auto_offset_reset: earliest
      
    logging:
      level: info
      format: json
      
    tracing:
      jaeger_endpoint: http://jaeger:14268/api/traces
      sample_rate: 0.1
      
    metrics:
      prometheus_endpoint: /metrics

---
apiVersion: v1
kind: Secret
metadata:
  name: db-secret
type: Opaque
data:
  username: cG9zdGdyZXM= # postgres (base64)
  password: cGFzc3dvcmQ= # password (base64)

---
apiVersion: v1
kind: Secret
metadata:
  name: jwt-secret
type: Opaque
data:
  key: eW91ci1qd3Qtc2VjcmV0LWtleQ== # your-jwt-secret-key (base64)

监控与可观测性 #

Prometheus 监控配置 #

# deployments/monitoring/prometheus.yml
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
      
    rule_files:
      - "alert_rules.yml"
      
    alerting:
      alertmanagers:
        - static_configs:
            - targets:
              - alertmanager:9093
              
    scrape_configs:
      - job_name: 'user-service'
        static_configs:
          - targets: ['user-service:8001']
        metrics_path: /metrics
        scrape_interval: 10s
        
      - job_name: 'product-service'
        static_configs:
          - targets: ['product-service:8002']
        metrics_path: /metrics
        scrape_interval: 10s
        
      - job_name: 'order-service'
        static_configs:
          - targets: ['order-service:8003']
        metrics_path: /metrics
        scrape_interval: 10s
        
      - job_name: 'kong'
        static_configs:
          - targets: ['kong:8001']
        metrics_path: /metrics
        scrape_interval: 15s
        
      - job_name: 'consul'
        static_configs:
          - targets: ['consul:8500']
        metrics_path: /v1/agent/metrics
        params:
          format: ['prometheus']
        scrape_interval: 15s

  alert_rules.yml: |
    groups:
    - name: ecommerce_alerts
      rules:
      - alert: ServiceDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Service {{ $labels.job }} is down"
          description: "Service {{ $labels.job }} has been down for more than 1 minute"
          
      - alert: HighErrorRate
        expr: rate(http_requests_total{status=~"5.."}[5m]) > 0.1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High error rate on {{ $labels.job }}"
          description: "Error rate is {{ $value }} errors per second"
          
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 0.5
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High latency on {{ $labels.job }}"
          description: "95th percentile latency is {{ $value }} seconds"
          
      - alert: HighMemoryUsage
        expr: (container_memory_usage_bytes / container_spec_memory_limit_bytes) > 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage on {{ $labels.container_label_io_kubernetes_pod_name }}"
          description: "Memory usage is {{ $value | humanizePercentage }}"

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prometheus
spec:
  replicas: 1
  selector:
    matchLabels:
      app: prometheus
  template:
    metadata:
      labels:
        app: prometheus
    spec:
      containers:
        - name: prometheus
          image: prom/prometheus:latest
          ports:
            - containerPort: 9090
          volumeMounts:
            - name: config
              mountPath: /etc/prometheus
            - name: storage
              mountPath: /prometheus
          args:
            - "--config.file=/etc/prometheus/prometheus.yml"
            - "--storage.tsdb.path=/prometheus"
            - "--web.console.libraries=/etc/prometheus/console_libraries"
            - "--web.console.templates=/etc/prometheus/consoles"
            - "--storage.tsdb.retention.time=15d"
            - "--web.enable-lifecycle"
      volumes:
        - name: config
          configMap:
            name: prometheus-config
        - name: storage
          emptyDir: {}

---
apiVersion: v1
kind: Service
metadata:
  name: prometheus
spec:
  selector:
    app: prometheus
  ports:
    - port: 9090
      targetPort: 9090
  type: ClusterIP

应用指标收集 #

// pkg/metrics/metrics.go
package metrics

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    // HTTP 请求指标
    httpRequestsTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "http_requests_total",
            Help: "Total number of HTTP requests",
        },
        []string{"method", "endpoint", "status"},
    )

    httpRequestDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "http_request_duration_seconds",
            Help:    "HTTP request duration in seconds",
            Buckets: prometheus.DefBuckets,
        },
        []string{"method", "endpoint"},
    )

    // 业务指标
    ordersTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "orders_total",
            Help: "Total number of orders",
        },
        []string{"status"},
    )

    orderAmount = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "order_amount",
            Help:    "Order amount distribution",
            Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
        },
        []string{"currency"},
    )

    activeUsers = promauto.NewGauge(
        prometheus.GaugeOpts{
            Name: "active_users",
            Help: "Number of active users",
        },
    )

    inventoryLevel = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "inventory_level",
            Help: "Current inventory level",
        },
        []string{"product_id"},
    )

    // 系统指标
    databaseConnections = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "database_connections",
            Help: "Number of database connections",
        },
        []string{"state"},
    )

    cacheHitRate = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "cache_hit_rate",
            Help: "Cache hit rate",
        },
        []string{"cache_type"},
    )
)

// 指标收集器
type MetricsCollector struct{}

func NewMetricsCollector() *MetricsCollector {
    return &MetricsCollector{}
}

// 记录 HTTP 请求
func (m *MetricsCollector) RecordHTTPRequest(method, endpoint, status string, duration time.Duration) {
    httpRequestsTotal.WithLabelValues(method, endpoint, status).Inc()
    httpRequestDuration.WithLabelValues(method, endpoint).Observe(duration.Seconds())
}

// 记录订单创建
func (m *MetricsCollector) RecordOrderCreated(status string, amount float64, currency string) {
    ordersTotal.WithLabelValues(status).Inc()
    orderAmount.WithLabelValues(currency).Observe(amount)
}

// 更新活跃用户数
func (m *MetricsCollector) UpdateActiveUsers(count float64) {
    activeUsers.Set(count)
}

// 更新库存水平
func (m *MetricsCollector) UpdateInventoryLevel(productID string, level float64) {
    inventoryLevel.WithLabelValues(productID).Set(level)
}

// 更新数据库连接数
func (m *MetricsCollector) UpdateDatabaseConnections(state string, count float64) {
    databaseConnections.WithLabelValues(state).Set(count)
}

// 更新缓存命中率
func (m *MetricsCollector) UpdateCacheHitRate(cacheType string, rate float64) {
    cacheHitRate.WithLabelValues(cacheType).Set(rate)
}

// HTTP 中间件
func (m *MetricsCollector) HTTPMiddleware() gin.HandlerFunc {
    return func(c *gin.Context) {
        start := time.Now()

        c.Next()

        duration := time.Since(start)
        status := fmt.Sprintf("%d", c.Writer.Status())

        m.RecordHTTPRequest(c.Request.Method, c.FullPath(), status, duration)
    }
}

分布式追踪 #

// pkg/tracing/tracer.go
package tracing

import (
    "context"
    "io"

    "github.com/opentracing/opentracing-go"
    "github.com/opentracing/opentracing-go/ext"
    "github.com/uber/jaeger-client-go"
    "github.com/uber/jaeger-client-go/config"
)

// 初始化 Jaeger 追踪器
func InitJaeger(serviceName, jaegerEndpoint string, sampleRate float64) (opentracing.Tracer, io.Closer, error) {
    cfg := config.Configuration{
        ServiceName: serviceName,
        Sampler: &config.SamplerConfig{
            Type:  jaeger.SamplerTypeConst,
            Param: sampleRate,
        },
        Reporter: &config.ReporterConfig{
            LogSpans:           true,
            LocalAgentHostPort: jaegerEndpoint,
        },
    }

    tracer, closer, err := cfg.NewTracer()
    if err != nil {
        return nil, nil, err
    }

    opentracing.SetGlobalTracer(tracer)
    return tracer, closer, nil
}

// HTTP 追踪中间件
func TracingMiddleware() gin.HandlerFunc {
    return func(c *gin.Context) {
        tracer := opentracing.GlobalTracer()

        spanCtx, _ := tracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(c.Request.Header))

        span := tracer.StartSpan(
            fmt.Sprintf("%s %s", c.Request.Method, c.FullPath()),
            ext.RPCServerOption(spanCtx),
        )
        defer span.Finish()

        ext.HTTPMethod.Set(span, c.Request.Method)
        ext.HTTPUrl.Set(span, c.Request.URL.String())

        c.Set("tracing-span", span)
        c.Request = c.Request.WithContext(opentracing.ContextWithSpan(c.Request.Context(), span))

        c.Next()

        ext.HTTPStatusCode.Set(span, uint16(c.Writer.Status()))
        if c.Writer.Status() >= 400 {
            ext.Error.Set(span, true)
        }
    }
}

// 创建子 Span
func StartSpanFromContext(ctx context.Context, operationName string) (opentracing.Span, context.Context) {
    span, ctx := opentracing.StartSpanFromContext(ctx, operationName)
    return span, ctx
}

// 服务间调用追踪
func InjectSpanToHTTPRequest(span opentracing.Span, req *http.Request) {
    tracer := opentracing.GlobalTracer()
    tracer.Inject(
        span.Context(),
        opentracing.HTTPHeaders,
        opentracing.HTTPHeadersCarrier(req.Header),
    )
}

集成测试 #

API 测试套件 #

// tests/integration/api_test.go
package integration

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/suite"
)

type APITestSuite struct {
    suite.Suite
    baseURL string
    client  *http.Client
    token   string
}

func (suite *APITestSuite) SetupSuite() {
    suite.baseURL = "http://localhost:8000" // Kong gateway
    suite.client = &http.Client{
        Timeout: 30 * time.Second,
    }

    // 创建测试用户并获取 token
    suite.createTestUser()
    suite.loginAndGetToken()
}

func (suite *APITestSuite) createTestUser() {
    userData := map[string]interface{}{
        "username": "testuser",
        "email":    "[email protected]",
        "phone":    "1234567890",
        "password": "password123",
    }

    jsonData, _ := json.Marshal(userData)
    resp, err := suite.client.Post(
        suite.baseURL+"/api/v1/users/register",
        "application/json",
        bytes.NewBuffer(jsonData),
    )

    suite.NoError(err)
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusConflict {
        suite.T().Fatalf("Failed to create test user: %d", resp.StatusCode)
    }
}

func (suite *APITestSuite) loginAndGetToken() {
    loginData := map[string]interface{}{
        "username": "testuser",
        "password": "password123",
    }

    jsonData, _ := json.Marshal(loginData)
    resp, err := suite.client.Post(
        suite.baseURL+"/api/v1/users/login",
        "application/json",
        bytes.NewBuffer(jsonData),
    )

    suite.NoError(err)
    defer resp.Body.Close()

    suite.Equal(http.StatusOK, resp.StatusCode)

    var result map[string]interface{}
    json.NewDecoder(resp.Body).Decode(&result)

    suite.token = result["token"].(string)
    suite.NotEmpty(suite.token)
}

func (suite *APITestSuite) TestUserWorkflow() {
    // 测试获取用户信息
    suite.Run("GetUserProfile", func() {
        req, _ := http.NewRequest("GET", suite.baseURL+"/api/v1/users/profile", nil)
        req.Header.Set("Authorization", "Bearer "+suite.token)

        resp, err := suite.client.Do(req)
        suite.NoError(err)
        defer resp.Body.Close()

        suite.Equal(http.StatusOK, resp.StatusCode)

        var user map[string]interface{}
        json.NewDecoder(resp.Body).Decode(&user)

        suite.Equal("testuser", user["username"])
        suite.Equal("[email protected]", user["email"])
    })

    // 测试更新用户资料
    suite.Run("UpdateUserProfile", func() {
        updateData := map[string]interface{}{
            "first_name": "Test",
            "last_name":  "User",
            "address": map[string]interface{}{
                "street":   "123 Test St",
                "city":     "Test City",
                "province": "Test Province",
                "country":  "Test Country",
                "zip_code": "12345",
            },
        }

        jsonData, _ := json.Marshal(updateData)
        req, _ := http.NewRequest("PUT", suite.baseURL+"/api/v1/users/profile", bytes.NewBuffer(jsonData))
        req.Header.Set("Authorization", "Bearer "+suite.token)
        req.Header.Set("Content-Type", "application/json")

        resp, err := suite.client.Do(req)
        suite.NoError(err)
        defer resp.Body.Close()

        suite.Equal(http.StatusOK, resp.StatusCode)
    })
}

func (suite *APITestSuite) TestProductWorkflow() {
    var productID string

    // 测试创建商品
    suite.Run("CreateProduct", func() {
        productData := map[string]interface{}{
            "name":        "Test Product",
            "description": "This is a test product",
            "sku":         "TEST-001",
            "brand_id":    "test-brand-id",
            "category_id": "test-category-id",
            "price": map[string]interface{}{
                "amount":   10000, // 100.00
                "currency": "CNY",
            },
            "images": []map[string]interface{}{
                {
                    "url":        "https://example.com/image.jpg",
                    "alt":        "Test Product Image",
                    "is_primary": true,
                    "sort_order": 1,
                },
            },
            "attributes": []map[string]interface{}{
                {
                    "name":  "Color",
                    "value": "Red",
                    "type":  1,
                },
            },
            "weight": map[string]interface{}{
                "value": 1.5,
                "unit":  "kg",
            },
            "dimensions": map[string]interface{}{
                "length": 10.0,
                "width":  5.0,
                "height": 3.0,
                "unit":   "cm",
            },
            "tags": []string{"test", "product"},
        }

        jsonData, _ := json.Marshal(productData)
        req, _ := http.NewRequest("POST", suite.baseURL+"/api/v1/products", bytes.NewBuffer(jsonData))
        req.Header.Set("Authorization", "Bearer "+suite.token)
        req.Header.Set("Content-Type", "application/json")

        resp, err := suite.client.Do(req)
        suite.NoError(err)
        defer resp.Body.Close()

        suite.Equal(http.StatusCreated, resp.StatusCode)

        var result map[string]interface{}
        json.NewDecoder(resp.Body).Decode(&result)

        productID = result["product_id"].(string)
        suite.NotEmpty(productID)
    })

    // 测试获取商品详情
    suite.Run("GetProduct", func() {
        resp, err := suite.client.Get(suite.baseURL + "/api/v1/products/" + productID)
        suite.NoError(err)
        defer resp.Body.Close()

        suite.Equal(http.StatusOK, resp.StatusCode)

        var product map[string]interface{}
        json.NewDecoder(resp.Body).Decode(&product)

        suite.Equal("Test Product", product["name"])
        suite.Equal("TEST-001", product["sku"])
    })

    // 测试更新库存
    suite.Run("UpdateInventory", func() {
        inventoryData := map[string]interface{}{
            "quantity": 100,
        }

        jsonData, _ := json.Marshal(inventoryData)
        req, _ := http.NewRequest("PUT", suite.baseURL+"/api/v1/products/"+productID+"/inventory", bytes.NewBuffer(jsonData))
        req.Header.Set("Authorization", "Bearer "+suite.token)
        req.Header.Set("Content-Type", "application/json")

        resp, err := suite.client.Do(req)
        suite.NoError(err)
        defer resp.Body.Close()

        suite.Equal(http.StatusOK, resp.StatusCode)
    })
}

func (suite *APITestSuite) TestOrderWorkflow() {
    var orderID string

    // 测试创建订单
    suite.Run("CreateOrder", func() {
        orderData := map[string]interface{}{
            "items": []map[string]interface{}{
                {
                    "product_id": "test-product-id",
                    "quantity":   2,
                },
            },
            "shipping_address": map[string]interface{}{
                "recipient_name": "Test User",
                "phone":          "1234567890",
                "street":         "123 Test St",
                "city":           "Test City",
                "province":       "Test Province",
                "country":        "Test Country",
                "zip_code":       "12345",
            },
            "billing_address": map[string]interface{}{
                "recipient_name": "Test User",
                "phone":          "1234567890",
                "street":         "123 Test St",
                "city":           "Test City",
                "province":       "Test Province",
                "country":        "Test Country",
                "zip_code":       "12345",
            },
            "shipping_method": map[string]interface{}{
                "id":             "standard",
                "name":           "Standard Shipping",
                "description":    "5-7 business days",
                "price": map[string]interface{}{
                    "amount":   1000, // 10.00
                    "currency": "CNY",
                },
                "estimated_days": 7,
            },
            "payment_method": 1, // Credit Card
            "payment_amount": map[string]interface{}{
                "amount":   21000, // 210.00
                "currency": "CNY",
            },
        }

        jsonData, _ := json.Marshal(orderData)
        req, _ := http.NewRequest("POST", suite.baseURL+"/api/v1/orders", bytes.NewBuffer(jsonData))
        req.Header.Set("Authorization", "Bearer "+suite.token)
        req.Header.Set("Content-Type", "application/json")

        resp, err := suite.client.Do(req)
        suite.NoError(err)
        defer resp.Body.Close()

        suite.Equal(http.StatusCreated, resp.StatusCode)

        var result map[string]interface{}
        json.NewDecoder(resp.Body).Decode(&result)

        orderID = result["saga_id"].(string)
        suite.NotEmpty(orderID)
    })

    // 等待 Saga 完成
    time.Sleep(5 * time.Second)

    // 测试获取订单列表
    suite.Run("GetOrders", func() {
        req, _ := http.NewRequest("GET", suite.baseURL+"/api/v1/orders", nil)
        req.Header.Set("Authorization", "Bearer "+suite.token)

        resp, err := suite.client.Do(req)
        suite.NoError(err)
        defer resp.Body.Close()

        suite.Equal(http.StatusOK, resp.StatusCode)

        var result map[string]interface{}
        json.NewDecoder(resp.Body).Decode(&result)

        orders := result["orders"].([]interface{})
        suite.Greater(len(orders), 0)
    })
}

func TestAPITestSuite(t *testing.T) {
    suite.Run(t, new(APITestSuite))
}

性能测试 #

// tests/performance/load_test.go
package performance

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "sync"
    "testing"
    "time"
)

func BenchmarkUserRegistration(b *testing.B) {
    client := &http.Client{Timeout: 30 * time.Second}
    baseURL := "http://localhost:8000"

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        i := 0
        for pb.Next() {
            userData := map[string]interface{}{
                "username": fmt.Sprintf("user%d", i),
                "email":    fmt.Sprintf("user%[email protected]", i),
                "phone":    fmt.Sprintf("123456%04d", i),
                "password": "password123",
            }

            jsonData, _ := json.Marshal(userData)
            resp, err := client.Post(
                baseURL+"/api/v1/users/register",
                "application/json",
                bytes.NewBuffer(jsonData),
            )

            if err != nil {
                b.Error(err)
                continue
            }

            resp.Body.Close()

            if resp.StatusCode != http.StatusCreated {
                b.Errorf("Expected status 201, got %d", resp.StatusCode)
            }

            i++
        }
    })
}

func BenchmarkProductSearch(b *testing.B) {
    client := &http.Client{Timeout: 30 * time.Second}
    baseURL := "http://localhost:8000"

    searchQueries := []string{
        "laptop",
        "phone",
        "book",
        "shoes",
        "watch",
    }

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        i := 0
        for pb.Next() {
            query := searchQueries[i%len(searchQueries)]
            url := fmt.Sprintf("%s/api/v1/products/search?q=%s", baseURL, query)

            resp, err := client.Get(url)
            if err != nil {
                b.Error(err)
                continue
            }

            resp.Body.Close()

            if resp.StatusCode != http.StatusOK {
                b.Errorf("Expected status 200, got %d", resp.StatusCode)
            }

            i++
        }
    })
}

// 并发测试
func TestConcurrentOrderCreation(t *testing.T) {
    client := &http.Client{Timeout: 30 * time.Second}
    baseURL := "http://localhost:8000"

    // 获取测试 token
    token := getTestToken(t, client, baseURL)

    concurrency := 10
    ordersPerGoroutine := 5

    var wg sync.WaitGroup
    errors := make(chan error, concurrency*ordersPerGoroutine)

    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go func(goroutineID int) {
            defer wg.Done()

            for j := 0; j < ordersPerGoroutine; j++ {
                orderData := map[string]interface{}{
                    "items": []map[string]interface{}{
                        {
                            "product_id": "test-product-id",
                            "quantity":   1,
                        },
                    },
                    "shipping_address": map[string]interface{}{
                        "recipient_name": fmt.Sprintf("User %d-%d", goroutineID, j),
                        "phone":          "1234567890",
                        "street":         "123 Test St",
                        "city":           "Test City",
                        "province":       "Test Province",
                        "country":        "Test Country",
                        "zip_code":       "12345",
                    },
                    "billing_address": map[string]interface{}{
                        "recipient_name": fmt.Sprintf("User %d-%d", goroutineID, j),
                        "phone":          "1234567890",
                        "street":         "123 Test St",
                        "city":           "Test City",
                        "province":       "Test Province",
                        "country":        "Test Country",
                        "zip_code":       "12345",
                    },
                    "shipping_method": map[string]interface{}{
                        "id":             "standard",
                        "name":           "Standard Shipping",
                        "description":    "5-7 business days",
                        "price": map[string]interface{}{
                            "amount":   1000,
                            "currency": "CNY",
                        },
                        "estimated_days": 7,
                    },
                    "payment_method": 1,
                    "payment_amount": map[string]interface{}{
                        "amount":   11000,
                        "currency": "CNY",
                    },
                }

                jsonData, _ := json.Marshal(orderData)
                req, _ := http.NewRequest("POST", baseURL+"/api/v1/orders", bytes.NewBuffer(jsonData))
                req.Header.Set("Authorization", "Bearer "+token)
                req.Header.Set("Content-Type", "application/json")

                resp, err := client.Do(req)
                if err != nil {
                    errors <- err
                    continue
                }

                resp.Body.Close()

                if resp.StatusCode != http.StatusCreated {
                    errors <- fmt.Errorf("expected status 201, got %d", resp.StatusCode)
                }
            }
        }(i)
    }

    wg.Wait()
    close(errors)

    errorCount := 0
    for err := range errors {
        t.Error(err)
        errorCount++
    }

    if errorCount > 0 {
        t.Fatalf("Failed with %d errors out of %d requests", errorCount, concurrency*ordersPerGoroutine)
    }

    t.Logf("Successfully created %d orders concurrently", concurrency*ordersPerGoroutine)
}

func getTestToken(t *testing.T, client *http.Client, baseURL string) string {
    loginData := map[string]interface{}{
        "username": "testuser",
        "password": "password123",
    }

    jsonData, _ := json.Marshal(loginData)
    resp, err := client.Post(
        baseURL+"/api/v1/users/login",
        "application/json",
        bytes.NewBuffer(jsonData),
    )

    if err != nil {
        t.Fatal(err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        t.Fatalf("Login failed with status: %d", resp.StatusCode)
    }

    var result map[string]interface{}
    json.NewDecoder(resp.Body).Decode(&result)

    return result["token"].(string)
}

部署脚本 #

自动化部署脚本 #

#!/bin/bash
# scripts/deploy.sh

set -e

# 配置变量
ENVIRONMENT=${1:-development}
NAMESPACE="ecommerce-${ENVIRONMENT}"
DOCKER_REGISTRY="your-registry.com"
VERSION=${2:-latest}

echo "Deploying ecommerce microservices to ${ENVIRONMENT} environment..."

# 创建命名空间
kubectl create namespace ${NAMESPACE} --dry-run=client -o yaml | kubectl apply -f -

# 部署基础设施服务
echo "Deploying infrastructure services..."
kubectl apply -f deployments/k8s/infrastructure/ -n ${NAMESPACE}

# 等待基础设施服务就绪
echo "Waiting for infrastructure services to be ready..."
kubectl wait --for=condition=ready pod -l app=consul -n ${NAMESPACE} --timeout=300s
kubectl wait --for=condition=ready pod -l app=redis -n ${NAMESPACE} --timeout=300s
kubectl wait --for=condition=ready pod -l app=kafka -n ${NAMESPACE} --timeout=300s

# 部署数据库
echo "Deploying databases..."
kubectl apply -f deployments/k8s/databases/ -n ${NAMESPACE}

# 等待数据库就绪
echo "Waiting for databases to be ready..."
kubectl wait --for=condition=ready pod -l app=user-db -n ${NAMESPACE} --timeout=300s
kubectl wait --for=condition=ready pod -l app=product-db -n ${NAMESPACE} --timeout=300s
kubectl wait --for=condition=ready pod -l app=order-db -n ${NAMESPACE} --timeout=300s

# 运行数据库迁移
echo "Running database migrations..."
kubectl apply -f deployments/k8s/migrations/ -n ${NAMESPACE}

# 部署微服务
echo "Deploying microservices..."

# 更新镜像版本
sed -i "s|image: ecommerce/user-service:.*|image: ${DOCKER_REGISTRY}/ecommerce/user-service:${VERSION}|g" deployments/k8s/user-service.yml
sed -i "s|image: ecommerce/product-service:.*|image: ${DOCKER_REGISTRY}/ecommerce/product-service:${VERSION}|g" deployments/k8s/product-service.yml
sed -i "s|image: ecommerce/order-service:.*|image: ${DOCKER_REGISTRY}/ecommerce/order-service:${VERSION}|g" deployments/k8s/order-service.yml

kubectl apply -f deployments/k8s/user-service.yml -n ${NAMESPACE}
kubectl apply -f deployments/k8s/product-service.yml -n ${NAMESPACE}
kubectl apply -f deployments/k8s/order-service.yml -n ${NAMESPACE}

# 等待微服务就绪
echo "Waiting for microservices to be ready..."
kubectl wait --for=condition=ready pod -l app=user-service -n ${NAMESPACE} --timeout=300s
kubectl wait --for=condition=ready pod -l app=product-service -n ${NAMESPACE} --timeout=300s
kubectl wait --for=condition=ready pod -l app=order-service -n ${NAMESPACE} --timeout=300s

# 部署 API 网关
echo "Deploying API Gateway..."
kubectl apply -f deployments/k8s/kong.yml -n ${NAMESPACE}

# 等待网关就绪
kubectl wait --for=condition=ready pod -l app=kong -n ${NAMESPACE} --timeout=300s

# 部署监控服务
echo "Deploying monitoring services..."
kubectl apply -f deployments/monitoring/ -n ${NAMESPACE}

# 运行健康检查
echo "Running health checks..."
./scripts/health-check.sh ${NAMESPACE}

echo "Deployment completed successfully!"
echo "API Gateway: http://$(kubectl get svc kong -n ${NAMESPACE} -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):8000"
echo "Prometheus: http://$(kubectl get svc prometheus -n ${NAMESPACE} -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):9090"
echo "Grafana: http://$(kubectl get svc grafana -n ${NAMESPACE} -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):3000"

健康检查脚本 #

#!/bin/bash
# scripts/health-check.sh

NAMESPACE=${1:-ecommerce-development}

echo "Running health checks for namespace: ${NAMESPACE}"

# 检查服务状态
check_service() {
    local service=$1
    local port=$2

    echo "Checking ${service}..."

    # 获取服务 IP
    SERVICE_IP=$(kubectl get svc ${service} -n ${NAMESPACE} -o jsonpath='{.spec.clusterIP}')

    if [ -z "$SERVICE_IP" ]; then
        echo "❌ ${service}: Service not found"
        return 1
    fi

    # 健康检查
    if kubectl run health-check-${service} --rm -i --restart=Never --image=curlimages/curl -- curl -f http://${SERVICE_IP}:${port}/health > /dev/null 2>&1; then
        echo "✅ ${service}: Healthy"
        return 0
    else
        echo "❌ ${service}: Unhealthy"
        return 1
    fi
}

# 检查各个服务
FAILED=0

check_service "user-service" "8001" || FAILED=1
check_service "product-service" "8002" || FAILED=1
check_service "order-service" "8003" || FAILED=1
check_service "kong" "8000" || FAILED=1

# 检查数据库连接
echo "Checking database connections..."

# 用户数据库
if kubectl exec -n ${NAMESPACE} deployment/user-db -- pg_isready > /dev/null 2>&1; then
    echo "✅ user-db: Connected"
else
    echo "❌ user-db: Connection failed"
    FAILED=1
fi

# 商品数据库
if kubectl exec -n ${NAMESPACE} deployment/product-db -- pg_isready > /dev/null 2>&1; then
    echo "✅ product-db: Connected"
else
    echo "❌ product-db: Connection failed"
    FAILED=1
fi

# 订单数据库
if kubectl exec -n ${NAMESPACE} deployment/order-db -- pg_isready > /dev/null 2>&1; then
    echo "✅ order-db: Connected"
else
    echo "❌ order-db: Connection failed"
    FAILED=1
fi

# 检查 Redis
if kubectl exec -n ${NAMESPACE} deployment/redis -- redis-cli ping | grep -q PONG; then
    echo "✅ redis: Connected"
else
    echo "❌ redis: Connection failed"
    FAILED=1
fi

# 检查 Kafka
if kubectl exec -n ${NAMESPACE} deployment/kafka -- kafka-topics --bootstrap-server localhost:9092 --list > /dev/null 2>&1; then
    echo "✅ kafka: Connected"
else
    echo "❌ kafka: Connection failed"
    FAILED=1
fi

if [ $FAILED -eq 0 ]; then
    echo "🎉 All health checks passed!"
    exit 0
else
    echo "💥 Some health checks failed!"
    exit 1
fi

总结 #

通过本节的综合实战,我们完成了一个完整的电商微服务系统的构建,包括:

  1. 服务集成:实现了用户服务、商品服务、订单服务的完整集成
  2. API 网关:使用 Kong 实现统一的 API 入口和流量管理
  3. 容器化部署:使用 Docker 和 Kubernetes 实现服务的容器化部署
  4. 监控体系:集成 Prometheus、Grafana、Jaeger 实现完整的可观测性
  5. 测试策略:包含集成测试、性能测试、并发测试
  6. 自动化部署:提供完整的部署脚本和健康检查

这个系统展示了现代微服务架构的最佳实践,为构建大规模分布式系统提供了完整的参考实现。