4.6.3 性能指标收集 #
性能指标收集是监控系统的核心功能,它负责收集、存储和分析各种系统和应用程序的性能数据。本节将详细介绍如何设计和实现一个完整的性能指标收集系统。
指标系统架构 #
指标类型分类 #
性能指标通常分为以下几种类型:
- 计数器(Counter):只增不减的累积指标(仅在进程重启或显式重置时归零),如请求总数、错误总数
- 仪表盘(Gauge):可以任意变化的瞬时值,如 CPU 使用率、内存使用量
- 直方图(Histogram):观察值的分布情况,如响应时间分布
- 摘要(Summary):与直方图一样用于描述观察值的分布,但直接在客户端计算并暴露分位数(如 P50、P99)
指标数据结构 #
package metrics
import (
"sync"
"time"
)
// MetricType tells exporters which kind of metric they are looking at.
type MetricType int

// The four supported metric kinds. Values are fixed so they stay stable.
const (
	CounterType   MetricType = 0 // cumulative value that only increases
	GaugeType     MetricType = 1 // instantaneous value that may go up or down
	HistogramType MetricType = 2 // observations counted into buckets
	SummaryType   MetricType = 3 // observations summarized as quantiles
)

// Metric is the read-only view shared by every metric implementation.
type Metric interface {
	// Name returns the metric's name.
	Name() string
	// Labels returns the metric's label set.
	Labels() map[string]string
	// Type reports which kind of metric this is.
	Type() MetricType
	// Value returns the current value; its dynamic type depends on Type().
	Value() interface{}
	// Timestamp returns the time of the most recent update.
	Timestamp() time.Time
}
// BaseMetric holds the identity (name and labels) and last-update time shared
// by all metric kinds. Concrete metrics embed *BaseMetric to inherit Name,
// Labels and Timestamp.
type BaseMetric struct {
	name      string
	labels    map[string]string
	timestamp time.Time
	mutex     sync.RWMutex // guards labels and timestamp
}

// NewBaseMetric creates a BaseMetric stamped with the current time.
//
// The labels map is copied. The metric therefore owns its label set: later
// mutation of the caller's map can neither change this metric's identity nor
// race with concurrent Labels calls. A nil or empty map means "no labels".
func NewBaseMetric(name string, labels map[string]string) *BaseMetric {
	owned := make(map[string]string, len(labels))
	for k, v := range labels {
		owned[k] = v
	}
	return &BaseMetric{
		name:      name,
		labels:    owned,
		timestamp: time.Now(),
	}
}

// Name returns the metric name. The name is never modified after
// construction, so no lock is taken.
func (m *BaseMetric) Name() string {
	return m.name
}

// Labels returns a copy of the metric's labels. Callers may modify the
// result freely.
func (m *BaseMetric) Labels() map[string]string {
	m.mutex.RLock()
	defer m.mutex.RUnlock()
	result := make(map[string]string, len(m.labels))
	for k, v := range m.labels {
		result[k] = v
	}
	return result
}

// Timestamp returns the time of the last update (or of construction).
func (m *BaseMetric) Timestamp() time.Time {
	m.mutex.RLock()
	defer m.mutex.RUnlock()
	return m.timestamp
}

// updateTimestamp sets the last-update time to now.
func (m *BaseMetric) updateTimestamp() {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	m.timestamp = time.Now()
}
计数器实现 #
基础计数器 #
import (
"sync/atomic"
)
// Counter is an int64 metric that only moves upward (except via Reset),
// e.g. a total request count. The value is read and written through
// sync/atomic. Every mutation also refreshes the embedded timestamp.
type Counter struct {
	*BaseMetric
	value int64 // only touched through sync/atomic
}

// NewCounter returns a Counter that starts at zero.
func NewCounter(name string, labels map[string]string) *Counter {
	c := &Counter{BaseMetric: NewBaseMetric(name, labels)}
	return c
}

// Type reports CounterType.
func (c *Counter) Type() MetricType { return CounterType }

// Value returns the current count as an int64 inside an interface{}.
func (c *Counter) Value() interface{} {
	current := c.Get()
	return current
}

// Inc adds one to the counter.
func (c *Counter) Inc() { c.Add(1) }

// Add increases the counter by delta. It panics if delta is negative,
// because a counter may never decrease.
func (c *Counter) Add(delta int64) {
	if delta >= 0 {
		atomic.AddInt64(&c.value, delta)
		c.updateTimestamp()
		return
	}
	panic("counter cannot decrease")
}

// Reset sets the counter back to zero and refreshes the timestamp.
func (c *Counter) Reset() {
	atomic.StoreInt64(&c.value, 0)
	c.updateTimestamp()
}

// Get returns the current count.
func (c *Counter) Get() int64 {
	return atomic.LoadInt64(&c.value)
}
速率计数器 #
import (
"time"
)
// RateCounter is a Counter that can also report its per-second rate of
// increase between consecutive UpdateRate calls.
type RateCounter struct {
	*Counter
	lastValue int64        // counter value at the previous UpdateRate
	lastTime  time.Time    // time of the previous UpdateRate (or construction)
	rate      float64      // per-second rate computed by the last UpdateRate
	mutex     sync.RWMutex // guards lastValue, lastTime and rate; shadows the embedded BaseMetric mutex
}

// NewRateCounter creates a RateCounter whose first rate window starts now.
func NewRateCounter(name string, labels map[string]string) *RateCounter {
	return &RateCounter{
		Counter:   NewCounter(name, labels),
		lastTime:  time.Now(),
		lastValue: 0,
		rate:      0,
	}
}

// Rate returns the per-second rate computed by the most recent UpdateRate
// call (0 before the first call).
func (rc *RateCounter) Rate() float64 {
	rc.mutex.RLock()
	defer rc.mutex.RUnlock()
	return rc.rate
}

