4.6.3 性能指标收集

4.6.3 性能指标收集 #

性能指标收集是监控系统的核心功能,它负责收集、存储和分析各种系统和应用程序的性能数据。本节将详细介绍如何设计和实现一个完整的性能指标收集系统。

指标系统架构 #

指标类型分类 #

性能指标通常分为以下几种类型:

  1. 计数器(Counter):只能增加的累积指标(只能通过 Reset 归零),如请求总数、错误总数
  2. 仪表盘(Gauge):可以任意变化的瞬时值,如 CPU 使用率、内存使用量
  3. 直方图(Histogram):观察值的分布情况,如响应时间分布
  4. 摘要(Summary):类似直方图,但不使用预定义的桶,而是保留时间窗口内的原始样本并直接计算分位数

指标数据结构 #

package metrics

import (
    "sync"
    "time"
)

// MetricType identifies the kind of a metric.
type MetricType int

// The supported metric kinds. Values are assigned sequentially by iota,
// starting with CounterType at zero.
const (
    CounterType MetricType = iota
    GaugeType
    HistogramType
    SummaryType
)

// Metric is the read-only interface implemented by every metric kind.
type Metric interface {
    // Name returns the metric's name.
    Name() string
    // Type reports which kind of metric this is.
    Type() MetricType
    // Labels returns the metric's label set.
    Labels() map[string]string
    // Value returns the current value; its dynamic type depends on the
    // implementation.
    Value() interface{}
    // Timestamp returns the time associated with the metric.
    Timestamp() time.Time
}

// BaseMetric carries the state shared by every metric kind: a name, a label
// set and the time of the most recent update. Concrete metrics embed it.
type BaseMetric struct {
    name      string
    labels    map[string]string
    timestamp time.Time
    mutex     sync.RWMutex // guards labels and timestamp
}

// NewBaseMetric returns a BaseMetric called name whose timestamp is the
// current time.
//
// The labels map is copied, so changes the caller later makes to its own map
// neither alter the metric nor race with Labels. A nil map is treated as an
// empty label set.
func NewBaseMetric(name string, labels map[string]string) *BaseMetric {
    owned := make(map[string]string, len(labels))
    for k, v := range labels {
        owned[k] = v
    }
    return &BaseMetric{
        name:      name,
        labels:    owned,
        timestamp: time.Now(),
    }
}

// Name returns the metric name. The name never changes after construction,
// so no locking is needed.
func (m *BaseMetric) Name() string {
    return m.name
}

// Labels returns a copy of the label set; mutating the result does not
// affect the metric.
func (m *BaseMetric) Labels() map[string]string {
    m.mutex.RLock()
    defer m.mutex.RUnlock()

    result := make(map[string]string, len(m.labels))
    for k, v := range m.labels {
        result[k] = v
    }
    return result
}

// Timestamp returns the time of the most recent update.
func (m *BaseMetric) Timestamp() time.Time {
    m.mutex.RLock()
    defer m.mutex.RUnlock()
    return m.timestamp
}

// updateTimestamp records the current time as the most recent update.
func (m *BaseMetric) updateTimestamp() {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    m.timestamp = time.Now()
}

计数器实现 #

基础计数器 #

import (
    "sync/atomic"
)

// Counter is a cumulative metric backed by an int64 that is read and written
// with atomic operations.
type Counter struct {
    *BaseMetric
    value int64
}

// NewCounter builds a Counter with the given name and labels, starting at zero.
func NewCounter(name string, labels map[string]string) *Counter {
    c := new(Counter)
    c.BaseMetric = NewBaseMetric(name, labels)
    return c
}

// Type reports CounterType.
func (c *Counter) Type() MetricType {
    return CounterType
}

// Value returns the current count as an int64 wrapped in an interface value.
func (c *Counter) Value() interface{} {
    return c.Get()
}

// Inc adds one to the counter.
func (c *Counter) Inc() {
    c.Add(1)
}

// Add increases the counter by delta and refreshes the timestamp.
// It panics when delta is negative.
func (c *Counter) Add(delta int64) {
    if delta >= 0 {
        atomic.AddInt64(&c.value, delta)
        c.updateTimestamp()
        return
    }
    panic("counter cannot decrease")
}

// Reset sets the counter back to zero and refreshes the timestamp.
func (c *Counter) Reset() {
    atomic.StoreInt64(&c.value, 0)
    c.updateTimestamp()
}

// Get returns the current count.
func (c *Counter) Get() int64 {
    return atomic.LoadInt64(&c.value)
}

速率计数器 #

import (
    "time"
)

// RateCounter is a Counter that can also report how fast it grew between the
// two most recent calls to UpdateRate.
type RateCounter struct {
    *Counter
    lastValue int64
    lastTime  time.Time
    rate      float64
    mutex     sync.RWMutex
}

// NewRateCounter builds a RateCounter whose first measurement interval starts
// at the moment of construction.
func NewRateCounter(name string, labels map[string]string) *RateCounter {
    rc := &RateCounter{Counter: NewCounter(name, labels)}
    rc.lastTime = time.Now()
    return rc
}

// Rate returns the per-second rate computed by the latest UpdateRate call.
func (rc *RateCounter) Rate() float64 {
    rc.mutex.RLock()
    current := rc.rate
    rc.mutex.RUnlock()
    return current
}

// UpdateRate recomputes the rate as the counter delta divided by the seconds
// elapsed since the previous call, then remembers the current value and time.
// The stored rate is left untouched when no previous time is known or when no
// time has elapsed.
func (rc *RateCounter) UpdateRate() {
    rc.mutex.Lock()
    defer rc.mutex.Unlock()

    now, total := time.Now(), rc.Get()

    if prev := rc.lastTime; !prev.IsZero() {
        if elapsed := now.Sub(prev).Seconds(); elapsed > 0 {
            rc.rate = float64(total-rc.lastValue) / elapsed
        }
    }

    rc.lastTime, rc.lastValue = now, total
}

