5.9.5 综合练习:电商微服务系统 #
本节将把前面开发的用户服务、商品服务、订单服务等组合成一个完整的电商微服务系统,包括服务集成、部署、监控、测试等各个方面。
系统集成架构 #
整体架构图 #
┌─────────────────────────────────────┐
│ Load Balancer │
└─────────────────┬───────────────────┘
│
┌─────────────────┴───────────────────┐
│ API Gateway │
│ (Kong/Envoy) │
└─────────────────┬───────────────────┘
│
┌─────────────────────────────┼─────────────────────────────┐
│ │ │
┌───────▼────────┐ ┌─────────▼────────┐ ┌─────────▼────────┐
│ User Service │ │ Product Service │ │ Order Service │
│ (Port 8001) │ │ (Port 8002) │ │ (Port 8003) │
└───────┬────────┘ └─────────┬────────┘ └─────────┬────────┘
│ │ │
│ │ │
┌───────▼────────┐ ┌─────────▼────────┐ ┌─────────▼────────┐
│ PostgreSQL │ │ PostgreSQL │ │ PostgreSQL │
│ (User DB) │ │ (Product DB) │ │ (Order DB) │
└────────────────┘ └──────────────────┘ └──────────────────┘
┌─────────────────────────────────────┐
│ Infrastructure │
│ │
│ ┌─────────┐ ┌─────────┐ ┌──────┐ │
│ │ Redis │ │ Kafka │ │ Consul│ │
│ │ (Cache) │ │(Message)│ │(Disc.)│ │
│ └─────────┘ └─────────┘ └──────┘ │
│ │
│ ┌─────────┐ ┌─────────┐ ┌──────┐ │
│ │Prometheus│ │ Jaeger │ │ ELK │ │
│ │(Metrics)│ │(Tracing)│ │(Logs)│ │
│ └─────────┘ └─────────┘ └──────┘ │
└─────────────────────────────────────┘
服务通信模式 #
// pkg/communication/service_client.go
package communication
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/go-resty/resty/v2"
"github.com/hashicorp/consul/api"
)
// 服务客户端接口
type ServiceClient interface {
Call(ctx context.Context, service, method string, request interface{}, response interface{}) error
}
// HTTP 服务客户端
type HTTPServiceClient struct {
client *resty.Client
consulClient *api.Client
circuitBreaker CircuitBreaker
}
func NewHTTPServiceClient(consulClient *api.Client) *HTTPServiceClient {
client := resty.New().
SetTimeout(30 * time.Second).
SetRetryCount(3).
SetRetryWaitTime(1 * time.Second)
return &HTTPServiceClient{
client: client,
consulClient: consulClient,
circuitBreaker: NewCircuitBreaker(),
}
}
// 调用服务
func (c *HTTPServiceClient) Call(ctx context.Context, service, method string, request interface{}, response interface{}) error {
// 服务发现
serviceURL, err := c.discoverService(service)
if err != nil {
return fmt.Errorf("service discovery failed: %w", err)
}
// 构建请求 URL
url := fmt.Sprintf("%s%s", serviceURL, method)
// 使用熔断器
return c.circuitBreaker.Execute(func() error {
resp, err := c.client.R().
SetContext(ctx).
SetHeader("Content-Type", "application/json").
SetBody(request).
Post(url)
if err != nil {
return err
}
if resp.StatusCode() >= 400 {
return fmt.Errorf("service call failed with status: %d", resp.StatusCode())
}
if response != nil {
return json.Unmarshal(resp.Body(), response)
}
return nil
})
}
// 服务发现
func (c *HTTPServiceClient) discoverService(serviceName string) (string, error) {
services, _, err := c.consulClient.Health().Service(serviceName, "", true, nil)
if err != nil {
return "", err
}
if len(services) == 0 {
return "", fmt.Errorf("no healthy instances found for service: %s", serviceName)
}
// 简单的负载均衡:随机选择一个实例
service := services[rand.Intn(len(services))]
return fmt.Sprintf("http://%s:%d", service.Service.Address, service.Service.Port), nil
}
// 熔断器实现
type CircuitBreaker interface {
Execute(fn func() error) error
}
type circuitBreaker struct {
failureThreshold int
resetTimeout time.Duration
state CircuitState
failures int
lastFailureTime time.Time
mutex sync.RWMutex
}
type CircuitState int
const (
CircuitStateClosed CircuitState = iota
CircuitStateOpen
CircuitStateHalfOpen
)
func NewCircuitBreaker() CircuitBreaker {
return &circuitBreaker{
failureThreshold: 5,
resetTimeout: 60 * time.Second,
state: CircuitStateClosed,
}
}
func (cb *circuitBreaker) Execute(fn func() error) error {
cb.mutex.Lock()
defer cb.mutex.Unlock()
// 检查熔断器状态
if cb.state == CircuitStateOpen {
if time.Since(cb.lastFailureTime) > cb.resetTimeout {
cb.state = CircuitStateHalfOpen
cb.failures = 0
} else {
return errors.New("circuit breaker is open")
}
}
// 执行函数
err := fn()
if err != nil {
cb.failures++
cb.lastFailureTime = time.Now()
if cb.failures >= cb.failureThreshold {
cb.state = CircuitStateOpen
}
return err
}
// 成功执行,重置状态
if cb.state == CircuitStateHalfOpen {
cb.state = CircuitStateClosed
}
cb.failures = 0
return nil
}
API 网关配置 #
Kong 网关配置 #
# deployments/kong/kong.yml
apiVersion: v1
kind: ConfigMap
metadata:
name: kong-config
data:
kong.conf: |
database = off
declarative_config = /kong/declarative/kong.yml
proxy_listen = 0.0.0.0:8000
admin_listen = 0.0.0.0:8001
---
apiVersion: v1
kind: ConfigMap
metadata:
name: kong-declarative-config
data:
kong.yml: |
_format_version: "2.1"
services:
- name: user-service
url: http://user-service:8001
routes:
- name: user-routes
paths:
- /api/v1/users
methods:
- GET
- POST
- PUT
- DELETE
plugins:
- name: rate-limiting
config:
minute: 100
hour: 1000
- name: jwt
config:
secret_is_base64: false
- name: product-service
url: http://product-service:8002
routes:
- name: product-routes
paths:
- /api/v1/products
methods:
- GET
- POST
- PUT
- DELETE
plugins:
- name: rate-limiting
config:
minute: 200
hour: 2000
- name: order-service
url: http://order-service:8003
routes:
- name: order-routes
paths:
- /api/v1/orders
methods:
- GET
- POST
- PUT
- DELETE
plugins:
- name: rate-limiting
config:
minute: 150
hour: 1500
- name: jwt
config:
secret_is_base64: false
consumers:
- username: ecommerce-app
jwt_secrets:
- key: ecommerce-jwt-key
secret: your-jwt-secret-key
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: kong
spec:
replicas: 2
selector:
matchLabels:
app: kong
template:
metadata:
labels:
app: kong
spec:
containers:
- name: kong
image: kong:2.8
ports:
- containerPort: 8000
- containerPort: 8001
env:
- name: KONG_DATABASE
value: "off"
- name: KONG_DECLARATIVE_CONFIG
value: "/kong/declarative/kong.yml"
- name: KONG_PROXY_LISTEN
value: "0.0.0.0:8000"
- name: KONG_ADMIN_LISTEN
value: "0.0.0.0:8001"
volumeMounts:
- name: kong-config
mountPath: /kong/declarative
volumes:
- name: kong-config
configMap:
name: kong-declarative-config
---
apiVersion: v1
kind: Service
metadata:
name: kong
spec:
selector:
app: kong
ports:
- name: proxy
port: 8000
targetPort: 8000
- name: admin
port: 8001
targetPort: 8001
type: LoadBalancer
Docker 容器化 #
用户服务 Dockerfile #
# deployments/user-service/Dockerfile
FROM golang:1.21-alpine AS builder
WORKDIR /app
# 复制 go mod 文件
COPY go.mod go.sum ./
RUN go mod download
# 复制源代码
COPY . .
# 构建应用
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o main ./cmd/server
# 运行阶段
FROM alpine:latest
RUN apk --no-cache add ca-certificates tzdata
WORKDIR /root/
# 复制二进制文件
COPY --from=builder /app/main .
COPY --from=builder /app/configs ./configs
# 设置时区
ENV TZ=Asia/Shanghai
EXPOSE 8001
CMD ["./main"]
Docker Compose 配置 #
# docker-compose.yml
version: "3.8"
services:
# 基础设施服务
consul:
image: consul:1.15
ports:
- "8500:8500"
command: agent -server -bootstrap -ui -client=0.0.0.0
redis:
image: redis:7-alpine
ports:
- "6379:6379"
command: redis-server --appendonly yes
volumes:
- redis_data:/data
kafka:
image: confluentinc/cp-kafka:latest
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
zookeeper:
image: confluentinc/cp-zookeeper:latest
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
# 数据库服务
user-db:
image: postgres:15
environment:
POSTGRES_DB: user_service
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- "5432:5432"
volumes:
- user_db_data:/var/lib/postgresql/data
product-db:
image: postgres:15
environment:
POSTGRES_DB: product_service
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- "5433:5432"
volumes:
- product_db_data:/var/lib/postgresql/data
order-db:
image: postgres:15
environment:
POSTGRES_DB: order_service
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- "5434:5432"
volumes:
- order_db_data:/var/lib/postgresql/data
# 搜索服务
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
environment:
- discovery.type=single-node
- xpack.security.enabled=false
ports:
- "9200:9200"
volumes:
- es_data:/usr/share/elasticsearch/data
# 微服务
user-service:
build:
context: ./user-service
dockerfile: deployments/Dockerfile
ports:
- "8001:8001"
environment:
- DB_HOST=user-db
- DB_PORT=5432
- DB_NAME=user_service
- DB_USER=postgres
- DB_PASSWORD=password
- REDIS_HOST=redis
- REDIS_PORT=6379
- CONSUL_HOST=consul
- CONSUL_PORT=8500
- KAFKA_BROKERS=kafka:9092
depends_on:
- user-db
- redis
- consul
- kafka
product-service:
build:
context: ./product-service
dockerfile: deployments/Dockerfile
ports:
- "8002:8002"
environment:
- DB_HOST=product-db
- DB_PORT=5432
- DB_NAME=product_service
- DB_USER=postgres
- DB_PASSWORD=password
- REDIS_HOST=redis
- REDIS_PORT=6379
- CONSUL_HOST=consul
- CONSUL_PORT=8500
- KAFKA_BROKERS=kafka:9092
- ELASTICSEARCH_URL=http://elasticsearch:9200
depends_on:
- product-db
- redis
- consul
- kafka
- elasticsearch
order-service:
build:
context: ./order-service
dockerfile: deployments/Dockerfile
ports:
- "8003:8003"
environment:
- DB_HOST=order-db
- DB_PORT=5432
- DB_NAME=order_service
- DB_USER=postgres
- DB_PASSWORD=password
- REDIS_HOST=redis
- REDIS_PORT=6379
- CONSUL_HOST=consul
- CONSUL_PORT=8500
- KAFKA_BROKERS=kafka:9092
- USER_SERVICE_URL=http://user-service:8001
- PRODUCT_SERVICE_URL=http://product-service:8002
depends_on:
- order-db
- redis
- consul
- kafka
- user-service
- product-service
# API 网关
kong:
image: kong:2.8
ports:
- "8000:8000"
- "8001:8001"
environment:
- KONG_DATABASE=off
- KONG_DECLARATIVE_CONFIG=/kong/declarative/kong.yml
- KONG_PROXY_LISTEN=0.0.0.0:8000
- KONG_ADMIN_LISTEN=0.0.0.0:8001
volumes:
- ./deployments/kong/kong.yml:/kong/declarative/kong.yml
depends_on:
- user-service
- product-service
- order-service
volumes:
redis_data:
user_db_data:
product_db_data:
order_db_data:
es_data:
Kubernetes 部署 #
用户服务 K8s 配置 #
# deployments/k8s/user-service.yml
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
labels:
app: user-service
spec:
replicas: 3
selector:
matchLabels:
app: user-service
template:
metadata:
labels:
app: user-service
spec:
containers:
- name: user-service
image: ecommerce/user-service:latest
ports:
- containerPort: 8001
env:
- name: DB_HOST
value: "user-db"
- name: DB_PORT
value: "5432"
- name: DB_NAME
value: "user_service"
- name: DB_USER
valueFrom:
secretKeyRef:
name: db-secret
key: username
- name: DB_PASSWORD
valueFrom:
secretKeyRef:
name: db-secret
key: password
- name: REDIS_HOST
value: "redis"
- name: REDIS_PORT
value: "6379"
- name: CONSUL_HOST
value: "consul"
- name: CONSUL_PORT
value: "8500"
livenessProbe:
httpGet:
path: /health
port: 8001
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8001
initialDelaySeconds: 5
periodSeconds: 5
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
name: user-service
spec:
selector:
app: user-service
ports:
- port: 8001
targetPort: 8001
type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: user-service-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: user-service
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
配置管理 #
# deployments/k8s/configmap.yml
apiVersion: v1
kind: ConfigMap
metadata:
name: app-config
data:
app.yml: |
server:
port: 8001
read_timeout: 30s
write_timeout: 30s
database:
max_open_conns: 25
max_idle_conns: 5
conn_max_lifetime: 300s
redis:
pool_size: 10
min_idle_conns: 5
consul:
health_check_interval: 10s
deregister_critical_service_after: 30s
kafka:
consumer_group: ecommerce-group
auto_offset_reset: earliest
logging:
level: info
format: json
tracing:
jaeger_endpoint: http://jaeger:14268/api/traces
sample_rate: 0.1
metrics:
prometheus_endpoint: /metrics
---
apiVersion: v1
kind: Secret
metadata:
name: db-secret
type: Opaque
data:
username: cG9zdGdyZXM= # postgres (base64)
password: cGFzc3dvcmQ= # password (base64)
---
apiVersion: v1
kind: Secret
metadata:
name: jwt-secret
type: Opaque
data:
key: eW91ci1qd3Qtc2VjcmV0LWtleQ== # your-jwt-secret-key (base64)
监控与可观测性 #
Prometheus 监控配置 #
# deployments/monitoring/prometheus.yml
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-config
data:
prometheus.yml: |
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "alert_rules.yml"
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
scrape_configs:
- job_name: 'user-service'
static_configs:
- targets: ['user-service:8001']
metrics_path: /metrics
scrape_interval: 10s
- job_name: 'product-service'
static_configs:
- targets: ['product-service:8002']
metrics_path: /metrics
scrape_interval: 10s
- job_name: 'order-service'
static_configs:
- targets: ['order-service:8003']
metrics_path: /metrics
scrape_interval: 10s
- job_name: 'kong'
static_configs:
- targets: ['kong:8001']
metrics_path: /metrics
scrape_interval: 15s
- job_name: 'consul'
static_configs:
- targets: ['consul:8500']
metrics_path: /v1/agent/metrics
params:
format: ['prometheus']
scrape_interval: 15s
alert_rules.yml: |
groups:
- name: ecommerce_alerts
rules:
- alert: ServiceDown
expr: up == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Service {{ $labels.job }} is down"
description: "Service {{ $labels.job }} has been down for more than 1 minute"
- alert: HighErrorRate
expr: rate(http_requests_total{status=~"5.."}[5m]) > 0.1
for: 2m
labels:
severity: warning
annotations:
summary: "High error rate on {{ $labels.job }}"
description: "Error rate is {{ $value }} errors per second"
- alert: HighLatency
expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 0.5
for: 2m
labels:
severity: warning
annotations:
summary: "High latency on {{ $labels.job }}"
description: "95th percentile latency is {{ $value }} seconds"
- alert: HighMemoryUsage
expr: (container_memory_usage_bytes / container_spec_memory_limit_bytes) > 0.8
for: 5m
labels:
severity: warning
annotations:
summary: "High memory usage on {{ $labels.container_label_io_kubernetes_pod_name }}"
description: "Memory usage is {{ $value | humanizePercentage }}"
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: prometheus
spec:
replicas: 1
selector:
matchLabels:
app: prometheus
template:
metadata:
labels:
app: prometheus
spec:
containers:
- name: prometheus
image: prom/prometheus:latest
ports:
- containerPort: 9090
volumeMounts:
- name: config
mountPath: /etc/prometheus
- name: storage
mountPath: /prometheus
args:
- "--config.file=/etc/prometheus/prometheus.yml"
- "--storage.tsdb.path=/prometheus"
- "--web.console.libraries=/etc/prometheus/console_libraries"
- "--web.console.templates=/etc/prometheus/consoles"
- "--storage.tsdb.retention.time=15d"
- "--web.enable-lifecycle"
volumes:
- name: config
configMap:
name: prometheus-config
- name: storage
emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
name: prometheus
spec:
selector:
app: prometheus
ports:
- port: 9090
targetPort: 9090
type: ClusterIP
应用指标收集 #
// pkg/metrics/metrics.go
package metrics
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// HTTP 请求指标
httpRequestsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Total number of HTTP requests",
},
[]string{"method", "endpoint", "status"},
)
httpRequestDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "HTTP request duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"method", "endpoint"},
)
// 业务指标
ordersTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "orders_total",
Help: "Total number of orders",
},
[]string{"status"},
)
orderAmount = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "order_amount",
Help: "Order amount distribution",
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
},
[]string{"currency"},
)
activeUsers = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "active_users",
Help: "Number of active users",
},
)
inventoryLevel = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "inventory_level",
Help: "Current inventory level",
},
[]string{"product_id"},
)
// 系统指标
databaseConnections = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "database_connections",
Help: "Number of database connections",
},
[]string{"state"},
)
cacheHitRate = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cache_hit_rate",
Help: "Cache hit rate",
},
[]string{"cache_type"},
)
)
// 指标收集器
type MetricsCollector struct{}
func NewMetricsCollector() *MetricsCollector {
return &MetricsCollector{}
}
// 记录 HTTP 请求
func (m *MetricsCollector) RecordHTTPRequest(method, endpoint, status string, duration time.Duration) {
httpRequestsTotal.WithLabelValues(method, endpoint, status).Inc()
httpRequestDuration.WithLabelValues(method, endpoint).Observe(duration.Seconds())
}
// 记录订单创建
func (m *MetricsCollector) RecordOrderCreated(status string, amount float64, currency string) {
ordersTotal.WithLabelValues(status).Inc()
orderAmount.WithLabelValues(currency).Observe(amount)
}
// 更新活跃用户数
func (m *MetricsCollector) UpdateActiveUsers(count float64) {
activeUsers.Set(count)
}
// 更新库存水平
func (m *MetricsCollector) UpdateInventoryLevel(productID string, level float64) {
inventoryLevel.WithLabelValues(productID).Set(level)
}
// 更新数据库连接数
func (m *MetricsCollector) UpdateDatabaseConnections(state string, count float64) {
databaseConnections.WithLabelValues(state).Set(count)
}
// 更新缓存命中率
func (m *MetricsCollector) UpdateCacheHitRate(cacheType string, rate float64) {
cacheHitRate.WithLabelValues(cacheType).Set(rate)
}
// HTTP 中间件
func (m *MetricsCollector) HTTPMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
start := time.Now()
c.Next()
duration := time.Since(start)
status := fmt.Sprintf("%d", c.Writer.Status())
m.RecordHTTPRequest(c.Request.Method, c.FullPath(), status, duration)
}
}
分布式追踪 #
// pkg/tracing/tracer.go
package tracing
import (
"context"
"io"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
)
// 初始化 Jaeger 追踪器
func InitJaeger(serviceName, jaegerEndpoint string, sampleRate float64) (opentracing.Tracer, io.Closer, error) {
cfg := config.Configuration{
ServiceName: serviceName,
Sampler: &config.SamplerConfig{
Type: jaeger.SamplerTypeConst,
Param: sampleRate,
},
Reporter: &config.ReporterConfig{
LogSpans: true,
LocalAgentHostPort: jaegerEndpoint,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
return nil, nil, err
}
opentracing.SetGlobalTracer(tracer)
return tracer, closer, nil
}
// HTTP 追踪中间件
func TracingMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
tracer := opentracing.GlobalTracer()
spanCtx, _ := tracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(c.Request.Header))
span := tracer.StartSpan(
fmt.Sprintf("%s %s", c.Request.Method, c.FullPath()),
ext.RPCServerOption(spanCtx),
)
defer span.Finish()
ext.HTTPMethod.Set(span, c.Request.Method)
ext.HTTPUrl.Set(span, c.Request.URL.String())
c.Set("tracing-span", span)
c.Request = c.Request.WithContext(opentracing.ContextWithSpan(c.Request.Context(), span))
c.Next()
ext.HTTPStatusCode.Set(span, uint16(c.Writer.Status()))
if c.Writer.Status() >= 400 {
ext.Error.Set(span, true)
}
}
}
// 创建子 Span
func StartSpanFromContext(ctx context.Context, operationName string) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, operationName)
return span, ctx
}
// 服务间调用追踪
func InjectSpanToHTTPRequest(span opentracing.Span, req *http.Request) {
tracer := opentracing.GlobalTracer()
tracer.Inject(
span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header),
)
}
集成测试 #
API 测试套件 #
// tests/integration/api_test.go
package integration
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
type APITestSuite struct {
suite.Suite
baseURL string
client *http.Client
token string
}
func (suite *APITestSuite) SetupSuite() {
suite.baseURL = "http://localhost:8000" // Kong gateway
suite.client = &http.Client{
Timeout: 30 * time.Second,
}
// 创建测试用户并获取 token
suite.createTestUser()
suite.loginAndGetToken()
}
func (suite *APITestSuite) createTestUser() {
userData := map[string]interface{}{
"username": "testuser",
"email": "[email protected]",
"phone": "1234567890",
"password": "password123",
}
jsonData, _ := json.Marshal(userData)
resp, err := suite.client.Post(
suite.baseURL+"/api/v1/users/register",
"application/json",
bytes.NewBuffer(jsonData),
)
suite.NoError(err)
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusConflict {
suite.T().Fatalf("Failed to create test user: %d", resp.StatusCode)
}
}
func (suite *APITestSuite) loginAndGetToken() {
loginData := map[string]interface{}{
"username": "testuser",
"password": "password123",
}
jsonData, _ := json.Marshal(loginData)
resp, err := suite.client.Post(
suite.baseURL+"/api/v1/users/login",
"application/json",
bytes.NewBuffer(jsonData),
)
suite.NoError(err)
defer resp.Body.Close()
suite.Equal(http.StatusOK, resp.StatusCode)
var result map[string]interface{}
json.NewDecoder(resp.Body).Decode(&result)
suite.token = result["token"].(string)
suite.NotEmpty(suite.token)
}
func (suite *APITestSuite) TestUserWorkflow() {
// 测试获取用户信息
suite.Run("GetUserProfile", func() {
req, _ := http.NewRequest("GET", suite.baseURL+"/api/v1/users/profile", nil)
req.Header.Set("Authorization", "Bearer "+suite.token)
resp, err := suite.client.Do(req)
suite.NoError(err)
defer resp.Body.Close()
suite.Equal(http.StatusOK, resp.StatusCode)
var user map[string]interface{}
json.NewDecoder(resp.Body).Decode(&user)
suite.Equal("testuser", user["username"])
suite.Equal("[email protected]", user["email"])
})
// 测试更新用户资料
suite.Run("UpdateUserProfile", func() {
updateData := map[string]interface{}{
"first_name": "Test",
"last_name": "User",
"address": map[string]interface{}{
"street": "123 Test St",
"city": "Test City",
"province": "Test Province",
"country": "Test Country",
"zip_code": "12345",
},
}
jsonData, _ := json.Marshal(updateData)
req, _ := http.NewRequest("PUT", suite.baseURL+"/api/v1/users/profile", bytes.NewBuffer(jsonData))
req.Header.Set("Authorization", "Bearer "+suite.token)
req.Header.Set("Content-Type", "application/json")
resp, err := suite.client.Do(req)
suite.NoError(err)
defer resp.Body.Close()
suite.Equal(http.StatusOK, resp.StatusCode)
})
}
func (suite *APITestSuite) TestProductWorkflow() {
var productID string
// 测试创建商品
suite.Run("CreateProduct", func() {
productData := map[string]interface{}{
"name": "Test Product",
"description": "This is a test product",
"sku": "TEST-001",
"brand_id": "test-brand-id",
"category_id": "test-category-id",
"price": map[string]interface{}{
"amount": 10000, // 100.00
"currency": "CNY",
},
"images": []map[string]interface{}{
{
"url": "https://example.com/image.jpg",
"alt": "Test Product Image",
"is_primary": true,
"sort_order": 1,
},
},
"attributes": []map[string]interface{}{
{
"name": "Color",
"value": "Red",
"type": 1,
},
},
"weight": map[string]interface{}{
"value": 1.5,
"unit": "kg",
},
"dimensions": map[string]interface{}{
"length": 10.0,
"width": 5.0,
"height": 3.0,
"unit": "cm",
},
"tags": []string{"test", "product"},
}
jsonData, _ := json.Marshal(productData)
req, _ := http.NewRequest("POST", suite.baseURL+"/api/v1/products", bytes.NewBuffer(jsonData))
req.Header.Set("Authorization", "Bearer "+suite.token)
req.Header.Set("Content-Type", "application/json")
resp, err := suite.client.Do(req)
suite.NoError(err)
defer resp.Body.Close()
suite.Equal(http.StatusCreated, resp.StatusCode)
var result map[string]interface{}
json.NewDecoder(resp.Body).Decode(&result)
productID = result["product_id"].(string)
suite.NotEmpty(productID)
})
// 测试获取商品详情
suite.Run("GetProduct", func() {
resp, err := suite.client.Get(suite.baseURL + "/api/v1/products/" + productID)
suite.NoError(err)
defer resp.Body.Close()
suite.Equal(http.StatusOK, resp.StatusCode)
var product map[string]interface{}
json.NewDecoder(resp.Body).Decode(&product)
suite.Equal("Test Product", product["name"])
suite.Equal("TEST-001", product["sku"])
})
// 测试更新库存
suite.Run("UpdateInventory", func() {
inventoryData := map[string]interface{}{
"quantity": 100,
}
jsonData, _ := json.Marshal(inventoryData)
req, _ := http.NewRequest("PUT", suite.baseURL+"/api/v1/products/"+productID+"/inventory", bytes.NewBuffer(jsonData))
req.Header.Set("Authorization", "Bearer "+suite.token)
req.Header.Set("Content-Type", "application/json")
resp, err := suite.client.Do(req)
suite.NoError(err)
defer resp.Body.Close()
suite.Equal(http.StatusOK, resp.StatusCode)
})
}
func (suite *APITestSuite) TestOrderWorkflow() {
var orderID string
// 测试创建订单
suite.Run("CreateOrder", func() {
orderData := map[string]interface{}{
"items": []map[string]interface{}{
{
"product_id": "test-product-id",
"quantity": 2,
},
},
"shipping_address": map[string]interface{}{
"recipient_name": "Test User",
"phone": "1234567890",
"street": "123 Test St",
"city": "Test City",
"province": "Test Province",
"country": "Test Country",
"zip_code": "12345",
},
"billing_address": map[string]interface{}{
"recipient_name": "Test User",
"phone": "1234567890",
"street": "123 Test St",
"city": "Test City",
"province": "Test Province",
"country": "Test Country",
"zip_code": "12345",
},
"shipping_method": map[string]interface{}{
"id": "standard",
"name": "Standard Shipping",
"description": "5-7 business days",
"price": map[string]interface{}{
"amount": 1000, // 10.00
"currency": "CNY",
},
"estimated_days": 7,
},
"payment_method": 1, // Credit Card
"payment_amount": map[string]interface{}{
"amount": 21000, // 210.00
"currency": "CNY",
},
}
jsonData, _ := json.Marshal(orderData)
req, _ := http.NewRequest("POST", suite.baseURL+"/api/v1/orders", bytes.NewBuffer(jsonData))
req.Header.Set("Authorization", "Bearer "+suite.token)
req.Header.Set("Content-Type", "application/json")
resp, err := suite.client.Do(req)
suite.NoError(err)
defer resp.Body.Close()
suite.Equal(http.StatusCreated, resp.StatusCode)
var result map[string]interface{}
json.NewDecoder(resp.Body).Decode(&result)
orderID = result["saga_id"].(string)
suite.NotEmpty(orderID)
})
// 等待 Saga 完成
time.Sleep(5 * time.Second)
// 测试获取订单列表
suite.Run("GetOrders", func() {
req, _ := http.NewRequest("GET", suite.baseURL+"/api/v1/orders", nil)
req.Header.Set("Authorization", "Bearer "+suite.token)
resp, err := suite.client.Do(req)
suite.NoError(err)
defer resp.Body.Close()
suite.Equal(http.StatusOK, resp.StatusCode)
var result map[string]interface{}
json.NewDecoder(resp.Body).Decode(&result)
orders := result["orders"].([]interface{})
suite.Greater(len(orders), 0)
})
}
func TestAPITestSuite(t *testing.T) {
suite.Run(t, new(APITestSuite))
}
性能测试 #
// tests/performance/load_test.go
package performance
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"sync"
"testing"
"time"
)
func BenchmarkUserRegistration(b *testing.B) {
client := &http.Client{Timeout: 30 * time.Second}
baseURL := "http://localhost:8000"
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
userData := map[string]interface{}{
"username": fmt.Sprintf("user%d", i),
"email": fmt.Sprintf("user%[email protected]", i),
"phone": fmt.Sprintf("123456%04d", i),
"password": "password123",
}
jsonData, _ := json.Marshal(userData)
resp, err := client.Post(
baseURL+"/api/v1/users/register",
"application/json",
bytes.NewBuffer(jsonData),
)
if err != nil {
b.Error(err)
continue
}
resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
b.Errorf("Expected status 201, got %d", resp.StatusCode)
}
i++
}
})
}
func BenchmarkProductSearch(b *testing.B) {
client := &http.Client{Timeout: 30 * time.Second}
baseURL := "http://localhost:8000"
searchQueries := []string{
"laptop",
"phone",
"book",
"shoes",
"watch",
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
query := searchQueries[i%len(searchQueries)]
url := fmt.Sprintf("%s/api/v1/products/search?q=%s", baseURL, query)
resp, err := client.Get(url)
if err != nil {
b.Error(err)
continue
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b.Errorf("Expected status 200, got %d", resp.StatusCode)
}
i++
}
})
}
// 并发测试
func TestConcurrentOrderCreation(t *testing.T) {
client := &http.Client{Timeout: 30 * time.Second}
baseURL := "http://localhost:8000"
// 获取测试 token
token := getTestToken(t, client, baseURL)
concurrency := 10
ordersPerGoroutine := 5
var wg sync.WaitGroup
errors := make(chan error, concurrency*ordersPerGoroutine)
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
for j := 0; j < ordersPerGoroutine; j++ {
orderData := map[string]interface{}{
"items": []map[string]interface{}{
{
"product_id": "test-product-id",
"quantity": 1,
},
},
"shipping_address": map[string]interface{}{
"recipient_name": fmt.Sprintf("User %d-%d", goroutineID, j),
"phone": "1234567890",
"street": "123 Test St",
"city": "Test City",
"province": "Test Province",
"country": "Test Country",
"zip_code": "12345",
},
"billing_address": map[string]interface{}{
"recipient_name": fmt.Sprintf("User %d-%d", goroutineID, j),
"phone": "1234567890",
"street": "123 Test St",
"city": "Test City",
"province": "Test Province",
"country": "Test Country",
"zip_code": "12345",
},
"shipping_method": map[string]interface{}{
"id": "standard",
"name": "Standard Shipping",
"description": "5-7 business days",
"price": map[string]interface{}{
"amount": 1000,
"currency": "CNY",
},
"estimated_days": 7,
},
"payment_method": 1,
"payment_amount": map[string]interface{}{
"amount": 11000,
"currency": "CNY",
},
}
jsonData, _ := json.Marshal(orderData)
req, _ := http.NewRequest("POST", baseURL+"/api/v1/orders", bytes.NewBuffer(jsonData))
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
errors <- err
continue
}
resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
errors <- fmt.Errorf("expected status 201, got %d", resp.StatusCode)
}
}
}(i)
}
wg.Wait()
close(errors)
errorCount := 0
for err := range errors {
t.Error(err)
errorCount++
}
if errorCount > 0 {
t.Fatalf("Failed with %d errors out of %d requests", errorCount, concurrency*ordersPerGoroutine)
}
t.Logf("Successfully created %d orders concurrently", concurrency*ordersPerGoroutine)
}
func getTestToken(t *testing.T, client *http.Client, baseURL string) string {
loginData := map[string]interface{}{
"username": "testuser",
"password": "password123",
}
jsonData, _ := json.Marshal(loginData)
resp, err := client.Post(
baseURL+"/api/v1/users/login",
"application/json",
bytes.NewBuffer(jsonData),
)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("Login failed with status: %d", resp.StatusCode)
}
var result map[string]interface{}
json.NewDecoder(resp.Body).Decode(&result)
return result["token"].(string)
}
部署脚本 #
自动化部署脚本 #
#!/bin/bash
# scripts/deploy.sh
set -e
# 配置变量
ENVIRONMENT=${1:-development}
NAMESPACE="ecommerce-${ENVIRONMENT}"
DOCKER_REGISTRY="your-registry.com"
VERSION=${2:-latest}
echo "Deploying ecommerce microservices to ${ENVIRONMENT} environment..."
# 创建命名空间
kubectl create namespace ${NAMESPACE} --dry-run=client -o yaml | kubectl apply -f -
# 部署基础设施服务
echo "Deploying infrastructure services..."
kubectl apply -f deployments/k8s/infrastructure/ -n ${NAMESPACE}
# 等待基础设施服务就绪
echo "Waiting for infrastructure services to be ready..."
kubectl wait --for=condition=ready pod -l app=consul -n ${NAMESPACE} --timeout=300s
kubectl wait --for=condition=ready pod -l app=redis -n ${NAMESPACE} --timeout=300s
kubectl wait --for=condition=ready pod -l app=kafka -n ${NAMESPACE} --timeout=300s
# 部署数据库
echo "Deploying databases..."
kubectl apply -f deployments/k8s/databases/ -n ${NAMESPACE}
# 等待数据库就绪
echo "Waiting for databases to be ready..."
kubectl wait --for=condition=ready pod -l app=user-db -n ${NAMESPACE} --timeout=300s
kubectl wait --for=condition=ready pod -l app=product-db -n ${NAMESPACE} --timeout=300s
kubectl wait --for=condition=ready pod -l app=order-db -n ${NAMESPACE} --timeout=300s
# 运行数据库迁移
echo "Running database migrations..."
kubectl apply -f deployments/k8s/migrations/ -n ${NAMESPACE}
# 部署微服务
echo "Deploying microservices..."
# 更新镜像版本
sed -i "s|image: ecommerce/user-service:.*|image: ${DOCKER_REGISTRY}/ecommerce/user-service:${VERSION}|g" deployments/k8s/user-service.yml
sed -i "s|image: ecommerce/product-service:.*|image: ${DOCKER_REGISTRY}/ecommerce/product-service:${VERSION}|g" deployments/k8s/product-service.yml
sed -i "s|image: ecommerce/order-service:.*|image: ${DOCKER_REGISTRY}/ecommerce/order-service:${VERSION}|g" deployments/k8s/order-service.yml
kubectl apply -f deployments/k8s/user-service.yml -n ${NAMESPACE}
kubectl apply -f deployments/k8s/product-service.yml -n ${NAMESPACE}
kubectl apply -f deployments/k8s/order-service.yml -n ${NAMESPACE}
# 等待微服务就绪
echo "Waiting for microservices to be ready..."
kubectl wait --for=condition=ready pod -l app=user-service -n ${NAMESPACE} --timeout=300s
kubectl wait --for=condition=ready pod -l app=product-service -n ${NAMESPACE} --timeout=300s
kubectl wait --for=condition=ready pod -l app=order-service -n ${NAMESPACE} --timeout=300s
# 部署 API 网关
echo "Deploying API Gateway..."
kubectl apply -f deployments/k8s/kong.yml -n ${NAMESPACE}
# 等待网关就绪
kubectl wait --for=condition=ready pod -l app=kong -n ${NAMESPACE} --timeout=300s
# 部署监控服务
echo "Deploying monitoring services..."
kubectl apply -f deployments/monitoring/ -n ${NAMESPACE}
# 运行健康检查
echo "Running health checks..."
./scripts/health-check.sh ${NAMESPACE}
echo "Deployment completed successfully!"
echo "API Gateway: http://$(kubectl get svc kong -n ${NAMESPACE} -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):8000"
echo "Prometheus: http://$(kubectl get svc prometheus -n ${NAMESPACE} -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):9090"
echo "Grafana: http://$(kubectl get svc grafana -n ${NAMESPACE} -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):3000"
健康检查脚本 #
#!/bin/bash
# scripts/health-check.sh
NAMESPACE=${1:-ecommerce-development}
echo "Running health checks for namespace: ${NAMESPACE}"
# 检查服务状态
check_service() {
local service=$1
local port=$2
echo "Checking ${service}..."
# 获取服务 IP
SERVICE_IP=$(kubectl get svc ${service} -n ${NAMESPACE} -o jsonpath='{.spec.clusterIP}')
if [ -z "$SERVICE_IP" ]; then
echo "❌ ${service}: Service not found"
return 1
fi
# 健康检查
if kubectl run health-check-${service} --rm -i --restart=Never --image=curlimages/curl -- curl -f http://${SERVICE_IP}:${port}/health > /dev/null 2>&1; then
echo "✅ ${service}: Healthy"
return 0
else
echo "❌ ${service}: Unhealthy"
return 1
fi
}
# 检查各个服务
FAILED=0
check_service "user-service" "8001" || FAILED=1
check_service "product-service" "8002" || FAILED=1
check_service "order-service" "8003" || FAILED=1
check_service "kong" "8000" || FAILED=1
# 检查数据库连接
echo "Checking database connections..."
# 用户数据库
if kubectl exec -n ${NAMESPACE} deployment/user-db -- pg_isready > /dev/null 2>&1; then
echo "✅ user-db: Connected"
else
echo "❌ user-db: Connection failed"
FAILED=1
fi
# 商品数据库
if kubectl exec -n ${NAMESPACE} deployment/product-db -- pg_isready > /dev/null 2>&1; then
echo "✅ product-db: Connected"
else
echo "❌ product-db: Connection failed"
FAILED=1
fi
# 订单数据库
if kubectl exec -n ${NAMESPACE} deployment/order-db -- pg_isready > /dev/null 2>&1; then
echo "✅ order-db: Connected"
else
echo "❌ order-db: Connection failed"
FAILED=1
fi
# 检查 Redis
if kubectl exec -n ${NAMESPACE} deployment/redis -- redis-cli ping | grep -q PONG; then
echo "✅ redis: Connected"
else
echo "❌ redis: Connection failed"
FAILED=1
fi
# 检查 Kafka
if kubectl exec -n ${NAMESPACE} deployment/kafka -- kafka-topics --bootstrap-server localhost:9092 --list > /dev/null 2>&1; then
echo "✅ kafka: Connected"
else
echo "❌ kafka: Connection failed"
FAILED=1
fi
if [ $FAILED -eq 0 ]; then
echo "🎉 All health checks passed!"
exit 0
else
echo "💥 Some health checks failed!"
exit 1
fi
总结 #
通过本节的综合实战,我们完成了一个完整的电商微服务系统的构建,包括:
- 服务集成:实现了用户服务、商品服务、订单服务的完整集成
- API 网关:使用 Kong 实现统一的 API 入口和流量管理
- 容器化部署:使用 Docker 和 Kubernetes 实现服务的容器化部署
- 监控体系:集成 Prometheus、Grafana、Jaeger 实现完整的可观测性
- 测试策略:包含集成测试、性能测试、并发测试
- 自动化部署:提供完整的部署脚本和健康检查
这个系统展示了现代微服务架构的最佳实践,为构建大规模分布式系统提供了完整的参考实现。