// UpdateRate recomputes the rate from the counter's growth since the
// previous call and starts a new window. If the counter went down (it was
// Reset in between), everything counted since the reset is treated as the
// increase, like Prometheus' rate(). Otherwise the result would be negative.
func (rc *RateCounter) UpdateRate() {
	rc.mutex.Lock()
	defer rc.mutex.Unlock()

	now := time.Now()
	currentValue := rc.Get()
	if !rc.lastTime.IsZero() {
		if elapsed := now.Sub(rc.lastTime).Seconds(); elapsed > 0 {
			increase := currentValue - rc.lastValue
			if increase < 0 {
				// Counter reset detected.
				increase = currentValue
			}
			rc.rate = float64(increase) / elapsed
		}
	}
	rc.lastTime = now
	rc.lastValue = currentValue
}
仪表盘实现 #
基础仪表盘 #
import (
"math"
"sync/atomic"
"unsafe"
)
// Gauge is a float64 metric that may go up or down, e.g. memory in use.
// sync/atomic has no float64 operations, so the value is kept as its IEEE-754
// bit pattern in a uint64.
type Gauge struct {
	*BaseMetric
	value uint64 // math.Float64bits of the current value
}

// NewGauge returns a Gauge initialised to 0.
func NewGauge(name string, labels map[string]string) *Gauge {
	g := new(Gauge)
	g.BaseMetric = NewBaseMetric(name, labels)
	return g
}

// Type reports GaugeType.
func (g *Gauge) Type() MetricType { return GaugeType }

// Value returns the current reading as a float64 inside an interface{}.
func (g *Gauge) Value() interface{} {
	reading := g.Get()
	return reading
}

// Set replaces the current value and refreshes the timestamp.
func (g *Gauge) Set(value float64) {
	bits := math.Float64bits(value)
	atomic.StoreUint64(&g.value, bits)
	g.updateTimestamp()
}

// Get returns the current value.
func (g *Gauge) Get() float64 {
	bits := atomic.LoadUint64(&g.value)
	return math.Float64frombits(bits)
}

// Inc adds 1.
func (g *Gauge) Inc() { g.Add(1) }

// Dec subtracts 1.
func (g *Gauge) Dec() { g.Add(-1) }

// Add adjusts the value by delta without a lock. It retries a
// compare-and-swap until no other writer has changed the value in between,
// then refreshes the timestamp once.
func (g *Gauge) Add(delta float64) {
	for swapped := false; !swapped; {
		old := atomic.LoadUint64(&g.value)
		updated := math.Float64bits(math.Float64frombits(old) + delta)
		swapped = atomic.CompareAndSwapUint64(&g.value, old, updated)
	}
	g.updateTimestamp()
}

// Sub lowers the value by delta.
func (g *Gauge) Sub(delta float64) { g.Add(-delta) }
统计仪表盘 #
// StatGauge is a Gauge that also keeps running statistics (count, sum, min
// and max) over every value passed to Update. Each statistic is updated
// atomically on its own, not as one transaction. A concurrent reader may
// therefore see a count that already includes a sample whose contribution
// to sum is not yet visible.
type StatGauge struct {
	*Gauge
	count int64
	sum   uint64 // math.Float64bits of the running sum
	min   uint64 // math.Float64bits of the smallest value seen (+Inf initially)
	max   uint64 // math.Float64bits of the largest value seen (-Inf initially)
}

// NewStatGauge creates a StatGauge with empty statistics.
func NewStatGauge(name string, labels map[string]string) *StatGauge {
	sg := &StatGauge{Gauge: NewGauge(name, labels)}
	sg.sum = math.Float64bits(0)
	sg.min = math.Float64bits(math.Inf(1))
	sg.max = math.Float64bits(math.Inf(-1))
	return sg
}

// Update sets value as the gauge's current reading and folds it into the
// running statistics.
func (sg *StatGauge) Update(value float64) {
	sg.Set(value)
	atomic.AddInt64(&sg.count, 1)

	// casFloat applies next to the float stored at addr and retries until the
	// compare-and-swap wins, or until next reports that no change is needed.
	casFloat := func(addr *uint64, next func(cur float64) (float64, bool)) {
		for {
			oldBits := atomic.LoadUint64(addr)
			proposed, change := next(math.Float64frombits(oldBits))
			if !change {
				return
			}
			if atomic.CompareAndSwapUint64(addr, oldBits, math.Float64bits(proposed)) {
				return
			}
		}
	}

	casFloat(&sg.sum, func(cur float64) (float64, bool) { return cur + value, true })
	// The negated comparisons mean a NaN on either side counts as "replace".
	casFloat(&sg.min, func(cur float64) (float64, bool) { return value, !(value >= cur) })
	casFloat(&sg.max, func(cur float64) (float64, bool) { return value, !(value <= cur) })
}

// Count returns how many values have been passed to Update.
func (sg *StatGauge) Count() int64 { return atomic.LoadInt64(&sg.count) }

// Sum returns the total of all updated values.
func (sg *StatGauge) Sum() float64 {
	bits := atomic.LoadUint64(&sg.sum)
	return math.Float64frombits(bits)
}

// Min returns the smallest updated value (+Inf if none).
func (sg *StatGauge) Min() float64 {
	bits := atomic.LoadUint64(&sg.min)
	return math.Float64frombits(bits)
}

// Max returns the largest updated value (-Inf if none).
func (sg *StatGauge) Max() float64 {
	bits := atomic.LoadUint64(&sg.max)
	return math.Float64frombits(bits)
}

// Avg returns Sum()/Count(), or 0 when nothing has been recorded.
func (sg *StatGauge) Avg() float64 {
	if n := sg.Count(); n != 0 {
		return sg.Sum() / float64(n)
	}
	return 0
}
直方图实现 #
基础直方图 #
import (
"sort"
)
// Histogram counts observations into cumulative buckets: counts[i] is the
// number of observed values <= buckets[i] (the Prometheus "le" convention).
// It also tracks the count and sum of all observations. Values above the
// highest bound are included there but fall into no finite bucket.
type Histogram struct {
	*BaseMetric
	buckets []float64 // sorted upper bounds, fixed at construction
	counts  []int64   // cumulative count per bound; accessed via sync/atomic
	sum     uint64    // math.Float64bits of the sum of observations
	count   int64     // total number of observations; accessed via sync/atomic
	mutex   sync.RWMutex
}