仪表盘实现 #

基础仪表盘 #

import (
    "math"
    "sync/atomic"
    "unsafe"
)

// Gauge is a metric whose float64 value can go up and down. The value is kept
// as its IEEE-754 bit pattern in a uint64 so it can be accessed atomically.
type Gauge struct {
    *BaseMetric
    value uint64 // bit pattern of the current float64 value
}

// NewGauge builds a Gauge with the given name and labels, starting at 0.
func NewGauge(name string, labels map[string]string) *Gauge {
    g := new(Gauge)
    g.BaseMetric = NewBaseMetric(name, labels)
    return g
}

// Type reports GaugeType.
func (g *Gauge) Type() MetricType {
    return GaugeType
}

// Value returns the current value as a float64 wrapped in an interface value.
func (g *Gauge) Value() interface{} {
    return math.Float64frombits(atomic.LoadUint64(&g.value))
}

// Set replaces the current value and refreshes the timestamp.
func (g *Gauge) Set(value float64) {
    bits := math.Float64bits(value)
    atomic.StoreUint64(&g.value, bits)
    g.updateTimestamp()
}

// Get returns the current value.
func (g *Gauge) Get() float64 {
    bits := atomic.LoadUint64(&g.value)
    return math.Float64frombits(bits)
}

// Inc adds 1 to the gauge.
func (g *Gauge) Inc() {
    g.Add(1)
}

// Dec subtracts 1 from the gauge.
func (g *Gauge) Dec() {
    g.Add(-1)
}

// Add adds delta to the gauge and refreshes the timestamp. The update is a
// compare-and-swap loop that retries until no other writer has changed the
// value in between.
func (g *Gauge) Add(delta float64) {
    for {
        prev := atomic.LoadUint64(&g.value)
        next := math.Float64bits(math.Float64frombits(prev) + delta)
        if !atomic.CompareAndSwapUint64(&g.value, prev, next) {
            continue
        }
        g.updateTimestamp()
        return
    }
}

// Sub subtracts delta from the gauge.
func (g *Gauge) Sub(delta float64) {
    g.Add(-delta)
}

统计仪表盘 #

// StatGauge is a Gauge that also keeps the count, sum, minimum and maximum of
// every value passed to Update. The float64 statistics are stored as bit
// patterns so they can be updated atomically.
type StatGauge struct {
    *Gauge
    count int64
    sum   uint64 // bit pattern of a float64
    min   uint64 // bit pattern of a float64
    max   uint64 // bit pattern of a float64
}

// NewStatGauge builds a StatGauge with no samples: the sum is 0, the minimum
// is +Inf and the maximum is -Inf.
func NewStatGauge(name string, labels map[string]string) *StatGauge {
    sg := &StatGauge{Gauge: NewGauge(name, labels)}
    sg.sum = math.Float64bits(0)
    sg.min = math.Float64bits(math.Inf(1))
    sg.max = math.Float64bits(math.Inf(-1))
    return sg
}

// Update sets the gauge to value and folds value into the running statistics.
// Each statistic is updated by its own compare-and-swap loop.
func (sg *StatGauge) Update(value float64) {
    sg.Set(value)

    atomic.AddInt64(&sg.count, 1)

    // Running sum: retry until the addition lands on an unchanged value.
    for {
        seen := atomic.LoadUint64(&sg.sum)
        next := math.Float64bits(math.Float64frombits(seen) + value)
        if atomic.CompareAndSwapUint64(&sg.sum, seen, next) {
            break
        }
    }

    bits := math.Float64bits(value)

    // Minimum: stop as soon as the stored minimum is already small enough.
    for {
        seen := atomic.LoadUint64(&sg.min)
        if value >= math.Float64frombits(seen) || atomic.CompareAndSwapUint64(&sg.min, seen, bits) {
            break
        }
    }

    // Maximum: stop as soon as the stored maximum is already large enough.
    for {
        seen := atomic.LoadUint64(&sg.max)
        if value <= math.Float64frombits(seen) || atomic.CompareAndSwapUint64(&sg.max, seen, bits) {
            break
        }
    }
}

// Count returns the number of values passed to Update.
func (sg *StatGauge) Count() int64 {
    return atomic.LoadInt64(&sg.count)
}

// Sum returns the sum of all values passed to Update.
func (sg *StatGauge) Sum() float64 {
    bits := atomic.LoadUint64(&sg.sum)
    return math.Float64frombits(bits)
}

// Min returns the smallest value passed to Update.
func (sg *StatGauge) Min() float64 {
    bits := atomic.LoadUint64(&sg.min)
    return math.Float64frombits(bits)
}

// Max returns the largest value passed to Update.
func (sg *StatGauge) Max() float64 {
    bits := atomic.LoadUint64(&sg.max)
    return math.Float64frombits(bits)
}

// Avg returns Sum divided by Count, or 0 when there are no samples.
func (sg *StatGauge) Avg() float64 {
    if n := sg.Count(); n != 0 {
        return sg.Sum() / float64(n)
    }
    return 0
}

直方图实现 #

基础直方图 #

import (
    "sort"
)

// Histogram counts observations into buckets identified by their upper bound.
//
// Bucket counts are cumulative: Observe increments every bucket whose upper
// bound is greater than or equal to the observed value, so counts[i] is the
// number of observations <= buckets[i].
type Histogram struct {
    *BaseMetric
    buckets []float64 // sorted upper bounds; not modified after construction
    counts  []int64   // cumulative count per bucket, updated atomically
    sum     uint64    // bit pattern of the float64 sum of all observations
    count   int64     // total number of observations
    mutex   sync.RWMutex
}

