5.7.2 OpenTelemetry 使用

5.7.2 OpenTelemetry 使用 #

OpenTelemetry(简称 OTel)是云原生计算基金会(CNCF)的一个可观测性项目,它提供了一套标准化的 API、SDK 和工具,用于收集、处理和导出遥测数据(追踪、指标和日志)。

OpenTelemetry 架构概览 #

OpenTelemetry 采用分层架构设计,主要包含以下组件:

  • API:定义了遥测数据的接口标准
  • SDK:提供了 API 的具体实现
  • Instrumentation:自动和手动埋点库
  • Collector:数据收集、处理和导出服务
  • Exporters:将数据导出到各种后端系统

Go SDK 集成 #

基础依赖安装 #

go mod init otel-example
go get go.opentelemetry.io/otel
go get go.opentelemetry.io/otel/sdk
go get go.opentelemetry.io/otel/exporters/jaeger
go get go.opentelemetry.io/otel/exporters/prometheus
go get go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp

初始化 OpenTelemetry #

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/sdk/resource"
    "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

// initTracer builds a TracerProvider that exports spans to a Jaeger collector
// at http://localhost:14268/api/traces and registers it, together with a
// TraceContext + Baggage propagator, as the global OpenTelemetry defaults.
//
// serviceName is attached to every span through the provider's resource.
// The provider is returned so the caller can shut it down.
func initTracer(serviceName string) (*trace.TracerProvider, error) {
    // Exporter: ships finished spans to the Jaeger collector endpoint.
    endpoint := jaeger.WithEndpoint("http://localhost:14268/api/traces")
    exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(endpoint))
    if err != nil {
        return nil, fmt.Errorf("failed to create Jaeger exporter: %w", err)
    }

    // Resource: attributes describing the service that emits the telemetry.
    serviceAttrs := resource.WithAttributes(
        semconv.ServiceName(serviceName),
        semconv.ServiceVersion("1.0.0"),
        semconv.DeploymentEnvironment("development"),
    )
    svcResource, err := resource.New(context.Background(), serviceAttrs)
    if err != nil {
        return nil, fmt.Errorf("failed to create resource: %w", err)
    }

    // Provider: batches spans to the exporter and samples every trace,
    // which is appropriate for a development setup.
    provider := trace.NewTracerProvider(
        trace.WithBatcher(exporter),
        trace.WithResource(svcResource),
        trace.WithSampler(trace.AlwaysSample()),
    )
    otel.SetTracerProvider(provider)

    // Propagator: carries trace context and baggage across process boundaries.
    propagator := propagation.NewCompositeTextMapPropagator(
        propagation.TraceContext{},
        propagation.Baggage{},
    )
    otel.SetTextMapPropagator(propagator)

    return provider, nil
}

// main initializes tracing for "user-service", opens a root span named "main",
// and runs the sample business logic inside it.
//
// NOTE(review): codes.Error and processUser are used below but are neither
// imported nor defined in the visible listing — confirm they are provided
// elsewhere (codes is go.opentelemetry.io/otel/codes).
func main() {
    provider, err := initTracer("user-service")
    if err != nil {
        log.Fatal(err)
    }

    // Deferred first so that it runs after the root span below has ended.
    shutdown := func() {
        if err := provider.Shutdown(context.Background()); err != nil {
            log.Printf("Error shutting down tracer provider: %v", err)
        }
    }
    defer shutdown()

    tracer := otel.Tracer("user-service")

    // Root span covering the whole run.
    ctx, rootSpan := tracer.Start(context.Background(), "main")
    defer rootSpan.End()

    rootSpan.SetAttributes(
        attribute.String("user.id", "123"),
        attribute.String("operation", "get_user_profile"),
    )

    // A failure is reported on the root span rather than aborting the process.
    if runErr := processUser(ctx, tracer); runErr != nil {
        rootSpan.RecordError(runErr)
        rootSpan.SetStatus(codes.Error, runErr.Error())
    }
}

手动埋点技术 #

基础 Span 操作 #

package tracing

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/trace"
)

// UserService serves user lookups and traces each operation it performs.
type UserService struct {
    db     *sql.DB      // database handle passed to NewUserService
    tracer trace.Tracer // tracer used to start one span per operation
}