// NewHistogram creates a histogram with the given bucket upper bounds. The
// bounds are copied before sorting, so the caller's slice is left untouched.
func NewHistogram(name string, labels map[string]string, buckets []float64) *Histogram {
	sortedBuckets := make([]float64, len(buckets))
	copy(sortedBuckets, buckets)
	sort.Float64s(sortedBuckets)
	return &Histogram{
		BaseMetric: NewBaseMetric(name, labels),
		buckets:    sortedBuckets,
		counts:     make([]int64, len(sortedBuckets)),
		sum:        math.Float64bits(0),
		count:      0,
	}
}

// Type reports HistogramType.
func (h *Histogram) Type() MetricType {
	return HistogramType
}

// Value returns a map with three entries:
//   - "buckets": each bound formatted with %g, mapped to its cumulative count
//   - "count": the total number of observations
//   - "sum": the sum of all observed values
//
// %g keeps distinct bounds distinct. A fixed "%.2f" would turn both 0.005
// and 0.01 into "0.01" and silently drop one of the buckets.
func (h *Histogram) Value() interface{} {
	h.mutex.RLock()
	defer h.mutex.RUnlock()
	buckets := make(map[string]int64, len(h.buckets))
	for i, bound := range h.buckets {
		buckets[fmt.Sprintf("%g", bound)] = atomic.LoadInt64(&h.counts[i])
	}
	return map[string]interface{}{
		"buckets": buckets,
		"count":   atomic.LoadInt64(&h.count),
		"sum":     math.Float64frombits(atomic.LoadUint64(&h.sum)),
	}
}

// Observe records value. It increments the total count, adds value to the
// sum (lock-free CAS loop), and increments every bucket whose bound is
// >= value.
func (h *Histogram) Observe(value float64) {
	atomic.AddInt64(&h.count, 1)
	for {
		oldBits := atomic.LoadUint64(&h.sum)
		newBits := math.Float64bits(math.Float64frombits(oldBits) + value)
		if atomic.CompareAndSwapUint64(&h.sum, oldBits, newBits) {
			break
		}
	}
	h.mutex.RLock()
	for i, bound := range h.buckets {
		if value <= bound {
			atomic.AddInt64(&h.counts[i], 1)
		}
	}
	h.mutex.RUnlock()
	h.updateTimestamp()
}

// Count returns the total number of observations.
func (h *Histogram) Count() int64 {
	return atomic.LoadInt64(&h.count)
}

// Sum returns the sum of all observed values.
func (h *Histogram) Sum() float64 {
	return math.Float64frombits(atomic.LoadUint64(&h.sum))
}

// Buckets returns a snapshot mapping each upper bound to its cumulative count.
func (h *Histogram) Buckets() map[float64]int64 {
	h.mutex.RLock()
	defer h.mutex.RUnlock()
	result := make(map[float64]int64, len(h.buckets))
	for i, bound := range h.buckets {
		result[bound] = atomic.LoadInt64(&h.counts[i])
	}
	return result
}

// Quantile estimates the q-quantile as the upper bound of the first bucket
// whose cumulative count reaches q*Count(). Special cases:
//   - q outside [0, 1] or NaN: returns NaN
//   - nothing observed: returns 0
//   - histogram has no buckets: returns NaN
//   - quantile lies above every bound: returns the highest bound, as
//     Prometheus' histogram_quantile does
func (h *Histogram) Quantile(q float64) float64 {
	if math.IsNaN(q) || q < 0 || q > 1 {
		return math.NaN()
	}
	count := h.Count()
	if count == 0 {
		return 0
	}
	target := float64(count) * q

	h.mutex.RLock()
	defer h.mutex.RUnlock()
	if len(h.buckets) == 0 {
		return math.NaN()
	}
	for i, bound := range h.buckets {
		// counts are already cumulative (see Observe); summing them again
		// would overshoot the target and report too-low quantiles.
		if float64(atomic.LoadInt64(&h.counts[i])) >= target {
			return bound
		}
	}
	return h.buckets[len(h.buckets)-1]
}
摘要实现 #
时间窗口摘要 #
import (
"container/ring"
"time"
)
// Summary reports quantiles of the values observed within the last maxAge.
//
// Samples live in a fixed ring of 1000 slots (see NewSummary). Once more than
// 1000 values have been observed, the oldest are overwritten even if they
// are still younger than maxAge. Quantiles are computed exactly from the
// retained samples on every Value call.
type Summary struct {
	*BaseMetric
	objectives map[float64]float64 // quantile -> allowed error; only the quantile keys are used here
	samples    *ring.Ring          // next slot to write; slot values are *Sample or nil
	maxAge     time.Duration       // samples older than this are excluded
	ageBuckets int                 // stored but not used by this implementation
	mutex      sync.RWMutex        // guards samples
}

// Sample is one observed value together with the time it was recorded.
type Sample struct {
	Value     float64
	Timestamp time.Time
}

// NewSummary creates a summary that reports the quantiles listed as keys of
// objectives, over samples younger than maxAge.
func NewSummary(name string, labels map[string]string, objectives map[float64]float64, maxAge time.Duration, ageBuckets int) *Summary {
	return &Summary{
		BaseMetric: NewBaseMetric(name, labels),
		objectives: objectives,
		samples:    ring.New(1000), // fixed capacity: the oldest samples are overwritten
		maxAge:     maxAge,
		ageBuckets: ageBuckets,
	}
}

// Type reports SummaryType.
func (s *Summary) Type() MetricType {
	return SummaryType
}