// NewHistogram builds a Histogram with the given bucket upper bounds. The
// bounds are copied and sorted, so the caller's slice is left untouched.
func NewHistogram(name string, labels map[string]string, buckets []float64) *Histogram {
    sortedBuckets := make([]float64, len(buckets))
    copy(sortedBuckets, buckets)
    sort.Float64s(sortedBuckets)

    return &Histogram{
        BaseMetric: NewBaseMetric(name, labels),
        buckets:    sortedBuckets,
        counts:     make([]int64, len(sortedBuckets)),
        sum:        math.Float64bits(0),
        count:      0,
    }
}

// Type reports HistogramType.
func (h *Histogram) Type() MetricType {
    return HistogramType
}

// Value returns a map with three entries: "buckets" (map[string]int64 from
// upper bound to cumulative count), "count" (int64) and "sum" (float64).
//
// Upper bounds are formatted with %g so that distinct bounds always produce
// distinct keys; a fixed two-decimal format would merge bounds such as 0.005
// and 0.01 into a single entry.
func (h *Histogram) Value() interface{} {
    h.mutex.RLock()
    defer h.mutex.RUnlock()

    buckets := make(map[string]int64, len(h.buckets))
    for i, bucket := range h.buckets {
        buckets[fmt.Sprintf("%g", bucket)] = atomic.LoadInt64(&h.counts[i])
    }

    return map[string]interface{}{
        "buckets": buckets,
        "count":   atomic.LoadInt64(&h.count),
        "sum":     math.Float64frombits(atomic.LoadUint64(&h.sum)),
    }
}

// Observe records one value: it bumps the total count, adds the value to the
// sum, increments every bucket whose upper bound is >= value and refreshes the
// timestamp.
func (h *Histogram) Observe(value float64) {
    atomic.AddInt64(&h.count, 1)

    // Add to the sum with a compare-and-swap loop on the float's bit pattern.
    for {
        oldBits := atomic.LoadUint64(&h.sum)
        newBits := math.Float64bits(math.Float64frombits(oldBits) + value)
        if atomic.CompareAndSwapUint64(&h.sum, oldBits, newBits) {
            break
        }
    }

    h.mutex.RLock()
    for i, bucket := range h.buckets {
        if value <= bucket {
            atomic.AddInt64(&h.counts[i], 1)
        }
    }
    h.mutex.RUnlock()

    h.updateTimestamp()
}

// Count returns the total number of observations.
func (h *Histogram) Count() int64 {
    return atomic.LoadInt64(&h.count)
}

// Sum returns the sum of all observed values.
func (h *Histogram) Sum() float64 {
    return math.Float64frombits(atomic.LoadUint64(&h.sum))
}

// Buckets returns a snapshot mapping each upper bound to its cumulative count.
func (h *Histogram) Buckets() map[float64]int64 {
    h.mutex.RLock()
    defer h.mutex.RUnlock()

    result := make(map[float64]int64, len(h.buckets))
    for i, bucket := range h.buckets {
        result[bucket] = atomic.LoadInt64(&h.counts[i])
    }
    return result
}

// Quantile estimates the q-quantile (0 <= q <= 1) as the upper bound of the
// first bucket whose cumulative count reaches q times the total count.
//
// It returns NaN when q is NaN or outside [0, 1], or when the histogram has
// observations but no buckets; it returns 0 when nothing has been observed.
// When the requested rank lies beyond the largest bucket, the largest upper
// bound is returned.
func (h *Histogram) Quantile(q float64) float64 {
    if math.IsNaN(q) || q < 0 || q > 1 {
        return math.NaN()
    }

    total := h.Count()
    if total == 0 {
        return 0
    }

    h.mutex.RLock()
    defer h.mutex.RUnlock()

    if len(h.buckets) == 0 {
        return math.NaN()
    }

    rank := float64(total) * q
    for i, upper := range h.buckets {
        // counts are already cumulative, so they are compared directly
        // instead of being summed again.
        if float64(atomic.LoadInt64(&h.counts[i])) >= rank {
            return upper
        }
    }

    return h.buckets[len(h.buckets)-1]
}

摘要实现 #

时间窗口摘要 #

import (
    "container/ring"
    "time"
)

// Summary keeps recent raw samples in a fixed-size ring and computes quantiles
// over the samples that are younger than maxAge.
type Summary struct {
    *BaseMetric
    objectives map[float64]float64 // quantile -> allowed error
    samples    *ring.Ring          // ring of *Sample; the oldest entry is overwritten first
    maxAge     time.Duration       // samples older than this are ignored
    ageBuckets int                 // stored but not used by the code in this block
    mutex      sync.RWMutex
}

// Sample is one observed value together with the time it was observed.
type Sample struct {
    Value     float64
    Timestamp time.Time
}

// NewSummary builds a Summary that reports the quantiles listed as keys of
// objectives over samples no older than maxAge. At most 1000 samples are
// retained.
func NewSummary(name string, labels map[string]string, objectives map[float64]float64, maxAge time.Duration, ageBuckets int) *Summary {
    return &Summary{
        BaseMetric: NewBaseMetric(name, labels),
        objectives: objectives,
        samples:    ring.New(1000), // fixed capacity of the sample ring
        maxAge:     maxAge,
        ageBuckets: ageBuckets,
    }
}

// Type reports SummaryType.
func (s *Summary) Type() MetricType {
    return SummaryType
}