// NewUserService returns a UserService backed by db whose spans are created
// through the globally registered "user-service" tracer.
func NewUserService(db *sql.DB) *UserService {
    svc := new(UserService)
    svc.db = db
    svc.tracer = otel.Tracer("user-service")
    return svc
}

// GetUser returns the user identified by userID, tracing the lookup in a
// "UserService.GetUser" span.
//
// An empty userID is rejected before any lookup is attempted. Every failure is
// recorded on the span and marks it with an error status; on success the
// user's name and email are attached to the span as attributes.
func (s *UserService) GetUser(ctx context.Context, userID string) (*User, error) {
    ctx, span := s.tracer.Start(ctx, "UserService.GetUser")
    defer span.End()

    // Tag the span with the inputs first so failed lookups carry them too.
    span.SetAttributes(
        attribute.String("user.id", userID),
        attribute.String("operation", "get_user"),
    )

    // Guard: reject a missing ID.
    if len(userID) == 0 {
        invalid := fmt.Errorf("user ID cannot be empty")
        span.RecordError(invalid)
        span.SetStatus(codes.Error, "invalid input")
        return nil, invalid
    }

    // Delegate to the database layer; it opens its own child span from ctx.
    found, lookupErr := s.queryUserFromDB(ctx, userID)
    if lookupErr != nil {
        span.RecordError(lookupErr)
        span.SetStatus(codes.Error, "database query failed")
        return nil, lookupErr
    }

    // Describe the result on the span.
    nameAttr := attribute.String("user.name", found.Name)
    emailAttr := attribute.String("user.email", found.Email)
    span.SetAttributes(nameAttr, emailAttr)
    span.SetStatus(codes.Ok, "user retrieved successfully")

    return found, nil
}

// queryUserFromDB simulates a user lookup inside a "database.query" child span.
//
// The span carries db.system, db.operation and db.statement attributes and
// brackets the simulated work with "query.start" / "query.complete" events.
// The lookup waits 50ms to stand in for a database round trip and gives up
// early if ctx is cancelled; in that case the context error is recorded on the
// span, the span is marked as failed, and the error is returned.
func (s *UserService) queryUserFromDB(ctx context.Context, userID string) (*User, error) {
    ctx, span := s.tracer.Start(ctx, "database.query")
    defer span.End()

    // Database-related attributes.
    span.SetAttributes(
        attribute.String("db.system", "postgresql"),
        attribute.String("db.operation", "SELECT"),
        attribute.String("db.statement", "SELECT * FROM users WHERE id = $1"),
    )

    span.AddEvent("query.start", trace.WithAttributes(
        attribute.String("query.type", "user_lookup"),
    ))

    // Simulated query latency. Waiting on ctx as well keeps a cancelled or
    // timed-out request from blocking for the full duration.
    timer := time.NewTimer(50 * time.Millisecond)
    defer timer.Stop()
    select {
    case <-timer.C:
    case <-ctx.Done():
        err := ctx.Err()
        span.RecordError(err)
        span.SetStatus(codes.Error, "database query cancelled")
        return nil, err
    }

    // Placeholder record; the email uses the reserved example.com domain.
    user := &User{
        ID:    userID,
        Name:  "John Doe",
        Email: "john.doe@example.com",
    }

    span.AddEvent("query.complete", trace.WithAttributes(
        attribute.Int("rows.affected", 1),
    ))

    return user, nil
}

// User is the user record returned by UserService. Its JSON form uses the
// lower-case keys "id", "name" and "email".
type User struct {
    ID    string `json:"id"`
    Name  string `json:"name"`
    Email string `json:"email"`
}

错误处理和状态设置 #