// Value returns a map with three entries:
//   - "count": number of samples within maxAge
//   - "sum": total of those samples
//   - "quantiles": quantile label -> value, one entry per objective
//
// Quantile labels use %g ("0.5", "0.99", "0.999"), so distinct objectives
// never share a label.
func (s *Summary) Value() interface{} {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	samples := s.getValidSamples()
	if len(samples) == 0 {
		return map[string]interface{}{
			"count":     0,
			"sum":       0,
			"quantiles": make(map[string]float64),
		}
	}
	// calculateQuantile expects samples in ascending order.
	sort.Slice(samples, func(i, j int) bool {
		return samples[i].Value < samples[j].Value
	})
	quantiles := make(map[string]float64, len(s.objectives))
	for q := range s.objectives {
		quantiles[fmt.Sprintf("%g", q)] = s.calculateQuantile(samples, q)
	}
	sum := 0.0
	for _, sample := range samples {
		sum += sample.Value
	}
	return map[string]interface{}{
		"count":     len(samples),
		"sum":       sum,
		"quantiles": quantiles,
	}
}

// Observe records value with the current time, overwriting the oldest ring
// slot.
func (s *Summary) Observe(value float64) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.samples.Value = &Sample{
		Value:     value,
		Timestamp: time.Now(),
	}
	s.samples = s.samples.Next()
	s.updateTimestamp()
}

// getValidSamples returns the samples recorded within the last maxAge.
// The caller must hold s.mutex (read or write).
func (s *Summary) getValidSamples() []*Sample {
	cutoff := time.Now().Add(-s.maxAge)
	var samples []*Sample
	s.samples.Do(func(v interface{}) {
		if sample, ok := v.(*Sample); ok && sample.Timestamp.After(cutoff) {
			samples = append(samples, sample)
		}
	})
	return samples
}

// calculateQuantile returns the q-quantile of samples, which must be sorted
// ascending. It linearly interpolates between the two nearest ranks.
// q is clamped to [0, 1], so out-of-range objectives cannot extrapolate past
// the data. NaN is treated like 0. Returns 0 for no samples.
func (s *Summary) calculateQuantile(samples []*Sample, q float64) float64 {
	n := len(samples)
	if n == 0 {
		return 0
	}
	switch {
	case !(q > 0): // also catches NaN
		return samples[0].Value
	case q >= 1:
		return samples[n-1].Value
	}
	pos := q * float64(n-1)
	lower := int(pos)
	if lower >= n-1 {
		return samples[n-1].Value
	}
	weight := pos - float64(lower)
	return samples[lower].Value*(1-weight) + samples[lower+1].Value*weight
}
指标注册表 #
注册表实现 #
// Registry stores metrics keyed by name and label set. Every method takes the
// registry's mutex, so a Registry is safe for concurrent use.
type Registry struct {
	metrics map[string]Metric // buildKey(name, labels) -> metric
	mutex   sync.RWMutex
}

// NewRegistry creates an empty registry.
func NewRegistry() *Registry {
	return &Registry{
		metrics: make(map[string]Metric),
	}
}

// Register adds metric under its name and labels. It returns an error if a
// metric with the same name and labels is already registered.
func (r *Registry) Register(metric Metric) error {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	key := r.buildKey(metric.Name(), metric.Labels())
	if _, exists := r.metrics[key]; exists {
		return fmt.Errorf("metric already registered: %s", key)
	}
	r.metrics[key] = metric
	return nil
}

// Unregister removes the metric with the given name and labels, if present.
func (r *Registry) Unregister(name string, labels map[string]string) {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	delete(r.metrics, r.buildKey(name, labels))
}

// Get looks up the metric with the given name and labels.
func (r *Registry) Get(name string, labels map[string]string) (Metric, bool) {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	metric, exists := r.metrics[r.buildKey(name, labels)]
	return metric, exists
}

// GetOrCreate returns the metric registered under name and labels. If none
// exists, it creates and registers a new one of metricType:
//   - histograms get a fixed default bucket layout
//   - summaries get default objectives and a 10-minute window
//
// An existing metric is returned as-is even if its type differs from
// metricType. GetOrCreate panics on an unknown metricType.
func (r *Registry) GetOrCreate(name string, labels map[string]string, metricType MetricType) Metric {
	// Fast path under the read lock.
	if metric, exists := r.Get(name, labels); exists {
		return metric
	}
	r.mutex.Lock()
	defer r.mutex.Unlock()
	// Re-check under the write lock: another goroutine may have created the
	// metric between Get and Lock.
	key := r.buildKey(name, labels)
	if metric, exists := r.metrics[key]; exists {
		return metric
	}
	var metric Metric
	switch metricType {
	case CounterType:
		metric = NewCounter(name, labels)
	case GaugeType:
		metric = NewGauge(name, labels)
	case HistogramType:
		buckets := []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}
		metric = NewHistogram(name, labels, buckets)
	case SummaryType:
		objectives := map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}
		metric = NewSummary(name, labels, objectives, 10*time.Minute, 5)
	default:
		panic(fmt.Sprintf("unknown metric type: %v", metricType))
	}
	r.metrics[key] = metric
	return metric
}

// All returns a snapshot of every registered metric, in unspecified order.
func (r *Registry) All() []Metric {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	metrics := make([]Metric, 0, len(r.metrics))
	for _, metric := range r.metrics {
		metrics = append(metrics, metric)
	}
	return metrics
}

// buildKey builds the map key identifying a name plus label set. Label keys
// are sorted, so the key does not depend on map iteration order.
func (r *Registry) buildKey(name string, labels map[string]string) string {
	if len(labels) == 0 {
		return name
	}
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	b.WriteString(name)
	for _, k := range keys {
		// Quoting (with escaping) keeps the key unambiguous. Unquoted, the
		// label sets {a: "1,b=2"} and {a: "1", b: "2"} would both become
		// "name,a=1,b=2".
		fmt.Fprintf(&b, ",%q=%q", k, labels[k])
	}
	return b.String()
}
全局注册表 #
var (
	defaultRegistry *Registry
	registryOnce    sync.Once
)