// Value returns a map with three entries computed over the samples younger
// than maxAge: "count" (int), "sum" (float64) and "quantiles"
// (map[string]float64 keyed by the quantile formatted with %g, e.g. "0.5").
//
// The entry types are the same whether or not any sample is available, and
// the %g keys keep close objectives such as 0.99 and 0.999 distinct.
func (s *Summary) Value() interface{} {
    s.mutex.RLock()
    defer s.mutex.RUnlock()

    samples := s.getValidSamples()
    if len(samples) == 0 {
        return map[string]interface{}{
            "count":     0,
            "sum":       0.0,
            "quantiles": make(map[string]float64),
        }
    }

    // Quantile calculation needs the samples ordered by value.
    sort.Slice(samples, func(i, j int) bool {
        return samples[i].Value < samples[j].Value
    })

    quantiles := make(map[string]float64, len(s.objectives))
    for q := range s.objectives {
        quantiles[fmt.Sprintf("%g", q)] = s.calculateQuantile(samples, q)
    }

    sum := 0.0
    for _, sample := range samples {
        sum += sample.Value
    }

    return map[string]interface{}{
        "count":     len(samples),
        "sum":       sum,
        "quantiles": quantiles,
    }
}

// Observe stores value with the current time in the ring, overwriting the
// oldest slot, and refreshes the timestamp.
func (s *Summary) Observe(value float64) {
    s.mutex.Lock()
    defer s.mutex.Unlock()

    s.samples.Value = &Sample{
        Value:     value,
        Timestamp: time.Now(),
    }
    s.samples = s.samples.Next()

    s.updateTimestamp()
}

// getValidSamples returns the stored samples whose timestamp is after
// now - maxAge. The caller must hold s.mutex.
func (s *Summary) getValidSamples() []*Sample {
    cutoff := time.Now().Add(-s.maxAge)
    var samples []*Sample

    s.samples.Do(func(v interface{}) {
        if sample, ok := v.(*Sample); ok && sample != nil && sample.Timestamp.After(cutoff) {
            samples = append(samples, sample)
        }
    })

    return samples
}

// calculateQuantile returns the q-quantile of samples, which must already be
// sorted by Value, using linear interpolation between neighbouring samples.
//
// It returns 0 for an empty slice and NaN when q is NaN. Values of q at or
// below 0 yield the smallest sample and values at or above 1 the largest, so
// an out-of-range objective can never index outside the slice.
func (s *Summary) calculateQuantile(samples []*Sample, q float64) float64 {
    if len(samples) == 0 {
        return 0
    }
    if math.IsNaN(q) {
        return math.NaN()
    }

    last := len(samples) - 1
    if q <= 0 {
        return samples[0].Value
    }
    if q >= 1 {
        return samples[last].Value
    }

    position := q * float64(last)
    lower := int(position)
    if lower >= last {
        return samples[last].Value
    }

    // Linear interpolation between the two neighbouring samples.
    weight := position - float64(lower)
    return samples[lower].Value*(1-weight) + samples[lower+1].Value*weight
}

指标注册表 #

注册表实现 #

// Registry stores metrics keyed by their name and label set and is safe for
// concurrent use.
type Registry struct {
    metrics map[string]Metric
    mutex   sync.RWMutex
}

// NewRegistry returns an empty Registry.
func NewRegistry() *Registry {
    return &Registry{
        metrics: make(map[string]Metric),
    }
}

// Register adds metric to the registry. It returns an error when a metric
// with the same name and label set is already registered.
func (r *Registry) Register(metric Metric) error {
    key := r.buildKey(metric.Name(), metric.Labels())

    r.mutex.Lock()
    defer r.mutex.Unlock()

    if _, exists := r.metrics[key]; exists {
        return fmt.Errorf("metric already registered: %s", key)
    }

    r.metrics[key] = metric
    return nil
}

// Unregister removes the metric with the given name and labels, if present.
func (r *Registry) Unregister(name string, labels map[string]string) {
    key := r.buildKey(name, labels)

    r.mutex.Lock()
    defer r.mutex.Unlock()

    delete(r.metrics, key)
}

// Get looks up the metric with the given name and labels. The boolean reports
// whether it was found.
func (r *Registry) Get(name string, labels map[string]string) (Metric, bool) {
    key := r.buildKey(name, labels)

    r.mutex.RLock()
    defer r.mutex.RUnlock()

    metric, exists := r.metrics[key]
    return metric, exists
}

// GetOrCreate returns the metric registered under name and labels, creating
// and registering one of kind metricType when none exists. Histograms and
// summaries are created with built-in default buckets and objectives.
//
// An existing metric is returned as-is even when its kind differs from
// metricType. GetOrCreate panics when it has to create a metric and
// metricType is not one of the known kinds.
func (r *Registry) GetOrCreate(name string, labels map[string]string, metricType MetricType) Metric {
    key := r.buildKey(name, labels)

    // Fast path: most calls find an existing metric under the read lock.
    r.mutex.RLock()
    metric, exists := r.metrics[key]
    r.mutex.RUnlock()
    if exists {
        return metric
    }

    r.mutex.Lock()
    defer r.mutex.Unlock()

    // Another goroutine may have created the metric between the two locks.
    if metric, exists := r.metrics[key]; exists {
        return metric
    }

    switch metricType {
    case CounterType:
        metric = NewCounter(name, labels)
    case GaugeType:
        metric = NewGauge(name, labels)
    case HistogramType:
        // Default buckets.
        buckets := []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}
        metric = NewHistogram(name, labels, buckets)
    case SummaryType:
        objectives := map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}
        metric = NewSummary(name, labels, objectives, 10*time.Minute, 5)
    default:
        panic(fmt.Sprintf("unknown metric type: %v", metricType))
    }

    r.metrics[key] = metric
    return metric
}

// All returns a snapshot of every registered metric in unspecified order.
func (r *Registry) All() []Metric {
    r.mutex.RLock()
    defer r.mutex.RUnlock()

    metrics := make([]Metric, 0, len(r.metrics))
    for _, metric := range r.metrics {
        metrics = append(metrics, metric)
    }
    return metrics
}