// ProcessOrder runs order through validation, inventory check, payment and
// record creation, tracing the whole flow in an "OrderService.ProcessOrder"
// span.
//
// The first failing step aborts processing: the error is recorded on the span,
// the span status is set to error, and the error is returned (wrapped with the
// name of the failing step where a step error exists). Insufficient inventory
// is reported the same way as every other failure.
func (s *OrderService) ProcessOrder(ctx context.Context, order *Order) error {
    ctx, span := s.tracer.Start(ctx, "OrderService.ProcessOrder")
    defer span.End()

    span.SetAttributes(
        attribute.String("order.id", order.ID),
        attribute.Float64("order.amount", order.Amount),
        attribute.String("order.currency", order.Currency),
    )

    // Validate the order.
    if err := s.validateOrder(ctx, order); err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, "order validation failed")
        return fmt.Errorf("validation error: %w", err)
    }

    // Check inventory.
    available, err := s.checkInventory(ctx, order.ProductID, order.Quantity)
    if err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, "inventory check failed")
        return fmt.Errorf("inventory error: %w", err)
    }

    if !available {
        err := fmt.Errorf("insufficient inventory for product %s", order.ProductID)
        span.SetAttributes(attribute.Bool("inventory.available", false))
        // Record the error like the other failure paths do, so the span
        // carries an error event in addition to the error status.
        span.RecordError(err)
        span.SetStatus(codes.Error, "insufficient inventory")
        return err
    }

    // Take payment.
    if err := s.processPayment(ctx, order); err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, "payment processing failed")
        return fmt.Errorf("payment error: %w", err)
    }

    // Persist the order record.
    if err := s.createOrderRecord(ctx, order); err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, "order creation failed")
        return fmt.Errorf("order creation error: %w", err)
    }

    // A status description is only kept for error statuses, so none is
    // supplied on success.
    span.SetStatus(codes.Ok, "")
    return nil
}

自动埋点技术 #

HTTP 服务自动埋点 #

package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"

    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
)

// setupHTTPServer registers the /user endpoint, wrapped in the otelhttp
// middleware, on the default mux and serves it on :8080. It blocks while the
// server runs and terminates the process through log.Fatal once
// ListenAndServe returns.
//
// NOTE(review): trace.SpanFromContext needs go.opentelemetry.io/otel/trace,
// which is not in this listing's import block — confirm it is imported.
func setupHTTPServer() {
    tracer := otel.Tracer("http-server")

    // User handler.
    userHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        ctx := r.Context()
        userID := r.URL.Query().Get("id")

        // Enrich the span already present in the request context (expected to
        // be the one opened by the otelhttp wrapper below).
        span := trace.SpanFromContext(ctx)
        span.SetAttributes(
            attribute.String("handler", "user"),
            attribute.String("user.id", userID),
        )

        // Child span for the business logic.
        ctx, businessSpan := tracer.Start(ctx, "business.get_user")
        defer businessSpan.End()

        user := processUserRequest(ctx, userID)

        w.Header().Set("Content-Type", "application/json")
        if err := json.NewEncoder(w).Encode(user); err != nil {
            // Part of the response may already be on the wire, so the status
            // can no longer be changed; surface the failure in the trace and
            // the log instead of dropping it.
            businessSpan.RecordError(err)
            log.Printf("Failed to encode user response: %v", err)
        }
    })

    // Apply the OpenTelemetry HTTP middleware.
    wrappedHandler := otelhttp.NewHandler(userHandler, "user-handler")

    http.Handle("/user", wrappedHandler)

    log.Println("Server starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

// processUserRequest builds the response payload for userID inside a
// "process_user_request" span. The span is started from ctx, so it is parented
// to whatever span ctx already carries.
func processUserRequest(ctx context.Context, userID string) map[string]interface{} {
    _, span := otel.Tracer("business-logic").Start(ctx, "process_user_request")
    defer span.End()

    span.SetAttributes(attribute.String("user.id", userID))

    payload := make(map[string]interface{}, 2)
    payload["id"] = userID
    payload["name"] = "John Doe"
    return payload
}

HTTP 客户端自动埋点 #

package client

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "net/url"

    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/trace"
)

// HTTPClient is an HTTP client whose requests are traced.
type HTTPClient struct {
    client *http.Client // underlying client; NewHTTPClient gives it an otelhttp transport
    tracer trace.Tracer // tracer used for the spans this type starts itself
}

