5.7.2 OpenTelemetry 使用 #
OpenTelemetry(简称 OTel)是云原生计算基金会(CNCF)的一个可观测性项目,它提供了一套标准化的 API、SDK 和工具,用于收集、处理和导出遥测数据(追踪、指标和日志)。
OpenTelemetry 架构概览 #
OpenTelemetry 采用分层架构设计,主要包含以下组件:
- API:定义了遥测数据的接口标准
- SDK:提供了 API 的具体实现
- Instrumentation:自动和手动埋点库
- Collector:数据收集、处理和导出服务
- Exporters:将数据导出到各种后端系统
Go SDK 集成 #
基础依赖安装 #
go mod init otel-example
go get go.opentelemetry.io/otel
go get go.opentelemetry.io/otel/sdk
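go get go.opentelemetry.io/otel/exporters/jaeger
# 注意:Jaeger 导出器已被 OpenTelemetry Go 官方弃用,新项目建议改用 OTLP 导出器(go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp),Jaeger 1.35 及以上版本可直接接收 OTLP 数据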
go get go.opentelemetry.io/otel/exporters/prometheus
go get go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp
初始化 OpenTelemetry #
package main
import (
	"context"
	"fmt"
	"log"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/exporters/jaeger"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	"go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)
// initTracer wires a Jaeger-backed TracerProvider for serviceName, installs it
// as the global provider, and registers the W3C TraceContext and Baggage
// propagators. The caller owns the returned provider and must call Shutdown
// on it so buffered spans are exported.
func initTracer(serviceName string) (*trace.TracerProvider, error) {
	// Spans are shipped to the Jaeger collector's HTTP endpoint.
	endpoint := jaeger.WithEndpoint("http://localhost:14268/api/traces")
	exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(endpoint))
	if err != nil {
		return nil, fmt.Errorf("failed to create Jaeger exporter: %w", err)
	}

	// Resource attributes identify this service on every exported span.
	svcResource, err := resource.New(
		context.Background(),
		resource.WithAttributes(
			semconv.ServiceName(serviceName),
			semconv.ServiceVersion("1.0.0"),
			semconv.DeploymentEnvironment("development"),
		),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create resource: %w", err)
	}

	opts := []trace.TracerProviderOption{
		trace.WithBatcher(exporter),
		trace.WithResource(svcResource),
		// Development setting: keep every span.
		trace.WithSampler(trace.AlwaysSample()),
	}
	provider := trace.NewTracerProvider(opts...)

	otel.SetTracerProvider(provider)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{},
		propagation.Baggage{},
	))
	return provider, nil
}
// main initialises tracing, wraps the user-profile workflow in a root "main"
// span, and flushes buffered spans before the process exits.
func main() {
	tp, err := initTracer("user-service")
	if err != nil {
		log.Fatal(err)
	}
	// Deferred calls run LIFO: span.End (registered below) fires first, so the
	// root span is queued before the provider flushes and shuts down.
	defer func() {
		// Bound the flush so an unreachable collector cannot hang process exit.
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := tp.Shutdown(ctx); err != nil {
			log.Printf("Error shutting down tracer provider: %v", err)
		}
	}()

	tracer := otel.Tracer("user-service")

	ctx, span := tracer.Start(context.Background(), "main")
	defer span.End()

	span.SetAttributes(
		attribute.String("user.id", "123"),
		attribute.String("operation", "get_user_profile"),
	)

	// processUser is not shown in this section.
	if err := processUser(ctx, tracer); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
	}
}
手动埋点技术 #
基础 Span 操作 #
package tracing
import (
"context"
"database/sql"
"fmt"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
// UserService serves user lookups and starts a span for each operation.
type UserService struct {
	// db is the database handle. NOTE(review): queryUserFromDB in this file
	// only simulates the query and does not read db yet.
	db     *sql.DB
	tracer trace.Tracer // obtained from the global provider in NewUserService
}
// NewUserService returns a UserService that queries db and creates its spans
// with the "user-service" tracer from the global TracerProvider.
func NewUserService(db *sql.DB) *UserService {
	svc := new(UserService)
	svc.db = db
	svc.tracer = otel.Tracer("user-service")
	return svc
}
// GetUser looks up the user identified by userID inside a
// "UserService.GetUser" span. An empty userID is rejected before any query
// runs. Every failure is recorded on the span and sets its status to Error
// with a short reason. On success the user's name and email are attached as
// span attributes and the span is marked Ok.
func (s *UserService) GetUser(ctx context.Context, userID string) (*User, error) {
	ctx, span := s.tracer.Start(ctx, "UserService.GetUser")
	defer span.End()

	span.SetAttributes(
		attribute.String("user.id", userID),
		attribute.String("operation", "get_user"),
	)

	// fail records err on the span, marks the span failed, and yields the
	// (nil, err) pair the caller returns.
	fail := func(err error, reason string) (*User, error) {
		span.RecordError(err)
		span.SetStatus(codes.Error, reason)
		return nil, err
	}

	if userID == "" {
		return fail(fmt.Errorf("user ID cannot be empty"), "invalid input")
	}

	user, err := s.queryUserFromDB(ctx, userID)
	if err != nil {
		return fail(err, "database query failed")
	}

	// NOTE(review): name and email are personal data; confirm the tracing
	// backend is allowed to store them.
	span.SetAttributes(
		attribute.String("user.name", user.Name),
		attribute.String("user.email", user.Email),
	)
	span.SetStatus(codes.Ok, "user retrieved successfully")
	return user, nil
}
// queryUserFromDB runs (currently simulates) the user SELECT for userID inside
// a child "database.query" span. The span carries database attributes and
// "query.start" / "query.complete" events. The simulated 50ms latency is
// abandoned as soon as ctx is cancelled. In that case ctx.Err() is recorded
// on the span, the status is set to Error, and the error is returned.
func (s *UserService) queryUserFromDB(ctx context.Context, userID string) (*User, error) {
	ctx, span := s.tracer.Start(ctx, "database.query")
	defer span.End()

	span.SetAttributes(
		attribute.String("db.system", "postgresql"),
		attribute.String("db.operation", "SELECT"),
		attribute.String("db.statement", "SELECT * FROM users WHERE id = $1"),
	)
	span.AddEvent("query.start", trace.WithAttributes(
		attribute.String("query.type", "user_lookup"),
	))

	// Simulated query latency. A real query would pass ctx to the driver
	// (e.g. s.db.QueryRowContext). Waiting on ctx here gives the same
	// cancellation behaviour instead of sleeping unconditionally.
	timer := time.NewTimer(50 * time.Millisecond)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		err := ctx.Err()
		span.RecordError(err)
		span.SetStatus(codes.Error, "database query cancelled")
		return nil, err
	case <-timer.C:
	}

	user := &User{
		ID:    userID,
		Name:  "John Doe",
		Email: "[email protected]",
	}

	// NOTE(review): for a SELECT this counts rows returned, not rows affected.
	span.AddEvent("query.complete", trace.WithAttributes(
		attribute.Int("rows.affected", 1),
	))
	return user, nil
}
// User is the user record returned by UserService. The JSON tags set the
// serialized field names.
type User struct {
	ID    string `json:"id"`
	Name  string `json:"name"`
	Email string `json:"email"`
}
错误处理和状态设置 #
// ProcessOrder runs the order pipeline inside an "OrderService.ProcessOrder"
// span. The steps are: validate, check inventory, charge payment, persist the
// record. The first failing step stops the pipeline. Its error is recorded on
// the span, the span status is set to Error with a short reason, and the
// error is returned (wrapped with the step name for dependency failures). On
// success the span is marked Ok.
func (s *OrderService) ProcessOrder(ctx context.Context, order *Order) error {
	ctx, span := s.tracer.Start(ctx, "OrderService.ProcessOrder")
	defer span.End()

	span.SetAttributes(
		attribute.String("order.id", order.ID),
		attribute.Float64("order.amount", order.Amount),
		attribute.String("order.currency", order.Currency),
	)

	// fail gives every failure path identical span bookkeeping, so no branch
	// can set an Error status without also recording the error.
	fail := func(err error, reason string) {
		span.RecordError(err)
		span.SetStatus(codes.Error, reason)
	}

	if err := s.validateOrder(ctx, order); err != nil {
		fail(err, "order validation failed")
		return fmt.Errorf("validation error: %w", err)
	}

	available, err := s.checkInventory(ctx, order.ProductID, order.Quantity)
	if err != nil {
		fail(err, "inventory check failed")
		return fmt.Errorf("inventory error: %w", err)
	}
	// Recorded for both outcomes, not only the failure case.
	span.SetAttributes(attribute.Bool("inventory.available", available))
	if !available {
		// Not a dependency error, but still a failed order: record it on the
		// span like every other failure.
		stockErr := fmt.Errorf("insufficient inventory for product %s", order.ProductID)
		fail(stockErr, "insufficient inventory")
		return stockErr
	}

	if err := s.processPayment(ctx, order); err != nil {
		fail(err, "payment processing failed")
		return fmt.Errorf("payment error: %w", err)
	}

	if err := s.createOrderRecord(ctx, order); err != nil {
		fail(err, "order creation failed")
		return fmt.Errorf("order creation error: %w", err)
	}

	span.SetStatus(codes.Ok, "order processed successfully")
	return nil
}
自动埋点技术 #
HTTP 服务自动埋点 #
package main
import (
"context"
"encoding/json"
"log"
"net/http"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)
// setupHTTPServer serves /user on :8080 behind otelhttp.NewHandler, which
// creates a span for each request. The handler adds attributes to that span,
// opens a child "business.get_user" span for the business logic, and writes
// the user as JSON. It blocks in ListenAndServe and exits the process via
// log.Fatal when the server stops.
func setupHTTPServer() {
	tracer := otel.Tracer("http-server")

	userHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		// Parse the query once; it is needed for both the span and the lookup.
		userID := r.URL.Query().Get("id")

		// The span created by otelhttp for this request.
		// NOTE: trace.SpanFromContext requires importing
		// "go.opentelemetry.io/otel/trace", which this snippet's imports omit.
		span := trace.SpanFromContext(ctx)
		span.SetAttributes(
			attribute.String("handler", "user"),
			attribute.String("user.id", userID),
		)

		ctx, businessSpan := tracer.Start(ctx, "business.get_user")
		defer businessSpan.End()

		user := processUserRequest(ctx, userID)

		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(user); err != nil {
			// The response may already be partially written, so the failure
			// can only be surfaced on the span and in the log.
			businessSpan.RecordError(err)
			log.Printf("encoding user response: %v", err)
		}
	})

	http.Handle("/user", otelhttp.NewHandler(userHandler, "user-handler"))

	log.Println("Server starting on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}
// processUserRequest builds the response payload for userID inside a
// "process_user_request" span. The span is started from ctx, so it becomes a
// child of whatever span ctx already carries.
func processUserRequest(ctx context.Context, userID string) map[string]interface{} {
	_, span := otel.Tracer("business-logic").Start(ctx, "process_user_request")
	defer span.End()
	span.SetAttributes(attribute.String("user.id", userID))

	payload := make(map[string]interface{}, 2)
	payload["id"] = userID
	payload["name"] = "John Doe"
	return payload
}
HTTP 客户端自动埋点 #
package client
import (
"context"
"fmt"
"net/http"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)
// HTTPClient is an HTTP client with tracing. client carries the
// otelhttp-instrumented transport installed by NewHTTPClient; tracer starts
// the caller-level spans around each call.
type HTTPClient struct {
	client *http.Client
	tracer trace.Tracer
}
// NewHTTPClient returns an HTTPClient whose transport is wrapped by otelhttp,
// so each outgoing request gets its own child span. It also holds an
// "http-client" tracer for caller-level spans.
//
// NOTE(review): no http.Client Timeout is set; requests are bounded only by
// the context the caller passes in.
func NewHTTPClient() *HTTPClient {
	instrumented := otelhttp.NewTransport(http.DefaultTransport)
	return &HTTPClient{
		client: &http.Client{Transport: instrumented},
		tracer: otel.Tracer("http-client"),
	}
}
// CallUserService fetches the user with the given ID from user-service and
// decodes the JSON body into a User. The call runs in an
// "http_client.call_user_service" span, and the otelhttp transport adds a
// child span for the round trip. Request-building errors, transport errors,
// non-200 responses and decode failures are recorded on the span and
// returned.
func (c *HTTPClient) CallUserService(ctx context.Context, userID string) (*User, error) {
	ctx, span := c.tracer.Start(ctx, "http_client.call_user_service")
	defer span.End()

	span.SetAttributes(
		attribute.String("service.name", "user-service"),
		attribute.String("user.id", userID),
	)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://user-service:8080/user", nil)
	if err != nil {
		span.RecordError(err)
		return nil, err
	}
	// Encode userID as a proper query value instead of splicing it in with
	// Sprintf. Otherwise characters such as '&', '#' or spaces could inject
	// extra parameters or truncate the query.
	query := req.URL.Query()
	query.Set("id", userID)
	req.URL.RawQuery = query.Encode()
	requestURL := req.URL.String()

	// The instrumented transport creates a child span for this request.
	resp, err := c.client.Do(req)
	if err != nil {
		span.RecordError(err)
		return nil, err
	}
	defer resp.Body.Close()

	span.SetAttributes(
		attribute.Int("http.status_code", resp.StatusCode),
		attribute.String("http.method", http.MethodGet),
		attribute.String("http.url", requestURL),
	)

	if resp.StatusCode != http.StatusOK {
		err := fmt.Errorf("HTTP error: %d", resp.StatusCode)
		span.RecordError(err)
		return nil, err
	}

	var user User
	if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
		span.RecordError(err)
		return nil, err
	}
	return &user, nil
}
指标收集与导出 #
基础指标定义 #
package metrics
import (
"context"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
// MetricsService bundles a meter with the instruments NewMetricsService
// creates on it.
type MetricsService struct {
	meter           metric.Meter
	requestCounter  metric.Int64Counter       // "http_requests_total"
	requestDuration metric.Float64Histogram   // "http_request_duration_seconds", in seconds
	activeUsers     metric.Int64UpDownCounter // "active_users"
}
// NewMetricsService creates the service's instruments on the "user-service"
// meter from otel.Meter, in this order:
//   - http_requests_total (Int64Counter, unit "1")
//   - http_request_duration_seconds (Float64Histogram, unit "s")
//   - active_users (Int64UpDownCounter, unit "1")
//
// The first creation error is returned unchanged, together with a nil service.
func NewMetricsService() (*MetricsService, error) {
	svc := &MetricsService{meter: otel.Meter("user-service")}
	var err error

	svc.requestCounter, err = svc.meter.Int64Counter(
		"http_requests_total",
		metric.WithDescription("Total number of HTTP requests"),
		metric.WithUnit("1"),
	)
	if err != nil {
		return nil, err
	}

	svc.requestDuration, err = svc.meter.Float64Histogram(
		"http_request_duration_seconds",
		metric.WithDescription("HTTP request duration in seconds"),
		metric.WithUnit("s"),
	)
	if err != nil {
		return nil, err
	}

	svc.activeUsers, err = svc.meter.Int64UpDownCounter(
		"active_users",
		metric.WithDescription("Number of active users"),
		metric.WithUnit("1"),
	)
	if err != nil {
		return nil, err
	}

	return svc, nil
}
// RecordHTTPRequest records one completed HTTP request. It increments
// http_requests_total and observes duration, in seconds, on
// http_request_duration_seconds. Both instruments get the same
// method / path / status_code attributes.
func (m *MetricsService) RecordHTTPRequest(ctx context.Context, method, path string, statusCode int, duration time.Duration) {
	// Build the attribute option once and share it. metric.WithAttributes is
	// accepted by both Add and Record, so there is no need to assemble the
	// same attribute set twice per request.
	attrs := metric.WithAttributes(
		attribute.String("method", method),
		attribute.String("path", path),
		attribute.Int("status_code", statusCode),
	)
	m.requestCounter.Add(ctx, 1, attrs)
	m.requestDuration.Record(ctx, duration.Seconds(), attrs)
}
// UpdateActiveUsers adjusts the active_users up/down counter by delta.
// Positive values add users; negative values remove them.
func (m *MetricsService) UpdateActiveUsers(ctx context.Context, delta int64) {
	instrument := m.activeUsers
	instrument.Add(ctx, delta)
}
HTTP 中间件集成指标 #
// MetricsMiddleware wraps next so that every request it serves is reported
// through RecordHTTPRequest. The report includes the method, URL path,
// captured status code and wall-clock handling time. A handler that never
// calls WriteHeader is reported with status 200.
//
// NOTE(review): r.URL.Path is used verbatim as a metric attribute, so paths
// that embed IDs will create one time series per distinct value.
func (m *MetricsService) MetricsMiddleware(next http.Handler) http.Handler {
	serve := func(w http.ResponseWriter, r *http.Request) {
		begin := time.Now()
		// The wrapper captures the status code the handler writes.
		recorder := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}
		next.ServeHTTP(recorder, r)
		elapsed := time.Since(begin)
		m.RecordHTTPRequest(r.Context(), r.Method, r.URL.Path, recorder.statusCode, elapsed)
	}
	return http.HandlerFunc(serve)
}
// responseWriter wraps an http.ResponseWriter and remembers the status code
// the response was actually sent with, for MetricsMiddleware.
type responseWriter struct {
	http.ResponseWriter
	statusCode  int
	wroteHeader bool // set once the final status is committed
}

// WriteHeader records code and forwards it to the wrapped writer. Only the
// first final status is recorded. net/http ignores later WriteHeader calls,
// so recording them would report a status the client never received.
// Informational 1xx codes (except 101) may precede the final status, so they
// do not lock it in.
func (rw *responseWriter) WriteHeader(code int) {
	if !rw.wroteHeader {
		rw.statusCode = code
		informational := code >= 100 && code <= 199 && code != http.StatusSwitchingProtocols
		rw.wroteHeader = !informational
	}
	rw.ResponseWriter.WriteHeader(code)
}

// Write forwards b to the wrapped writer. A Write before any final
// WriteHeader commits an implicit 200, so the status is locked to 200 here.
func (rw *responseWriter) Write(b []byte) (int, error) {
	if !rw.wroteHeader {
		rw.statusCode = http.StatusOK
		rw.wroteHeader = true
	}
	return rw.ResponseWriter.Write(b)
}

// Unwrap exposes the wrapped writer so http.ResponseController can reach
// optional capabilities (flushing, hijacking, deadlines) through this wrapper.
func (rw *responseWriter) Unwrap() http.ResponseWriter {
	return rw.ResponseWriter
}
配置管理 #
环境配置 #
package config
import (
"os"
"strconv"
"time"
)
// TracingConfig holds the tracing settings that LoadTracingConfig reads from
// the environment. Each field names its variable and default.
type TracingConfig struct {
	ServiceName    string        // SERVICE_NAME, default "unknown-service"
	ServiceVersion string        // SERVICE_VERSION, default "1.0.0"
	Environment    string        // ENVIRONMENT, default "development"
	JaegerEndpoint string        // JAEGER_ENDPOINT, Jaeger collector URL
	SamplingRate   float64       // SAMPLING_RATE, default 1.0
	BatchTimeout   time.Duration // BATCH_TIMEOUT, default 5s
	MaxBatchSize   int           // MAX_BATCH_SIZE, default 512
	Enabled        bool          // TRACING_ENABLED, default true
}
// LoadTracingConfig builds a TracingConfig from environment variables. Each
// field falls back to its built-in default when its variable is unset or
// empty, or (for numeric, duration and boolean fields) cannot be parsed.
func LoadTracingConfig() *TracingConfig {
	return &TracingConfig{
		ServiceName:    getEnv("SERVICE_NAME", "unknown-service"),
		ServiceVersion: getEnv("SERVICE_VERSION", "1.0.0"),
		Environment:    getEnv("ENVIRONMENT", "development"),
		JaegerEndpoint: getEnv("JAEGER_ENDPOINT", "http://localhost:14268/api/traces"),
		SamplingRate:   getEnvFloat("SAMPLING_RATE", 1.0),
		BatchTimeout:   getEnvDuration("BATCH_TIMEOUT", "5s"),
		MaxBatchSize:   getEnvInt("MAX_BATCH_SIZE", 512),
		Enabled:        getEnvBool("TRACING_ENABLED", true),
	}
}
// getEnv returns the value of environment variable key, or defaultValue when
// the variable is unset or set to the empty string.
func getEnv(key, defaultValue string) string {
	value, ok := os.LookupEnv(key)
	if !ok || value == "" {
		return defaultValue
	}
	return value
}
// getEnvFloat parses environment variable key as a float64. It returns
// defaultValue when the variable is unset, empty, or not a valid (in-range)
// number.
func getEnvFloat(key string, defaultValue float64) float64 {
	raw := os.Getenv(key)
	if raw == "" {
		return defaultValue
	}
	parsed, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return defaultValue
	}
	return parsed
}
// getEnvDuration parses environment variable key with time.ParseDuration
// (e.g. "5s", "250ms"). If the variable is unset, empty or malformed, it
// parses defaultValue instead. If that also fails, it returns 5 seconds.
func getEnvDuration(key string, defaultValue string) time.Duration {
	for _, candidate := range []string{os.Getenv(key), defaultValue} {
		if candidate == "" {
			continue
		}
		if d, err := time.ParseDuration(candidate); err == nil {
			return d
		}
	}
	return time.Second * 5
}
// getEnvInt parses environment variable key as a base-10 int. It returns
// defaultValue when the variable is unset, empty, or not a valid integer.
func getEnvInt(key string, defaultValue int) int {
	raw, present := os.LookupEnv(key)
	if !present || raw == "" {
		return defaultValue
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return defaultValue
	}
	return n
}
// getEnvBool parses environment variable key with strconv.ParseBool, which
// accepts forms such as 1/0, t/f and true/false. It returns defaultValue when
// the variable is unset, empty, or unrecognised (ParseBool rejects "").
func getEnvBool(key string, defaultValue bool) bool {
	if b, err := strconv.ParseBool(os.Getenv(key)); err == nil {
		return b
	}
	return defaultValue
}
完整的初始化示例 #
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)
// main loads the tracing configuration from the environment and creates the
// metrics service. Unless tracing is disabled, it also sets up tracing. It
// then runs the HTTP server until shutdown.
func main() {
	config := LoadTracingConfig()

	// Metrics do not depend on the tracing switch, so TRACING_ENABLED=false
	// disables tracing only. Creating metrics first also means a metrics
	// error cannot exit the process while a TracerProvider is left unflushed.
	//
	// NOTE(review): NewMetricsService uses otel.Meter, i.e. the global
	// MeterProvider. Nothing shown here installs one (e.g. the Prometheus
	// exporter), so the instruments are presumably no-ops until that is
	// added. Confirm.
	metricsService, err := NewMetricsService()
	if err != nil {
		log.Fatalf("Failed to initialize metrics: %v", err)
	}

	var tp *trace.TracerProvider
	if config.Enabled {
		if tp, err = initTracing(config); err != nil {
			log.Fatalf("Failed to initialize tracing: %v", err)
		}
	} else {
		log.Println("Tracing disabled")
	}

	startServer(tp, metricsService)
}
// initTracing builds a Jaeger-backed TracerProvider from config and installs
// it globally, along with the W3C TraceContext and Baggage propagators. It
// returns the provider so the caller can Shutdown it to flush pending spans.
//
// Sampling: config.SamplingRate selects the sampler for new (root) traces:
//   - >= 1 samples everything.
//   - A value in (0, 1) samples that fraction by trace ID.
//   - <= 0 or NaN starts no new traces.
//
// The root sampler is wrapped in ParentBased. A span whose parent already
// carries a sampling decision (e.g. from an upstream service) follows that
// decision, so distributed traces are not broken up by differing local
// sampling.
func initTracing(config *TracingConfig) (*trace.TracerProvider, error) {
	// Build the resource before the exporter, so a resource error cannot
	// leave behind an exporter that is never shut down.
	res, err := resource.New(context.Background(),
		resource.WithAttributes(
			semconv.ServiceName(config.ServiceName),
			semconv.ServiceVersion(config.ServiceVersion),
			semconv.DeploymentEnvironment(config.Environment),
		),
	)
	if err != nil {
		return nil, err
	}

	exp, err := jaeger.New(jaeger.WithCollectorEndpoint(
		jaeger.WithEndpoint(config.JaegerEndpoint),
	))
	if err != nil {
		return nil, err
	}

	var root trace.Sampler
	switch rate := config.SamplingRate; {
	case rate >= 1.0:
		root = trace.AlwaysSample()
	case rate > 0.0:
		root = trace.TraceIDRatioBased(rate)
	default: // rate <= 0, or NaN (every comparison with NaN is false)
		root = trace.NeverSample()
	}

	tp := trace.NewTracerProvider(
		trace.WithBatcher(exp,
			trace.WithBatchTimeout(config.BatchTimeout),
			trace.WithMaxExportBatchSize(config.MaxBatchSize),
		),
		trace.WithResource(res),
		trace.WithSampler(trace.ParentBased(root)),
	)

	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{},
		propagation.Baggage{},
	))
	return tp, nil
}
// startServer serves /health on :8080. The handler is wrapped in the metrics
// middleware when metrics is non-nil, and always in otelhttp. startServer
// blocks until SIGINT/SIGTERM arrives or the listener fails. It then shuts
// the HTTP server down with a 30s grace period and, when tp is non-nil,
// flushes and shuts down the tracer provider with its own 10s budget. A
// listener failure is reported via log.Fatalf only after that cleanup runs.
func startServer(tp *trace.TracerProvider, metrics *MetricsService) {
	mux := http.NewServeMux()

	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		// A write error here means the client is gone; there is no one to
		// report it to.
		_, _ = w.Write([]byte(`{"status": "ok"}`))
	})

	var finalHandler http.Handler = handler
	if metrics != nil {
		finalHandler = metrics.MetricsMiddleware(finalHandler)
	}
	finalHandler = otelhttp.NewHandler(finalHandler, "health-check")
	mux.Handle("/health", finalHandler)

	server := &http.Server{
		Addr:    ":8080",
		Handler: mux,
		// Limit how long a client may take to send request headers, so slow
		// or stalled connections cannot hold server resources indefinitely.
		ReadHeaderTimeout: 10 * time.Second,
	}

	// Register for signals before the server starts, so an early SIGINT or
	// SIGTERM takes the graceful path instead of the default immediate exit.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(quit)

	// The goroutine hands ListenAndServe's result back instead of calling
	// log.Fatalf itself, which would exit before the spans are flushed. The
	// buffer lets the goroutine finish even after nobody is receiving.
	serveErr := make(chan error, 1)
	go func() {
		log.Println("Server starting on :8080")
		serveErr <- server.ListenAndServe()
	}()

	var failure error
	select {
	case <-quit:
	case failure = <-serveErr:
	}
	log.Println("Shutting down server...")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		log.Printf("Server shutdown error: %v", err)
	}

	if tp != nil {
		// Separate budget, so a slow HTTP drain cannot starve the span flush.
		flushCtx, flushCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer flushCancel()
		if err := tp.Shutdown(flushCtx); err != nil {
			log.Printf("Tracer shutdown error: %v", err)
		}
	}

	if failure != nil && failure != http.ErrServerClosed {
		log.Fatalf("Server failed: %v", failure)
	}
	log.Println("Server stopped")
}
小结 #
OpenTelemetry 为 Go 应用提供了标准化的可观测性解决方案。通过合理使用自动和手动埋点技术,我们可以构建完整的分布式追踪体系。配合指标收集功能,OpenTelemetry 能够为微服务架构提供全面的可观测性支持。
在下一节中,我们将学习如何使用 Jaeger 来可视化和分析这些追踪数据。