// buildKey derives the map key for a name and label set. Labels are sorted by
// key so the result does not depend on map iteration order, and both label
// keys and values are quoted so that a value containing "," or "=" cannot be
// mistaken for additional labels.
func (r *Registry) buildKey(name string, labels map[string]string) string {
    if len(labels) == 0 {
        return name
    }

    keys := make([]string, 0, len(labels))
    for k := range labels {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    var b strings.Builder
    b.WriteString(name)
    for _, k := range keys {
        fmt.Fprintf(&b, ",%q=%q", k, labels[k])
    }
    return b.String()
}

全局注册表 #

// Process-wide default registry, created lazily by DefaultRegistry.
var (
    registryOnce    sync.Once
    defaultRegistry *Registry
)

// DefaultRegistry returns the shared default registry, creating it on the
// first call. Every call returns the same instance.
func DefaultRegistry() *Registry {
    registryOnce.Do(func() { defaultRegistry = NewRegistry() })
    return defaultRegistry
}

// Convenience functions operating on the default registry.

// RegisterCounter creates a counter and registers it in the default registry.
//
// When a counter with the same name and labels is already registered, that
// existing counter is returned, so updates made through the result are always
// visible to the registry. If the name is taken by a metric of another kind,
// the newly created, unregistered counter is returned.
func RegisterCounter(name string, labels map[string]string) *Counter {
    counter := NewCounter(name, labels)
    if err := DefaultRegistry().Register(counter); err != nil {
        if existing, ok := DefaultRegistry().Get(name, labels); ok {
            if registered, ok := existing.(*Counter); ok {
                return registered
            }
        }
    }
    return counter
}

// RegisterGauge creates a gauge and registers it in the default registry.
//
// When a gauge with the same name and labels is already registered, that
// existing gauge is returned. If the name is taken by a metric of another
// kind, the newly created, unregistered gauge is returned.
func RegisterGauge(name string, labels map[string]string) *Gauge {
    gauge := NewGauge(name, labels)
    if err := DefaultRegistry().Register(gauge); err != nil {
        if existing, ok := DefaultRegistry().Get(name, labels); ok {
            if registered, ok := existing.(*Gauge); ok {
                return registered
            }
        }
    }
    return gauge
}

// RegisterHistogram creates a histogram with the given buckets and registers
// it in the default registry.
//
// When a histogram with the same name and labels is already registered, that
// existing histogram is returned (with its original buckets). If the name is
// taken by a metric of another kind, the newly created, unregistered
// histogram is returned.
func RegisterHistogram(name string, labels map[string]string, buckets []float64) *Histogram {
    histogram := NewHistogram(name, labels, buckets)
    if err := DefaultRegistry().Register(histogram); err != nil {
        if existing, ok := DefaultRegistry().Get(name, labels); ok {
            if registered, ok := existing.(*Histogram); ok {
                return registered
            }
        }
    }
    return histogram
}

// GetCounter returns the counter registered under name and labels in the
// default registry, creating it if necessary. It panics if the registered
// metric is not a *Counter.
func GetCounter(name string, labels map[string]string) *Counter {
    metric := DefaultRegistry().GetOrCreate(name, labels, CounterType)
    return metric.(*Counter)
}

// GetGauge returns the gauge registered under name and labels in the default
// registry, creating it if necessary. It panics if the registered metric is
// not a *Gauge.
func GetGauge(name string, labels map[string]string) *Gauge {
    metric := DefaultRegistry().GetOrCreate(name, labels, GaugeType)
    return metric.(*Gauge)
}

指标收集器 #

系统指标收集器 #

// SystemMetricsCollector periodically samples host-level metrics and stores
// them in metrics registered with a Registry.
type SystemMetricsCollector struct {
    registry *Registry
    interval time.Duration

    // System metrics updated on every collection.
    cpuUsage    *Gauge
    memUsage    *Gauge
    diskUsage   *Gauge
    networkRx   *Counter
    networkTx   *Counter

    // Monitoring components the values are read from.
    cpuMonitor     *CPUMonitor
    memoryMonitor  *MemoryMonitor
    diskMonitor    *DiskMonitor
    networkMonitor *NetworkMonitor
}

// NewSystemMetricsCollector builds a collector that samples every interval and
// registers its five system metrics (CPU, memory, disk "/", and eth0 receive
// and transmit bytes) with registry. Registration errors are not reported.
func NewSystemMetricsCollector(registry *Registry, interval time.Duration) *SystemMetricsCollector {
    collector := new(SystemMetricsCollector)
    collector.registry = registry
    collector.interval = interval

    collector.cpuUsage = NewGauge("system_cpu_usage_percent", nil)
    collector.memUsage = NewGauge("system_memory_usage_percent", nil)
    collector.diskUsage = NewGauge("system_disk_usage_percent", map[string]string{"path": "/"})
    collector.networkRx = NewCounter("system_network_rx_bytes_total", map[string]string{"interface": "eth0"})
    collector.networkTx = NewCounter("system_network_tx_bytes_total", map[string]string{"interface": "eth0"})

    collector.cpuMonitor = NewCPUMonitor(interval)
    collector.memoryMonitor = NewMemoryMonitor()
    collector.diskMonitor = NewDiskMonitor()
    collector.networkMonitor = NewNetworkMonitor()

    // Register the metrics in a fixed order.
    owned := []Metric{
        collector.cpuUsage,
        collector.memUsage,
        collector.diskUsage,
        collector.networkRx,
        collector.networkTx,
    }
    for _, metric := range owned {
        registry.Register(metric)
    }

    return collector
}

// Start runs the collection loop, sampling once per interval, and blocks until
// ctx is cancelled. It then returns ctx.Err().
func (c *SystemMetricsCollector) Start(ctx context.Context) error {
    ticker := time.NewTicker(c.interval)
    defer ticker.Stop()

    done := ctx.Done()
    for {
        select {
        case <-ticker.C:
            c.collect()
        case <-done:
            return ctx.Err()
        }
    }
}

// collect takes one sample from each monitor and stores it in the matching
// metric. A monitor that returns an error is skipped for this round.
func (c *SystemMetricsCollector) collect() {
    // CPU usage.
    if usage, err := c.cpuMonitor.GetCPUUsage(); err == nil {
        c.cpuUsage.Set(usage)
    }

    // Memory usage.
    if usage, err := c.memoryMonitor.GetMemoryUsage(); err == nil {
        c.memUsage.Set(usage)
    }

    // Disk usage of the root path.
    if info, err := c.diskMonitor.GetDiskUsage("/"); err == nil {
        c.diskUsage.Set(info.UsageRate)
    }

    // Network traffic of eth0.
    stats, err := c.networkMonitor.getNetworkStats()
    if err != nil {
        return
    }
    eth0, found := stats["eth0"]
    if !found {
        return
    }
    // NOTE(review): the byte values are added to the counters on every round.
    // If getNetworkStats reports running totals rather than per-interval
    // deltas, the counters will over-count — confirm against the monitor.
    c.networkRx.Add(int64(eth0.RxBytes))
    c.networkTx.Add(int64(eth0.TxBytes))
}

应用指标收集器 #

// AppMetricsCollector groups the application-level metrics (HTTP and
// business) and the registry they are registered with.
type AppMetricsCollector struct {
    registry *Registry

    // HTTP metrics.
    httpRequests     *Counter
    httpDuration     *Histogram
    httpErrors       *Counter
    activeConnections *Gauge

    // Business metrics.
    userLogins    *Counter
    orderCreated  *Counter
    paymentAmount *Histogram
}

// NewAppMetricsCollector builds the application metrics and registers each of
// them with registry. Registration errors are not reported.
//
// The labelled metrics are created with empty label values; the Record*
// methods that take label values look their metrics up in the registry by the
// actual values instead.
func NewAppMetricsCollector(registry *Registry) *AppMetricsCollector {
    collector := new(AppMetricsCollector)
    collector.registry = registry

    // HTTP metrics.
    collector.httpRequests = NewCounter("http_requests_total",
        map[string]string{"method": "", "status": ""})
    collector.httpDuration = NewHistogram("http_request_duration_seconds",
        map[string]string{"method": "", "endpoint": ""},
        []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10})
    collector.httpErrors = NewCounter("http_errors_total",
        map[string]string{"method": "", "status": ""})
    collector.activeConnections = NewGauge("http_active_connections", nil)

    // Business metrics.
    collector.userLogins = NewCounter("user_logins_total", nil)
    collector.orderCreated = NewCounter("orders_created_total", map[string]string{"status": ""})
    collector.paymentAmount = NewHistogram("payment_amount_dollars", nil,
        []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000})

    // Register the metrics in a fixed order.
    owned := []Metric{
        collector.httpRequests,
        collector.httpDuration,
        collector.httpErrors,
        collector.activeConnections,
        collector.userLogins,
        collector.orderCreated,
        collector.paymentAmount,
    }
    for _, metric := range owned {
        registry.Register(metric)
    }

    return collector
}