// DefaultRegistry returns the process-wide registry, creating it on first use.
func DefaultRegistry() *Registry {
	registryOnce.Do(func() {
		defaultRegistry = NewRegistry()
	})
	return defaultRegistry
}

// RegisterCounter creates a counter and registers it with the default
// registry. If a *Counter with the same name and labels is already
// registered, that existing counter is returned instead. Otherwise a second
// call would get a fresh, unregistered counter whose updates are never
// exported.
func RegisterCounter(name string, labels map[string]string) *Counter {
	counter := NewCounter(name, labels)
	if err := DefaultRegistry().Register(counter); err != nil {
		if existing, ok := DefaultRegistry().Get(name, labels); ok {
			if c, ok := existing.(*Counter); ok {
				return c
			}
		}
		// NOTE: a metric of a different type owns this name/label set; the
		// returned counter is not registered and will not be exported.
	}
	return counter
}

// RegisterGauge creates a gauge and registers it with the default registry.
// If a *Gauge with the same name and labels already exists, it is returned
// instead of a new, unregistered gauge.
func RegisterGauge(name string, labels map[string]string) *Gauge {
	gauge := NewGauge(name, labels)
	if err := DefaultRegistry().Register(gauge); err != nil {
		if existing, ok := DefaultRegistry().Get(name, labels); ok {
			if g, ok := existing.(*Gauge); ok {
				return g
			}
		}
		// NOTE: the returned gauge is not registered and will not be exported.
	}
	return gauge
}

// RegisterHistogram creates a histogram and registers it with the default
// registry. If a *Histogram with the same name and labels already exists, it
// is returned instead, and its original bucket layout is kept (buckets is
// ignored in that case).
func RegisterHistogram(name string, labels map[string]string, buckets []float64) *Histogram {
	histogram := NewHistogram(name, labels, buckets)
	if err := DefaultRegistry().Register(histogram); err != nil {
		if existing, ok := DefaultRegistry().Get(name, labels); ok {
			if h, ok := existing.(*Histogram); ok {
				return h
			}
		}
		// NOTE: the returned histogram is not registered and will not be exported.
	}
	return histogram
}

// GetCounter returns the counter with the given name and labels from the
// default registry, creating it if needed. It panics if a metric of another
// type is registered under the same name and labels.
func GetCounter(name string, labels map[string]string) *Counter {
	metric := DefaultRegistry().GetOrCreate(name, labels, CounterType)
	return metric.(*Counter)
}

// GetGauge returns the gauge with the given name and labels from the default
// registry, creating it if needed. It panics if a metric of another type is
// registered under the same name and labels.
func GetGauge(name string, labels map[string]string) *Gauge {
	metric := DefaultRegistry().GetOrCreate(name, labels, GaugeType)
	return metric.(*Gauge)
}
指标收集器 #
系统指标收集器 #
// SystemMetricsCollector periodically samples CPU, memory, disk and network
// statistics into metrics registered with a Registry.
type SystemMetricsCollector struct {
	registry *Registry
	interval time.Duration

	// System metrics.
	cpuUsage  *Gauge
	memUsage  *Gauge
	diskUsage *Gauge
	networkRx *Counter
	networkTx *Counter

	// Data sources.
	cpuMonitor     *CPUMonitor
	memoryMonitor  *MemoryMonitor
	diskMonitor    *DiskMonitor
	networkMonitor *NetworkMonitor

	// Byte totals from the previous network reading. They turn absolute
	// readings into increments for the counters. Only collect touches them.
	lastRxBytes int64
	lastTxBytes int64
}

// NewSystemMetricsCollector creates a collector that samples every interval
// and registers its metrics with registry. Registration errors (e.g. a
// duplicate name/label set) are ignored; the collector keeps updating its
// own metric instances.
func NewSystemMetricsCollector(registry *Registry, interval time.Duration) *SystemMetricsCollector {
	collector := &SystemMetricsCollector{
		registry:       registry,
		interval:       interval,
		cpuUsage:       NewGauge("system_cpu_usage_percent", nil),
		memUsage:       NewGauge("system_memory_usage_percent", nil),
		diskUsage:      NewGauge("system_disk_usage_percent", map[string]string{"path": "/"}),
		networkRx:      NewCounter("system_network_rx_bytes_total", map[string]string{"interface": "eth0"}),
		networkTx:      NewCounter("system_network_tx_bytes_total", map[string]string{"interface": "eth0"}),
		cpuMonitor:     NewCPUMonitor(interval),
		memoryMonitor:  NewMemoryMonitor(),
		diskMonitor:    NewDiskMonitor(),
		networkMonitor: NewNetworkMonitor(),
	}
	registry.Register(collector.cpuUsage)
	registry.Register(collector.memUsage)
	registry.Register(collector.diskUsage)
	registry.Register(collector.networkRx)
	registry.Register(collector.networkTx)
	return collector
}

// Start collects once per interval (the first collection happens after one
// interval) until ctx is cancelled, then returns ctx.Err(). Start must not
// run concurrently with itself on the same collector.
func (c *SystemMetricsCollector) Start(ctx context.Context) error {
	ticker := time.NewTicker(c.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			c.collect()
		}
	}
}