// NewHTTPClient returns an HTTPClient whose transport is http.DefaultTransport
// wrapped by otelhttp, and whose tracer is the global "http-client" tracer.
func NewHTTPClient() *HTTPClient {
    // Wrapping the transport is what instruments outgoing requests.
    instrumented := otelhttp.NewTransport(http.DefaultTransport)

    return &HTTPClient{
        client: &http.Client{Transport: instrumented},
        tracer: otel.Tracer("http-client"),
    }
}

// CallUserService fetches the user identified by userID from the user service
// over HTTP and decodes the JSON response, tracing the call in an
// "http_client.call_user_service" span.
//
// It returns an error when the request cannot be built or sent, when the
// service answers with a status other than 200, or when the body cannot be
// decoded. Every failure is recorded on the span and sets its status to error.
func (c *HTTPClient) CallUserService(ctx context.Context, userID string) (*User, error) {
    ctx, span := c.tracer.Start(ctx, "http_client.call_user_service")
    defer span.End()

    span.SetAttributes(
        attribute.String("service.name", "user-service"),
        attribute.String("user.id", userID),
    )

    // fail records err on the span and flags the span as failed; recording an
    // error on its own leaves the span status unchanged.
    fail := func(err error, msg string) (*User, error) {
        span.RecordError(err)
        span.SetStatus(codes.Error, msg)
        return nil, err
    }

    // Escape the ID so characters such as '&', '#' or spaces cannot alter the
    // query string or produce an invalid URL.
    endpoint := "http://user-service:8080/user?id=" + url.QueryEscape(userID)
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
    if err != nil {
        return fail(err, "building request failed")
    }

    // The request goes through the client's instrumented transport.
    resp, err := c.client.Do(req)
    if err != nil {
        return fail(err, "request failed")
    }
    defer resp.Body.Close()

    span.SetAttributes(
        attribute.Int("http.status_code", resp.StatusCode),
        attribute.String("http.method", "GET"),
        attribute.String("http.url", endpoint),
    )

    if resp.StatusCode != http.StatusOK {
        return fail(fmt.Errorf("HTTP error: %d", resp.StatusCode), "unexpected HTTP status")
    }

    var user User
    if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
        return fail(err, "decoding response failed")
    }

    return &user, nil
}

指标收集与导出 #

基础指标定义 #

package metrics

import (
    "context"
    "time"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/metric"
)

// MetricsService groups the metric instruments used by the service.
type MetricsService struct {
    // meter is the Meter the instruments below were created from.
    meter           metric.Meter
    // requestCounter counts HTTP requests ("http_requests_total").
    requestCounter  metric.Int64Counter
    // requestDuration records request latency in seconds
    // ("http_request_duration_seconds").
    requestDuration metric.Float64Histogram
    // activeUsers tracks the number of active users ("active_users").
    activeUsers     metric.Int64UpDownCounter
}

// NewMetricsService creates the request counter, request-duration histogram
// and active-user up-down counter on the global "user-service" meter.
// It returns the first instrument-creation error encountered.
//
// NOTE(review): no MeterProvider registration is visible in this file —
// confirm one is installed before relying on these instruments.
func NewMetricsService() (*MetricsService, error) {
    var (
        svc = &MetricsService{meter: otel.Meter("user-service")}
        err error
    )

    // Request counter.
    svc.requestCounter, err = svc.meter.Int64Counter(
        "http_requests_total",
        metric.WithDescription("Total number of HTTP requests"),
        metric.WithUnit("1"),
    )
    if err != nil {
        return nil, err
    }

    // Request-duration histogram.
    svc.requestDuration, err = svc.meter.Float64Histogram(
        "http_request_duration_seconds",
        metric.WithDescription("HTTP request duration in seconds"),
        metric.WithUnit("s"),
    )
    if err != nil {
        return nil, err
    }

    // Active-user counter.
    svc.activeUsers, err = svc.meter.Int64UpDownCounter(
        "active_users",
        metric.WithDescription("Number of active users"),
        metric.WithUnit("1"),
    )
    if err != nil {
        return nil, err
    }

    return svc, nil
}