// RecordHTTPRequest records one finished HTTP request.
//
// It increments the request counter for (method, status), observes the
// duration in seconds in the per-method histogram and, when status starts
// with '4' or '5', increments the error counter for (method, status). The
// metrics are looked up in, or created through, the collector's registry.
//
// An empty status is counted as a request but never as an error.
func (c *AppMetricsCollector) RecordHTTPRequest(method, status string, duration time.Duration) {
    // Request count.
    requestCounter := c.registry.GetOrCreate("http_requests_total",
        map[string]string{"method": method, "status": status}, CounterType).(*Counter)
    requestCounter.Inc()

    // Request duration.
    durationHist := c.registry.GetOrCreate("http_request_duration_seconds",
        map[string]string{"method": method}, HistogramType).(*Histogram)
    durationHist.Observe(duration.Seconds())

    // Errors: guard against an empty status before looking at its first byte.
    if status == "" {
        return
    }
    switch status[0] {
    case '4', '5':
        errorCounter := c.registry.GetOrCreate("http_errors_total",
            map[string]string{"method": method, "status": status}, CounterType).(*Counter)
        errorCounter.Inc()
    }
}

// RecordUserLogin counts one user login.
func (c *AppMetricsCollector) RecordUserLogin() {
    logins := c.userLogins
    logins.Inc()
}

// RecordOrderCreated counts one created order under the given status label.
// The counter is looked up in, or created through, the collector's registry.
func (c *AppMetricsCollector) RecordOrderCreated(status string) {
    labels := map[string]string{"status": status}
    metric := c.registry.GetOrCreate("orders_created_total", labels, CounterType)
    metric.(*Counter).Inc()
}

// RecordPayment observes one payment amount in the payment histogram.
func (c *AppMetricsCollector) RecordPayment(amount float64) {
    payments := c.paymentAmount
    payments.Observe(amount)
}

指标导出器 #

Prometheus 格式导出器 #

import (
    "fmt"
    "io"
    "sort"
    "strings"
)

// labelValueEscaper escapes backslash, double quote and line feed, the
// characters that must be escaped inside a quoted label value in the
// Prometheus text format.
var labelValueEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", `\n`)

// PrometheusExporter writes the metrics of a Registry in the Prometheus text
// exposition format.
type PrometheusExporter struct {
    registry *Registry
}

// NewPrometheusExporter returns an exporter that reads from registry.
func NewPrometheusExporter(registry *Registry) *PrometheusExporter {
    return &PrometheusExporter{
        registry: registry,
    }
}