// collect takes one sample from each monitor. A monitor that returns an
// error is skipped for this tick.
func (c *SystemMetricsCollector) collect() {
	if cpuUsage, err := c.cpuMonitor.GetCPUUsage(); err == nil {
		c.cpuUsage.Set(cpuUsage)
	}
	if memUsage, err := c.memoryMonitor.GetMemoryUsage(); err == nil {
		c.memUsage.Set(memUsage)
	}
	if diskInfo, err := c.diskMonitor.GetDiskUsage("/"); err == nil {
		c.diskUsage.Set(diskInfo.UsageRate)
	}
	// NOTE(review): RxBytes/TxBytes are presumably cumulative interface
	// totals (as in /proc/net/dev); confirm against NetworkMonitor. Adding
	// the raw totals on every tick would inflate the counters, so only the
	// growth since the previous reading is added.
	if networkStats, err := c.networkMonitor.getNetworkStats(); err == nil {
		if eth0, exists := networkStats["eth0"]; exists {
			growth := func(prev, cur int64) int64 {
				if cur < prev {
					// Source counter reset (e.g. interface restarted): all of
					// cur accumulated since the reset.
					return cur
				}
				return cur - prev
			}
			rx, tx := int64(eth0.RxBytes), int64(eth0.TxBytes)
			c.networkRx.Add(growth(c.lastRxBytes, rx))
			c.networkTx.Add(growth(c.lastTxBytes, tx))
			c.lastRxBytes, c.lastTxBytes = rx, tx
		}
	}
}
应用指标收集器 #
// AppMetricsCollector owns an application's HTTP and business metrics and
// registers them with a Registry.
type AppMetricsCollector struct {
	registry *Registry

	// HTTP metrics.
	httpRequests      *Counter
	httpDuration      *Histogram
	httpErrors        *Counter
	activeConnections *Gauge

	// Business metrics.
	userLogins    *Counter
	orderCreated  *Counter
	paymentAmount *Histogram

	// durationBuckets is the bucket layout for every
	// http_request_duration_seconds histogram this collector creates.
	durationBuckets []float64
}

// NewAppMetricsCollector creates the collector and registers its metrics.
// Registration errors (e.g. a duplicate name/label set) are ignored; the
// collector keeps using its own instances.
//
// NOTE(review): the series with empty label values below are registered and
// therefore exported as-is. Per-request series with real label values are
// created on demand by the Record* methods.
func NewAppMetricsCollector(registry *Registry) *AppMetricsCollector {
	durationBuckets := []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}
	collector := &AppMetricsCollector{
		registry:        registry,
		durationBuckets: durationBuckets,
		httpRequests: NewCounter("http_requests_total",
			map[string]string{"method": "", "status": ""}),
		httpDuration: NewHistogram("http_request_duration_seconds",
			map[string]string{"method": "", "endpoint": ""},
			durationBuckets),
		httpErrors: NewCounter("http_errors_total",
			map[string]string{"method": "", "status": ""}),
		activeConnections: NewGauge("http_active_connections", nil),
		userLogins:        NewCounter("user_logins_total", nil),
		orderCreated:      NewCounter("orders_created_total", map[string]string{"status": ""}),
		paymentAmount: NewHistogram("payment_amount_dollars", nil,
			[]float64{1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000}),
	}
	registry.Register(collector.httpRequests)
	registry.Register(collector.httpDuration)
	registry.Register(collector.httpErrors)
	registry.Register(collector.activeConnections)
	registry.Register(collector.userLogins)
	registry.Register(collector.orderCreated)
	registry.Register(collector.paymentAmount)
	return collector
}

// RecordHTTPRequest does three things for one request:
//   - counts it per method and status
//   - records its latency per method
//   - counts it as an error when status starts with '4' or '5'
func (c *AppMetricsCollector) RecordHTTPRequest(method, status string, duration time.Duration) {
	requestCounter := c.registry.GetOrCreate("http_requests_total",
		map[string]string{"method": method, "status": status}, CounterType).(*Counter)
	requestCounter.Inc()

	c.durationHistogram(method).Observe(duration.Seconds())

	// Check the length first: indexing an empty status would panic.
	if status != "" && (status[0] == '4' || status[0] == '5') {
		errorCounter := c.registry.GetOrCreate("http_errors_total",
			map[string]string{"method": method, "status": status}, CounterType).(*Counter)
		errorCounter.Inc()
	}
}

// durationHistogram returns the per-method request-duration histogram. On
// first use it creates and registers one with c.durationBuckets.
// Registry.GetOrCreate would instead use its generic default buckets.
func (c *AppMetricsCollector) durationHistogram(method string) *Histogram {
	const name = "http_request_duration_seconds"
	labels := map[string]string{"method": method}
	if m, ok := c.registry.Get(name, labels); ok {
		return m.(*Histogram)
	}
	h := NewHistogram(name, labels, c.durationBuckets)
	if err := c.registry.Register(h); err != nil {
		// Another goroutine registered it first. Use that instance so every
		// observation lands in the exported series.
		if m, ok := c.registry.Get(name, labels); ok {
			return m.(*Histogram)
		}
	}
	return h
}

// RecordUserLogin counts one user login.
func (c *AppMetricsCollector) RecordUserLogin() {
	c.userLogins.Inc()
}

// RecordOrderCreated counts one created order under the given status label.
func (c *AppMetricsCollector) RecordOrderCreated(status string) {
	orderCounter := c.registry.GetOrCreate("orders_created_total",
		map[string]string{"status": status}, CounterType).(*Counter)
	orderCounter.Inc()
}

// RecordPayment records one payment amount in the payment histogram.
func (c *AppMetricsCollector) RecordPayment(amount float64) {
	c.paymentAmount.Observe(amount)
}
指标导出器 #
Prometheus 格式导出器 #
import (
"fmt"
"io"
"sort"
"strings"
)
// PrometheusExporter renders the metrics of a Registry in the Prometheus
// text exposition format.
type PrometheusExporter struct {
	registry *Registry
}

// NewPrometheusExporter creates an exporter that reads from registry.
func NewPrometheusExporter(registry *Registry) *PrometheusExporter {
	return &PrometheusExporter{
		registry: registry,
	}
}

// labelValueEscaper escapes the characters the text format requires escaping
// inside label values: backslash, double quote and line feed.
var labelValueEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", `\n`)