// RecordHTTPRequest counts one HTTP request and records its duration in
// seconds. Both measurements are tagged with the same method, path and
// status_code attributes.
func (m *MetricsService) RecordHTTPRequest(ctx context.Context, method, path string, statusCode int, duration time.Duration) {
    // Build the shared attribute option once and reuse it for both instruments.
    labels := metric.WithAttributes(
        attribute.String("method", method),
        attribute.String("path", path),
        attribute.Int("status_code", statusCode),
    )

    m.requestCounter.Add(ctx, 1, labels)
    m.requestDuration.Record(ctx, duration.Seconds(), labels)
}

// UpdateActiveUsers adds delta to the active-user up-down counter. The
// instrument is an Int64UpDownCounter, so delta may be negative to record
// users leaving.
func (m *MetricsService) UpdateActiveUsers(ctx context.Context, delta int64) {
    m.activeUsers.Add(ctx, delta)
}

HTTP 中间件集成指标 #

// MetricsMiddleware wraps next so that every request it serves is reported
// through RecordHTTPRequest with its method, URL path, response status code
// and elapsed time.
func (m *MetricsService) MetricsMiddleware(next http.Handler) http.Handler {
    instrumented := func(w http.ResponseWriter, r *http.Request) {
        began := time.Now()

        // Capture the status code the handler writes; 200 is assumed until
        // the handler sets something else.
        recorder := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}

        next.ServeHTTP(recorder, r)

        elapsed := time.Since(began)
        m.RecordHTTPRequest(r.Context(), r.Method, r.URL.Path, recorder.statusCode, elapsed)
    }
    return http.HandlerFunc(instrumented)
}

// responseWriter wraps an http.ResponseWriter and remembers the status code of
// the response so middleware can report it after the handler returns.
type responseWriter struct {
    http.ResponseWriter
    statusCode  int  // recorded status; the creator seeds it with the default
    wroteHeader bool // true once the final (non-1xx) status has been decided
}

// WriteHeader forwards code to the wrapped writer and records it as the
// response status. Only the first final status is recorded: later calls do not
// change what was actually sent, so they must not change what is reported.
// Informational 1xx codes (other than 101) are forwarded but not recorded,
// because a final status still follows them.
func (rw *responseWriter) WriteHeader(code int) {
    informational := code >= 100 && code <= 199 && code != http.StatusSwitchingProtocols
    if !rw.wroteHeader && !informational {
        rw.statusCode = code
        rw.wroteHeader = true
    }
    rw.ResponseWriter.WriteHeader(code)
}

// Write forwards b to the wrapped writer. Writing a body before any final
// WriteHeader call implies a 200 response, so that status is recorded and
// locked in here.
func (rw *responseWriter) Write(b []byte) (int, error) {
    if !rw.wroteHeader {
        rw.statusCode = http.StatusOK
        rw.wroteHeader = true
    }
    return rw.ResponseWriter.Write(b)
}

// Unwrap returns the wrapped writer so that http.ResponseController can reach
// optional interfaces implemented by it.
func (rw *responseWriter) Unwrap() http.ResponseWriter {
    return rw.ResponseWriter
}

配置管理 #

环境配置 #

package config

import (
    "os"
    "strconv"
    "time"
)

// TracingConfig holds the tracing settings that LoadTracingConfig reads from
// the environment.
type TracingConfig struct {
    // ServiceName comes from SERVICE_NAME (default "unknown-service").
    ServiceName     string
    // ServiceVersion comes from SERVICE_VERSION (default "1.0.0").
    ServiceVersion  string
    // Environment comes from ENVIRONMENT (default "development").
    Environment     string
    // JaegerEndpoint comes from JAEGER_ENDPOINT
    // (default "http://localhost:14268/api/traces").
    JaegerEndpoint  string
    // SamplingRate comes from SAMPLING_RATE (default 1.0).
    SamplingRate    float64
    // BatchTimeout comes from BATCH_TIMEOUT (default "5s").
    BatchTimeout    time.Duration
    // MaxBatchSize comes from MAX_BATCH_SIZE (default 512).
    MaxBatchSize    int
    // Enabled comes from TRACING_ENABLED (default true).
    Enabled         bool
}