// Export writes every registered metric to writer.
//
// Metrics are sorted by sanitized name and then by label set, so the output
// is deterministic and all series of one name are adjacent. The "# TYPE" line
// is written once per name, taken from the first series of that name. Export
// stops at, and returns, the first write or formatting error.
func (e *PrometheusExporter) Export(writer io.Writer) error {
    type series struct {
        name   string
        labels string
        metric Metric
    }

    all := e.registry.All()
    ordered := make([]series, 0, len(all))
    for _, metric := range all {
        ordered = append(ordered, series{
            name:   e.sanitizeName(metric.Name()),
            labels: e.formatLabels(metric.Labels()),
            metric: metric,
        })
    }
    sort.SliceStable(ordered, func(i, j int) bool {
        if ordered[i].name != ordered[j].name {
            return ordered[i].name < ordered[j].name
        }
        return ordered[i].labels < ordered[j].labels
    })

    for i, s := range ordered {
        if i == 0 || s.name != ordered[i-1].name {
            if err := e.writeTypeLine(writer, s.name, s.metric.Type()); err != nil {
                return err
            }
        }
        if err := e.writeSamples(writer, s.name, s.metric); err != nil {
            return err
        }
    }

    return nil
}

// writeMetric writes a single metric: its "# TYPE" line followed by its
// sample lines.
func (e *PrometheusExporter) writeMetric(writer io.Writer, metric Metric) error {
    name := e.sanitizeName(metric.Name())
    if err := e.writeTypeLine(writer, name, metric.Type()); err != nil {
        return err
    }
    return e.writeSamples(writer, name, metric)
}

// writeTypeLine writes the "# TYPE" line for name. It returns an error for a
// metric type it does not know.
func (e *PrometheusExporter) writeTypeLine(writer io.Writer, name string, metricType MetricType) error {
    var kind string
    switch metricType {
    case CounterType:
        kind = "counter"
    case GaugeType:
        kind = "gauge"
    case HistogramType:
        kind = "histogram"
    case SummaryType:
        kind = "summary"
    default:
        return fmt.Errorf("unknown metric type: %v", metricType)
    }

    _, err := fmt.Fprintf(writer, "# TYPE %s %s\n", name, kind)
    return err
}

// writeSamples writes the sample lines of metric (without the "# TYPE" line),
// dispatching on the metric's type.
func (e *PrometheusExporter) writeSamples(writer io.Writer, name string, metric Metric) error {
    labels := metric.Labels()

    switch metric.Type() {
    case CounterType:
        return e.writeCounter(writer, name, labels, metric)
    case GaugeType:
        return e.writeGauge(writer, name, labels, metric)
    case HistogramType:
        return e.writeHistogram(writer, name, labels, metric)
    case SummaryType:
        return e.writeSummary(writer, name, labels, metric)
    default:
        return fmt.Errorf("unknown metric type: %v", metric.Type())
    }
}

// writeCounter writes the single sample line of a counter.
func (e *PrometheusExporter) writeCounter(writer io.Writer, name string, labels map[string]string, metric Metric) error {
    _, err := fmt.Fprintf(writer, "%s%s %v\n", name, e.formatLabels(labels), metric.Value())
    return err
}

// writeGauge writes the single sample line of a gauge.
func (e *PrometheusExporter) writeGauge(writer io.Writer, name string, labels map[string]string, metric Metric) error {
    _, err := fmt.Fprintf(writer, "%s%s %v\n", name, e.formatLabels(labels), metric.Value())
    return err
}

// writeHistogram writes the bucket, count and sum lines of a histogram.
// Buckets are written in ascending order of their upper bound and are
// followed by the le="+Inf" bucket, whose value is the total count. It
// returns an error when metric is not a *Histogram.
func (e *PrometheusExporter) writeHistogram(writer io.Writer, name string, labels map[string]string, metric Metric) error {
    histogram, ok := metric.(*Histogram)
    if !ok {
        return fmt.Errorf("metric %s has histogram type but is a %T", name, metric)
    }

    // Read the buckets before the total so the total is never smaller than a
    // bucket count taken from an earlier moment.
    buckets := histogram.Buckets()
    total := histogram.Count()

    bounds := make([]float64, 0, len(buckets))
    for bound := range buckets {
        bounds = append(bounds, bound)
    }
    sort.Float64s(bounds)

    // bucketLabels returns the formatted label set extended with le.
    bucketLabels := func(le string) string {
        extended := make(map[string]string, len(labels)+1)
        for k, v := range labels {
            extended[k] = v
        }
        extended["le"] = le
        return e.formatLabels(extended)
    }

    hasInf := false
    for _, bound := range bounds {
        if math.IsInf(bound, 1) {
            hasInf = true
        }
        if _, err := fmt.Fprintf(writer, "%s_bucket%s %d\n",
            name, bucketLabels(fmt.Sprintf("%g", bound)), buckets[bound]); err != nil {
            return err
        }
    }
    if !hasInf {
        if _, err := fmt.Fprintf(writer, "%s_bucket%s %d\n", name, bucketLabels("+Inf"), total); err != nil {
            return err
        }
    }

    labelStr := e.formatLabels(labels)
    if _, err := fmt.Fprintf(writer, "%s_count%s %d\n", name, labelStr, total); err != nil {
        return err
    }
    _, err := fmt.Fprintf(writer, "%s_sum%s %g\n", name, labelStr, histogram.Sum())
    return err
}