// exportWriter wraps an io.Writer and remembers the first write error, so a
// sequence of lines can be written and checked once at the end.
type exportWriter struct {
	w   io.Writer
	err error
}

// printf writes to the underlying writer unless an earlier write failed.
func (ew *exportWriter) printf(format string, args ...interface{}) {
	if ew.err != nil {
		return
	}
	_, ew.err = fmt.Fprintf(ew.w, format, args...)
}

// Export writes every registered metric to writer and returns the first
// error. Series are ordered by sanitized name, then by label string. This
// keeps all series of one metric family together, and the "# TYPE" line is
// written once per family, as the text format requires.
func (e *PrometheusExporter) Export(writer io.Writer) error {
	type series struct {
		name   string
		labels map[string]string
		sortBy string // formatted labels; tie-breaker within a family
		metric Metric
	}
	all := e.registry.All()
	list := make([]series, 0, len(all))
	for _, m := range all {
		labels := m.Labels()
		list = append(list, series{
			name:   e.sanitizeName(m.Name()),
			labels: labels,
			sortBy: e.formatLabels(labels),
			metric: m,
		})
	}
	sort.Slice(list, func(i, j int) bool {
		if list[i].name != list[j].name {
			return list[i].name < list[j].name
		}
		return list[i].sortBy < list[j].sortBy
	})
	for i, s := range list {
		firstOfFamily := i == 0 || list[i-1].name != s.name
		if err := e.writeMetric(writer, s.name, s.labels, s.metric, firstOfFamily); err != nil {
			return err
		}
	}
	return nil
}

// writeMetric writes one series. When writeType is true, its "# TYPE" line
// comes first. Returns the first write error.
func (e *PrometheusExporter) writeMetric(writer io.Writer, name string, labels map[string]string, metric Metric, writeType bool) error {
	ew := &exportWriter{w: writer}
	switch metric.Type() {
	case CounterType:
		e.writeScalar(ew, name, "counter", labels, metric, writeType)
	case GaugeType:
		e.writeScalar(ew, name, "gauge", labels, metric, writeType)
	case HistogramType:
		histogram, ok := metric.(*Histogram)
		if !ok {
			return fmt.Errorf("metric %s reports histogram type but is %T", name, metric)
		}
		e.writeHistogram(ew, name, labels, histogram, writeType)
	case SummaryType:
		summary, ok := metric.(*Summary)
		if !ok {
			return fmt.Errorf("metric %s reports summary type but is %T", name, metric)
		}
		e.writeSummary(ew, name, labels, summary, writeType)
	default:
		return fmt.Errorf("unknown metric type: %v", metric.Type())
	}
	return ew.err
}

// writeScalar writes a counter or gauge sample.
func (e *PrometheusExporter) writeScalar(ew *exportWriter, name, typeName string, labels map[string]string, metric Metric, writeType bool) {
	if writeType {
		ew.printf("# TYPE %s %s\n", name, typeName)
	}
	ew.printf("%s%s %v\n", name, e.formatLabels(labels), metric.Value())
}

// writeHistogram writes the cumulative _bucket series, then _count and _sum.
// The text format requires an le="+Inf" bucket, and Histogram only tracks
// finite bounds, so that bucket is added here.
func (e *PrometheusExporter) writeHistogram(ew *exportWriter, name string, labels map[string]string, histogram *Histogram, writeType bool) {
	if writeType {
		ew.printf("# TYPE %s histogram\n", name)
	}
	buckets := histogram.Buckets()
	bounds := make([]float64, 0, len(buckets))
	for bound := range buckets {
		bounds = append(bounds, bound)
	}
	sort.Float64s(bounds)

	// Read the total after the buckets. Observe increments count before the
	// buckets, so the total is then never below any bucket count.
	total := histogram.Count()
	sawInf := false
	for _, bound := range bounds {
		le := fmt.Sprintf("%g", bound)
		sawInf = sawInf || le == "+Inf"
		ew.printf("%s_bucket%s %d\n", name, e.labelsWith(labels, "le", le), buckets[bound])
	}
	if !sawInf {
		ew.printf("%s_bucket%s %d\n", name, e.labelsWith(labels, "le", "+Inf"), total)
	}
	labelStr := e.formatLabels(labels)
	ew.printf("%s_count%s %d\n", name, labelStr, total)
	ew.printf("%s_sum%s %g\n", name, labelStr, histogram.Sum())
}

// writeSummary writes one sample per quantile (sorted by label), then
// _count and _sum.
func (e *PrometheusExporter) writeSummary(ew *exportWriter, name string, labels map[string]string, summary *Summary, writeType bool) {
	if writeType {
		ew.printf("# TYPE %s summary\n", name)
	}
	value := summary.Value().(map[string]interface{})
	quantiles := value["quantiles"].(map[string]float64)
	keys := make([]string, 0, len(quantiles))
	for q := range quantiles {
		keys = append(keys, q)
	}
	sort.Strings(keys)
	for _, q := range keys {
		ew.printf("%s%s %g\n", name, e.labelsWith(labels, "quantile", q), quantiles[q])
	}
	labelStr := e.formatLabels(labels)
	ew.printf("%s_count%s %v\n", name, labelStr, value["count"])
	ew.printf("%s_sum%s %v\n", name, labelStr, value["sum"])
}

// labelsWith formats labels plus one extra key/value pair, without modifying
// the caller's map.
func (e *PrometheusExporter) labelsWith(labels map[string]string, key, value string) string {
	merged := make(map[string]string, len(labels)+1)
	for k, v := range labels {
		merged[k] = v
	}
	merged[key] = value
	return e.formatLabels(merged)
}

// formatLabels renders labels as {k1="v1",k2="v2"} with keys sorted and
// values escaped. Returns "" for an empty set.
func (e *PrometheusExporter) formatLabels(labels map[string]string) string {
	if len(labels) == 0 {
		return ""
	}
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	b.WriteByte('{')
	for i, k := range keys {
		if i > 0 {
			b.WriteByte(',')
		}
		b.WriteString(k)
		b.WriteString(`="`)
		b.WriteString(labelValueEscaper.Replace(labels[k]))
		b.WriteByte('"')
	}
	b.WriteByte('}')
	return b.String()
}