// LoadTracingConfig builds a TracingConfig from environment variables,
// substituting the built-in default for every variable that is unset, empty,
// or cannot be parsed.
func LoadTracingConfig() *TracingConfig {
    cfg := new(TracingConfig)

    // Service identity.
    cfg.ServiceName = getEnv("SERVICE_NAME", "unknown-service")
    cfg.ServiceVersion = getEnv("SERVICE_VERSION", "1.0.0")
    cfg.Environment = getEnv("ENVIRONMENT", "development")

    // Export and sampling behaviour.
    cfg.JaegerEndpoint = getEnv("JAEGER_ENDPOINT", "http://localhost:14268/api/traces")
    cfg.SamplingRate = getEnvFloat("SAMPLING_RATE", 1.0)
    cfg.BatchTimeout = getEnvDuration("BATCH_TIMEOUT", "5s")
    cfg.MaxBatchSize = getEnvInt("MAX_BATCH_SIZE", 512)
    cfg.Enabled = getEnvBool("TRACING_ENABLED", true)

    return cfg
}

// getEnv returns the value of the environment variable key, or defaultValue
// when the variable is unset or empty.
func getEnv(key, defaultValue string) string {
    raw := os.Getenv(key)
    if raw == "" {
        return defaultValue
    }
    return raw
}

// getEnvFloat returns the environment variable key parsed as a float64, or
// defaultValue when the variable is unset, empty, or not a valid float.
func getEnvFloat(key string, defaultValue float64) float64 {
    raw := os.Getenv(key)
    if raw == "" {
        return defaultValue
    }

    parsed, err := strconv.ParseFloat(raw, 64)
    if err != nil {
        return defaultValue
    }
    return parsed
}

// getEnvDuration returns the environment variable key parsed with
// time.ParseDuration. If the variable is unset, empty, or unparsable it falls
// back to parsing defaultValue, and if that fails too it returns 5 seconds.
func getEnvDuration(key string, defaultValue string) time.Duration {
    // Candidates in priority order; an empty string never parses, so an unset
    // variable falls through to the default naturally.
    for _, candidate := range []string{os.Getenv(key), defaultValue} {
        if parsed, err := time.ParseDuration(candidate); err == nil {
            return parsed
        }
    }
    return 5 * time.Second
}

// getEnvInt returns the environment variable key parsed as a base-10 int, or
// defaultValue when the variable is unset, empty, or not a valid integer.
func getEnvInt(key string, defaultValue int) int {
    raw := os.Getenv(key)
    if raw == "" {
        return defaultValue
    }

    parsed, err := strconv.Atoi(raw)
    if err != nil {
        return defaultValue
    }
    return parsed
}

// getEnvBool returns the environment variable key parsed with
// strconv.ParseBool, or defaultValue when the variable is unset, empty, or not
// a recognised boolean.
func getEnvBool(key string, defaultValue bool) bool {
    raw := os.Getenv(key)
    if raw == "" {
        return defaultValue
    }

    parsed, err := strconv.ParseBool(raw)
    if err != nil {
        return defaultValue
    }
    return parsed
}

完整的初始化示例 #

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/sdk/resource"
    "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

// main loads the tracing configuration and starts the HTTP server.
//
// When tracing is enabled, the tracer provider and the metrics service are
// initialized first and any initialization failure terminates the process.
// When tracing is disabled, the server starts with neither.
func main() {
    cfg := LoadTracingConfig()

    var (
        provider   *trace.TracerProvider
        metricsSvc *MetricsService
    )

    if cfg.Enabled {
        var err error

        // Tracing.
        if provider, err = initTracing(cfg); err != nil {
            log.Fatalf("Failed to initialize tracing: %v", err)
        }

        // Metrics.
        if metricsSvc, err = NewMetricsService(); err != nil {
            log.Fatalf("Failed to initialize metrics: %v", err)
        }
    } else {
        log.Println("Tracing disabled")
    }

    // Blocks until the server has been shut down.
    startServer(provider, metricsSvc)
}