// writeSummary writes the quantile, count and sum lines of a summary.
// Quantiles are written in ascending order of their label text. It returns an
// error when metric is not a *Summary or its value has an unexpected shape.
func (e *PrometheusExporter) writeSummary(writer io.Writer, name string, labels map[string]string, metric Metric) error {
    summary, ok := metric.(*Summary)
    if !ok {
        return fmt.Errorf("metric %s has summary type but is a %T", name, metric)
    }
    value, ok := summary.Value().(map[string]interface{})
    if !ok {
        return fmt.Errorf("summary %s returned an unexpected value", name)
    }
    quantiles, ok := value["quantiles"].(map[string]float64)
    if !ok {
        return fmt.Errorf("summary %s returned unexpected quantiles", name)
    }

    keys := make([]string, 0, len(quantiles))
    for q := range quantiles {
        keys = append(keys, q)
    }
    sort.Strings(keys)

    for _, q := range keys {
        extended := make(map[string]string, len(labels)+1)
        for k, v := range labels {
            extended[k] = v
        }
        extended["quantile"] = q

        if _, err := fmt.Fprintf(writer, "%s%s %g\n", name, e.formatLabels(extended), quantiles[q]); err != nil {
            return err
        }
    }

    labelStr := e.formatLabels(labels)
    if _, err := fmt.Fprintf(writer, "%s_count%s %v\n", name, labelStr, value["count"]); err != nil {
        return err
    }
    _, err := fmt.Fprintf(writer, "%s_sum%s %v\n", name, labelStr, value["sum"])
    return err
}

// formatLabels renders labels as {k1="v1",k2="v2"} with keys in sorted order
// and values escaped, or as the empty string when there are no labels.
func (e *PrometheusExporter) formatLabels(labels map[string]string) string {
    if len(labels) == 0 {
        return ""
    }

    keys := make([]string, 0, len(labels))
    for k := range labels {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    parts := make([]string, 0, len(keys))
    for _, k := range keys {
        parts = append(parts, k+`="`+labelValueEscaper.Replace(labels[k])+`"`)
    }

    return "{" + strings.Join(parts, ",") + "}"
}

// sanitizeName replaces '-', '.' and ' ' in name with '_'.
func (e *PrometheusExporter) sanitizeName(name string) string {
    result := strings.ReplaceAll(name, "-", "_")
    result = strings.ReplaceAll(result, ".", "_")
    result = strings.ReplaceAll(result, " ", "_")
    return result
}

使用示例 #

基本使用 #

package main

import (
    "context"
    "fmt"
    "net/http"
    "os"
    "time"
)

// main wires the example together: it starts the system collector, simulates
// application traffic once per second and serves the metrics in Prometheus
// text format on :8080/metrics until the server fails.
func main() {
    // Create the registry shared by all collectors.
    registry := NewRegistry()

    // Create the system metrics collector.
    systemCollector := NewSystemMetricsCollector(registry, 5*time.Second)

    // Create the application metrics collector.
    appCollector := NewAppMetricsCollector(registry)

    // Start collecting system metrics; cancelling ctx stops both goroutines.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        if err := systemCollector.Start(ctx); err != nil {
            fmt.Printf("System collector error: %v\n", err)
        }
    }()

    // Simulate application metrics.
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                // Simulate an HTTP request taking 50-149 ms.
                appCollector.RecordHTTPRequest("GET", "200",
                    time.Duration(50+time.Now().UnixNano()%100)*time.Millisecond)

                // Simulate a user login.
                if time.Now().Second()%10 == 0 {
                    appCollector.RecordUserLogin()
                }

                // Simulate an order being created.
                if time.Now().Second()%15 == 0 {
                    appCollector.RecordOrderCreated("success")
                }

                // Simulate a payment.
                if time.Now().Second()%20 == 0 {
                    appCollector.RecordPayment(float64(10 + time.Now().UnixNano()%100))
                }
            }
        }
    }()

    // Serve the metrics over HTTP.
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        exporter := NewPrometheusExporter(registry)
        w.Header().Set("Content-Type", "text/plain")
        if err := exporter.Export(w); err != nil {
            // Part of the response may already have been sent, so the failure
            // can only be reported here, not turned into an HTTP error.
            fmt.Printf("Metrics export error: %v\n", err)
        }
    })

    fmt.Println("Metrics server starting on :8080")
    fmt.Println("Visit http://localhost:8080/metrics to see metrics")

    if err := http.ListenAndServe(":8080", nil); err != nil {
        fmt.Printf("Server error: %v\n", err)
        os.Exit(1)
    }
}

中间件集成 #

// MetricsMiddleware returns HTTP middleware that tracks the number of
// in-flight requests in the collector's active-connections gauge and, after
// the wrapped handler returns, records the request's method, status code and
// duration through collector.RecordHTTPRequest.
func MetricsMiddleware(collector *AppMetricsCollector) func(http.Handler) http.Handler {
    return func(next http.Handler) http.Handler {
        instrumented := func(w http.ResponseWriter, r *http.Request) {
            began := time.Now()

            // Count the request as active until this function returns.
            inFlight := collector.activeConnections
            inFlight.Inc()
            defer inFlight.Dec()

            // Wrap the ResponseWriter to capture the status code; 200 is
            // what a handler that never calls WriteHeader ends up sending.
            recorder := &responseWriter{ResponseWriter: w, statusCode: 200}

            next.ServeHTTP(recorder, r)

            elapsed := time.Since(began)
            collector.RecordHTTPRequest(r.Method, fmt.Sprintf("%d", recorder.statusCode), elapsed)
        }
        return http.HandlerFunc(instrumented)
    }
}

// responseWriter wraps an http.ResponseWriter and remembers the status code
// passed to WriteHeader.
type responseWriter struct {
    http.ResponseWriter
    statusCode int
}

// WriteHeader records code and then forwards it to the wrapped
// ResponseWriter.
func (rw *responseWriter) WriteHeader(code int) {
    rw.statusCode = code
    rw.ResponseWriter.WriteHeader(code)
}

小结 #

本节详细介绍了性能指标收集系统的设计和实现,包括:

  1. 指标类型:计数器、仪表盘、直方图和摘要的实现
  2. 注册表:指标的注册和管理机制
  3. 收集器:系统和应用指标的自动收集
  4. 导出器:Prometheus 格式的指标导出
  5. 中间件:HTTP 请求指标的自动收集

通过这个指标收集系统,我们可以全面监控应用程序和系统的性能状况,为性能优化和问题诊断提供数据支持。