// sanitizeName maps name onto the Prometheus metric-name alphabet
// [a-zA-Z0-9_:]. Every other character (including '-', '.' and ' ')
// becomes '_'. A leading digit gets a '_' prefix, because names must not
// start with a digit.
func (e *PrometheusExporter) sanitizeName(name string) string {
	result := strings.Map(func(r rune) rune {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '_', r == ':':
			return r
		default:
			return '_'
		}
	}, name)
	if result != "" && result[0] >= '0' && result[0] <= '9' {
		result = "_" + result
	}
	return result
}
使用示例 #
基本使用 #
package main
import (
"context"
"fmt"
"net/http"
"os"
"time"
)
// main wires the collectors to a registry, simulates application traffic,
// and serves the metrics in Prometheus text format on :8080/metrics.
func main() {
	registry := NewRegistry()
	systemCollector := NewSystemMetricsCollector(registry, 5*time.Second)
	appCollector := NewAppMetricsCollector(registry)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		if err := systemCollector.Start(ctx); err != nil {
			fmt.Printf("System collector error: %v\n", err)
		}
	}()

	// Simulated application activity, one tick per second.
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				appCollector.RecordHTTPRequest("GET", "200",
					time.Duration(50+time.Now().UnixNano()%100)*time.Millisecond)
				if time.Now().Second()%10 == 0 {
					appCollector.RecordUserLogin()
				}
				if time.Now().Second()%15 == 0 {
					appCollector.RecordOrderCreated("success")
				}
				if time.Now().Second()%20 == 0 {
					appCollector.RecordPayment(float64(10 + time.Now().UnixNano()%100))
				}
			}
		}
	}()

	// The exporter holds no per-request state, so one instance serves all requests.
	exporter := NewPrometheusExporter(registry)
	mux := http.NewServeMux()
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		// version=0.0.4 identifies the Prometheus text exposition format.
		w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
		if err := exporter.Export(w); err != nil {
			// Part of the body may already be sent, so the status can no
			// longer be changed; log the failure instead.
			fmt.Fprintf(os.Stderr, "metrics export error: %v\n", err)
		}
	})

	// Explicit timeouts so slow or stalled clients cannot hold connections
	// open indefinitely.
	server := &http.Server{
		Addr:              ":8080",
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       10 * time.Second,
		WriteTimeout:      30 * time.Second,
	}

	fmt.Println("Metrics server starting on :8080")
	fmt.Println("Visit http://localhost:8080/metrics to see metrics")
	if err := server.ListenAndServe(); err != nil {
		fmt.Printf("Server error: %v\n", err)
		cancel() // os.Exit skips deferred calls
		os.Exit(1)
	}
}
中间件集成 #
// MetricsMiddleware returns HTTP middleware that does two things:
//   - counts in-flight requests in collector.activeConnections
//   - after the wrapped handler returns, reports the request's method,
//     status code and latency via collector.RecordHTTPRequest
func MetricsMiddleware(collector *AppMetricsCollector) func(http.Handler) http.Handler {
	wrap := func(next http.Handler) http.Handler {
		handle := func(w http.ResponseWriter, r *http.Request) {
			begin := time.Now()

			collector.activeConnections.Inc()
			defer collector.activeConnections.Dec()

			// Start at 200: a handler that never calls WriteHeader gets an
			// implicit 200 from net/http.
			recorder := &responseWriter{ResponseWriter: w, statusCode: 200}
			next.ServeHTTP(recorder, r)

			elapsed := time.Since(begin)
			collector.RecordHTTPRequest(r.Method, fmt.Sprintf("%d", recorder.statusCode), elapsed)
		}
		return http.HandlerFunc(handle)
	}
	return wrap
}
// responseWriter wraps an http.ResponseWriter and remembers the status code
// actually sent to the client, so middleware can record it after the
// handler returns.
type responseWriter struct {
	http.ResponseWriter
	statusCode  int  // status sent to the client; callers pre-set the implicit default (200)
	wroteHeader bool // true once a final status has been sent
}

// WriteHeader forwards code and records it if it is the first final status.
// net/http ignores later WriteHeader calls (it logs "superfluous
// WriteHeader"), so recording them would report a status the client never
// saw. 1xx informational codes other than 101 may be followed by the real
// status, so they do not finalize it.
func (rw *responseWriter) WriteHeader(code int) {
	if !rw.wroteHeader {
		rw.statusCode = code
		rw.wroteHeader = code >= 200 || code == http.StatusSwitchingProtocols
	}
	rw.ResponseWriter.WriteHeader(code)
}

// Write forwards b. A Write before any final WriteHeader makes net/http send
// an implicit 200, so that status is recorded here and later WriteHeader
// calls are ignored.
func (rw *responseWriter) Write(b []byte) (int, error) {
	if !rw.wroteHeader {
		rw.statusCode = http.StatusOK
		rw.wroteHeader = true
	}
	return rw.ResponseWriter.Write(b)
}

// Unwrap returns the wrapped writer so that http.ResponseController can
// reach optional interfaces (Flusher, Hijacker, deadlines) it implements.
func (rw *responseWriter) Unwrap() http.ResponseWriter {
	return rw.ResponseWriter
}
小结 #
本节详细介绍了性能指标收集系统的设计和实现,包括:
- 指标类型:计数器、仪表盘、直方图和摘要的实现
- 注册表:指标的注册和管理机制
- 收集器:系统和应用指标的自动收集
- 导出器:Prometheus 格式的指标导出
- 中间件:HTTP 请求指标的自动收集
通过这个指标收集系统,我们可以全面监控应用程序和系统的性能状况,为性能优化和问题诊断提供数据支持。