// initTracing builds a TracerProvider from config and installs it, together
// with a TraceContext + Baggage propagator, as the global OpenTelemetry
// defaults.
//
// Spans are batched to the Jaeger collector at config.JaegerEndpoint using the
// configured batch timeout and maximum batch size. A sampling rate of 1.0 or
// more samples every trace, 0.0 or less samples none, and anything in between
// samples by trace-ID ratio. Exporter and resource creation errors are
// returned unchanged.
func initTracing(config *TracingConfig) (*trace.TracerProvider, error) {
    // Exporter.
    endpoint := jaeger.WithEndpoint(config.JaegerEndpoint)
    exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(endpoint))
    if err != nil {
        return nil, err
    }

    // Resource describing this service.
    serviceAttrs := resource.WithAttributes(
        semconv.ServiceName(config.ServiceName),
        semconv.ServiceVersion(config.ServiceVersion),
        semconv.DeploymentEnvironment(config.Environment),
    )
    svcResource, err := resource.New(context.Background(), serviceAttrs)
    if err != nil {
        return nil, err
    }

    // Sampler chosen from the configured rate.
    var sampler trace.Sampler
    switch rate := config.SamplingRate; {
    case rate >= 1.0:
        sampler = trace.AlwaysSample()
    case rate <= 0.0:
        sampler = trace.NeverSample()
    default:
        sampler = trace.TraceIDRatioBased(rate)
    }

    // Provider.
    batcher := trace.WithBatcher(exporter,
        trace.WithBatchTimeout(config.BatchTimeout),
        trace.WithMaxExportBatchSize(config.MaxBatchSize),
    )
    provider := trace.NewTracerProvider(
        batcher,
        trace.WithResource(svcResource),
        trace.WithSampler(sampler),
    )
    otel.SetTracerProvider(provider)

    propagator := propagation.NewCompositeTextMapPropagator(
        propagation.TraceContext{},
        propagation.Baggage{},
    )
    otel.SetTextMapPropagator(propagator)

    return provider, nil
}

// startServer serves a /health endpoint on :8080 and blocks until SIGINT or
// SIGTERM arrives, then shuts down the HTTP server and the tracer provider.
//
// tp may be nil, in which case no tracer provider is shut down. metrics may be
// nil, in which case the metrics middleware is not installed. The otelhttp
// handler wrapper is applied either way. Server shutdown is given up to 30
// seconds and the tracer provider up to 10 seconds, so an unreachable backend
// cannot keep the process from exiting.
func startServer(tp *trace.TracerProvider, metrics *MetricsService) {
    mux := http.NewServeMux()

    // Health handler.
    handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        if _, err := w.Write([]byte(`{"status": "ok"}`)); err != nil {
            log.Printf("Failed to write health response: %v", err)
        }
    })

    // Middleware: metrics (optional) innermost, tracing outermost.
    var finalHandler http.Handler = handler
    if metrics != nil {
        finalHandler = metrics.MetricsMiddleware(finalHandler)
    }
    finalHandler = otelhttp.NewHandler(finalHandler, "health-check")

    mux.Handle("/health", finalHandler)

    server := &http.Server{
        Addr:              ":8080",
        Handler:           mux,
        ReadHeaderTimeout: 10 * time.Second, // bound how long a client may take to send its headers
    }

    // Serve in the background so this goroutine can wait for a signal.
    go func() {
        log.Println("Server starting on :8080")
        if err := server.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatalf("Server failed: %v", err)
        }
    }()

    // Wait for a termination signal.
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Shutting down server...")

    // Stop the HTTP server.
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    if err := server.Shutdown(ctx); err != nil {
        log.Printf("Server shutdown error: %v", err)
    }

    // Shut down the tracer provider with its own deadline, independent of how
    // much of the server's shutdown budget was used.
    if tp != nil {
        flushCtx, cancelFlush := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancelFlush()

        if err := tp.Shutdown(flushCtx); err != nil {
            log.Printf("Tracer shutdown error: %v", err)
        }
    }

    log.Println("Server stopped")
}

小结 #

OpenTelemetry 为 Go 应用提供了标准化的可观测性解决方案。通过合理使用自动和手动埋点技术,我们可以构建完整的分布式追踪体系。配合指标收集功能,OpenTelemetry 能够为微服务架构提供全面的可观测性支持。

在下一节中,我们将学习如何使用 Jaeger 来可视化和分析这些追踪数据。