# 4.8.2 系统监控工具

系统监控是运维工作的核心环节,本节将开发一个综合性的系统监控工具,实时收集系统资源使用情况、进程信息、网络状态等,并提供告警和可视化功能。

## 系统架构设计

### 整体架构

监控系统采用分层架构设计:

  • 数据采集层:收集系统指标数据
  • 数据处理层:数据聚合和分析
  • 存储层:时序数据存储
  • 告警层:阈值检测和通知
  • 展示层:Web 界面和 API
// pkg/monitor/types.go
package monitor

import (
    "time"
)

// MetricType identifies the category a metric belongs to.
type MetricType string

// Metric categories. The string value is what ends up in the "type" JSON
// field of a serialized Metric.
const (
    MetricCPU     MetricType = "cpu"
    MetricMemory  MetricType = "memory"
    MetricDisk    MetricType = "disk"
    MetricNetwork MetricType = "network"
    MetricProcess MetricType = "process"
)

// Metric is a single measured data point.
type Metric struct {
    // Type is the category of the metric (CPU, memory, ...).
    Type      MetricType        `json:"type"`
    // Name identifies the metric within its category, e.g. "cpu_usage_percent".
    Name      string            `json:"name"`
    // Value is the measured value, expressed in Unit.
    Value     float64           `json:"value"`
    // Unit is a free-form unit label such as "percent", "bytes" or "seconds".
    Unit      string            `json:"unit"`
    // Labels carries optional key/value dimensions for the data point.
    Labels    map[string]string `json:"labels"`
    // Timestamp is the time the sample was taken.
    Timestamp time.Time         `json:"timestamp"`
    // Host is the name of the host the sample was taken on.
    Host      string            `json:"host"`
}

// SystemInfo describes static properties of the monitored host.
type SystemInfo struct {
    Hostname     string    `json:"hostname"`
    OS           string    `json:"os"`
    Architecture string    `json:"architecture"`
    CPUCores     int       `json:"cpu_cores"`
    // TotalMemory is presumably in bytes — TODO confirm against the code that
    // populates it.
    TotalMemory  uint64    `json:"total_memory"`
    // NOTE(review): an uptime is a duration, yet this field is a time.Time.
    // It may be intended to hold the boot time — confirm with the producer.
    Uptime       time.Time `json:"uptime"`
}

// CPUStats is one snapshot of CPU utilisation, CPU time counters and system
// load averages.
type CPUStats struct {
    // Usage is the overall CPU utilisation as a percentage.
    Usage     float64 `json:"usage"`
    // User, System, Idle and IOWait are CPU time counters; the collector
    // publishes User and System with the unit "seconds".
    User      float64 `json:"user"`
    System    float64 `json:"system"`
    Idle      float64 `json:"idle"`
    IOWait    float64 `json:"iowait"`
    // LoadAvg1, LoadAvg5 and LoadAvg15 are the 1-, 5- and 15-minute load
    // averages.
    LoadAvg1  float64 `json:"load_avg_1"`
    LoadAvg5  float64 `json:"load_avg_5"`
    LoadAvg15 float64 `json:"load_avg_15"`
}

// MemoryStats is one snapshot of physical memory and swap usage. The collector
// publishes the size fields with the unit "bytes".
type MemoryStats struct {
    Total       uint64  `json:"total"`
    Available   uint64  `json:"available"`
    Used        uint64  `json:"used"`
    Free        uint64  `json:"free"`
    // UsedPercent is the share of memory in use, as a percentage.
    UsedPercent float64 `json:"used_percent"`
    Buffers     uint64  `json:"buffers"`
    Cached      uint64  `json:"cached"`
    SwapTotal   uint64  `json:"swap_total"`
    SwapUsed    uint64  `json:"swap_used"`
    SwapFree    uint64  `json:"swap_free"`
}

// DiskStats is one snapshot of the usage of a single mounted filesystem.
type DiskStats struct {
    // Device is the backing device of the partition.
    Device      string  `json:"device"`
    // Mountpoint is the path the partition is mounted at.
    Mountpoint  string  `json:"mountpoint"`
    // Filesystem is the filesystem type of the partition.
    Filesystem  string  `json:"filesystem"`
    // Total, Used and Free are presumably byte counts — TODO confirm against
    // the gopsutil disk.UsageStat documentation.
    Total       uint64  `json:"total"`
    Used        uint64  `json:"used"`
    Free        uint64  `json:"free"`
    // UsedPercent is the share of space in use, as a percentage.
    UsedPercent float64 `json:"used_percent"`
    InodesTotal uint64  `json:"inodes_total"`
    InodesUsed  uint64  `json:"inodes_used"`
    InodesFree  uint64  `json:"inodes_free"`
}

// NetworkStats holds the I/O counters of a single network interface.
// NOTE(review): the counters are presumably cumulative since boot rather than
// per-interval deltas — confirm against the gopsutil net.IOCounters docs.
type NetworkStats struct {
    // Interface is the name of the network interface.
    Interface   string `json:"interface"`
    BytesSent   uint64 `json:"bytes_sent"`
    BytesRecv   uint64 `json:"bytes_recv"`
    PacketsSent uint64 `json:"packets_sent"`
    PacketsRecv uint64 `json:"packets_recv"`
    ErrorsIn    uint64 `json:"errors_in"`
    ErrorsOut   uint64 `json:"errors_out"`
    DropsIn     uint64 `json:"drops_in"`
    DropsOut    uint64 `json:"drops_out"`
}

// ProcessInfo is one snapshot of a single operating-system process.
type ProcessInfo struct {
    // PID is the process identifier.
    PID           int32   `json:"pid"`
    // Name is the process name.
    Name          string  `json:"name"`
    // Status is the process state as reported by the process library.
    Status        string  `json:"status"`
    // CPUPercent is the CPU utilisation of the process, as a percentage.
    CPUPercent    float64 `json:"cpu_percent"`
    // MemoryRSS and MemoryVMS are the resident and virtual memory sizes.
    MemoryRSS     uint64  `json:"memory_rss"`
    MemoryVMS     uint64  `json:"memory_vms"`
    // MemoryPercent is the share of memory used by the process, as a percentage.
    MemoryPercent float64 `json:"memory_percent"`
    // CreateTime is the process start time. NOTE(review): presumably
    // milliseconds since the Unix epoch, as returned by gopsutil — confirm.
    CreateTime    int64   `json:"create_time"`
    // NumThreads is the number of threads in the process.
    NumThreads    int32   `json:"num_threads"`
    // NumFDs is the number of open file descriptors.
    NumFDs        int32   `json:"num_fds"`
    // Username is the owner of the process.
    Username      string  `json:"username"`
    // Cmdline is the full command line of the process.
    Cmdline       string  `json:"cmdline"`
}

## 数据采集器实现

// pkg/collector/system_collector.go
package collector

import (
    "context"
    "fmt"
    "log"
    "runtime"
    "time"

    "github.com/shirou/gopsutil/v3/cpu"
    "github.com/shirou/gopsutil/v3/disk"
    "github.com/shirou/gopsutil/v3/host"
    "github.com/shirou/gopsutil/v3/load"
    "github.com/shirou/gopsutil/v3/mem"
    "github.com/shirou/gopsutil/v3/net"
    "github.com/shirou/gopsutil/v3/process"

    "your-project/pkg/monitor"
)

// SystemCollector periodically samples host-level statistics and publishes
// them as monitor.Metric values.
type SystemCollector struct {
    // hostname is resolved once at construction and stamped on every metric.
    hostname string
    // interval is the period between two collection cycles.
    interval time.Duration
    // logger receives lifecycle messages and collection failures.
    logger   *log.Logger
    // metrics is the send-only channel the samples are published on.
    metrics  chan<- monitor.Metric
}

// NewSystemCollector builds a collector that samples the local host every
// interval and publishes the resulting metrics on the metrics channel.
//
// It returns an error when:
//   - interval is not positive (time.NewTicker, used by Start, panics on such
//     values);
//   - metrics is nil (a non-blocking send on a nil channel never succeeds, so
//     every sample would be dropped);
//   - the host information needed to resolve the hostname cannot be read.
//
// A nil logger falls back to the standard library's default logger.
func NewSystemCollector(interval time.Duration, metrics chan<- monitor.Metric, logger *log.Logger) (*SystemCollector, error) {
    if interval <= 0 {
        return nil, fmt.Errorf("collector interval must be positive, got %v", interval)
    }
    if metrics == nil {
        return nil, fmt.Errorf("metrics channel must not be nil")
    }
    if logger == nil {
        logger = log.Default()
    }

    info, err := host.Info()
    if err != nil {
        return nil, fmt.Errorf("failed to get host info: %w", err)
    }

    return &SystemCollector{
        hostname: info.Hostname,
        interval: interval,
        logger:   logger,
        metrics:  metrics,
    }, nil
}

// Start runs the collection loop: one collection cycle per tick of the
// configured interval, until ctx is cancelled. It blocks for the whole
// lifetime of the loop and returns ctx.Err() once the context is done.
func (c *SystemCollector) Start(ctx context.Context) error {
    tick := time.NewTicker(c.interval)
    defer tick.Stop()

    c.logger.Printf("System collector started with interval %v", c.interval)

    done := ctx.Done()
    for {
        select {
        case <-done:
            c.logger.Println("System collector stopped")
            return ctx.Err()
        case <-tick.C:
            c.collectMetrics()
        }
    }
}

// collectMetrics runs one collection cycle. The CPU, memory, disk, network and
// process collectors are invoked in that order; a failing collector is logged
// and skipped so the remaining ones still run. All metrics of the cycle share
// a single timestamp taken at the start.
func (c *SystemCollector) collectMetrics() {
    sampledAt := time.Now()

    // CPU
    cpuStats, err := c.collectCPUStats()
    if err != nil {
        c.logger.Printf("Failed to collect CPU stats: %v", err)
    } else {
        c.sendCPUMetrics(cpuStats, sampledAt)
    }

    // Memory
    memStats, err := c.collectMemoryStats()
    if err != nil {
        c.logger.Printf("Failed to collect memory stats: %v", err)
    } else {
        c.sendMemoryMetrics(memStats, sampledAt)
    }

    // Disks
    diskStats, err := c.collectDiskStats()
    if err != nil {
        c.logger.Printf("Failed to collect disk stats: %v", err)
    } else {
        c.sendDiskMetrics(diskStats, sampledAt)
    }

    // Network interfaces
    netStats, err := c.collectNetworkStats()
    if err != nil {
        c.logger.Printf("Failed to collect network stats: %v", err)
    } else {
        c.sendNetworkMetrics(netStats, sampledAt)
    }

    // Processes
    procStats, err := c.collectProcessStats()
    if err != nil {
        c.logger.Printf("Failed to collect process stats: %v", err)
    } else {
        c.sendProcessMetrics(procStats, sampledAt)
    }
}

// collectCPUStats gathers the aggregate CPU utilisation, the aggregate CPU
// time counters and the system load averages. The first failing source aborts
// the collection and its error is returned unchanged.
//
// The utilisation is sampled over a one-second interval (time.Second is passed
// to cpu.Percent), so this call is expected to take about a second.
func (c *SystemCollector) collectCPUStats() (*monitor.CPUStats, error) {
    // false: ask for the aggregate over all CPUs rather than per-CPU values.
    percents, err := cpu.Percent(time.Second, false)
    if err != nil {
        return nil, err
    }

    times, err := cpu.Times(false)
    if err != nil {
        return nil, err
    }

    avg, err := load.Avg()
    if err != nil {
        return nil, err
    }

    stats := &monitor.CPUStats{
        LoadAvg1:  avg.Load1,
        LoadAvg5:  avg.Load5,
        LoadAvg15: avg.Load15,
    }

    // Both calls may return an empty slice; the corresponding fields then keep
    // their zero value.
    if len(percents) > 0 {
        stats.Usage = percents[0]
    }
    if len(times) > 0 {
        total := times[0]
        stats.User = total.User
        stats.System = total.System
        stats.Idle = total.Idle
        stats.IOWait = total.Iowait
    }

    return stats, nil
}

// collectMemoryStats gathers virtual-memory and swap statistics. If either
// source fails, its error is returned unchanged and no stats are produced.
func (c *SystemCollector) collectMemoryStats() (*monitor.MemoryStats, error) {
    virtual, err := mem.VirtualMemory()
    if err != nil {
        return nil, err
    }

    swapInfo, err := mem.SwapMemory()
    if err != nil {
        return nil, err
    }

    stats := new(monitor.MemoryStats)

    // Physical memory.
    stats.Total = virtual.Total
    stats.Available = virtual.Available
    stats.Used = virtual.Used
    stats.Free = virtual.Free
    stats.UsedPercent = virtual.UsedPercent
    stats.Buffers = virtual.Buffers
    stats.Cached = virtual.Cached

    // Swap.
    stats.SwapTotal = swapInfo.Total
    stats.SwapUsed = swapInfo.Used
    stats.SwapFree = swapInfo.Free

    return stats, nil
}

// collectDiskStats gathers usage statistics for every partition returned by
// disk.Partitions(false).
//
// A partition whose usage cannot be read is logged and skipped, so one broken
// mount does not hide the others. An error is returned only when the partition
// list itself cannot be obtained.
func (c *SystemCollector) collectDiskStats() ([]*monitor.DiskStats, error) {
    partitions, err := disk.Partitions(false)
    if err != nil {
        return nil, err
    }

    diskStats := make([]*monitor.DiskStats, 0, len(partitions))

    for _, partition := range partitions {
        usage, err := disk.Usage(partition.Mountpoint)
        if err != nil {
            // Best effort: report the failure instead of dropping it silently.
            c.logger.Printf("Skipping disk stats for %s (%s): %v",
                partition.Mountpoint, partition.Device, err)
            continue
        }

        diskStats = append(diskStats, &monitor.DiskStats{
            Device:      partition.Device,
            Mountpoint:  partition.Mountpoint,
            Filesystem:  partition.Fstype,
            Total:       usage.Total,
            Used:        usage.Used,
            Free:        usage.Free,
            UsedPercent: usage.UsedPercent,
            InodesTotal: usage.InodesTotal,
            InodesUsed:  usage.InodesUsed,
            InodesFree:  usage.InodesFree,
        })
    }

    return diskStats, nil
}

// collectNetworkStats gathers the I/O counters of every network interface
// (net.IOCounters is called with true to get one entry per interface) and
// converts each entry into a monitor.NetworkStats.
func (c *SystemCollector) collectNetworkStats() ([]*monitor.NetworkStats, error) {
    counters, err := net.IOCounters(true)
    if err != nil {
        return nil, err
    }

    var result []*monitor.NetworkStats
    for i := range counters {
        nic := &counters[i]

        entry := new(monitor.NetworkStats)
        entry.Interface = nic.Name
        entry.BytesSent = nic.BytesSent
        entry.BytesRecv = nic.BytesRecv
        entry.PacketsSent = nic.PacketsSent
        entry.PacketsRecv = nic.PacketsRecv
        entry.ErrorsIn = nic.Errin
        entry.ErrorsOut = nic.Errout
        entry.DropsIn = nic.Dropin
        entry.DropsOut = nic.Dropout

        result = append(result, entry)
    }

    return result, nil
}

// collectProcessStats gathers a snapshot of at most 100 processes, taken in
// the order process.Pids returns them.
//
// Processes that cannot be opened are skipped. Errors from the individual
// attribute lookups are ignored on purpose: the affected field simply keeps
// its zero value, so one unreadable attribute does not drop the whole record.
func (c *SystemCollector) collectProcessStats() ([]*monitor.ProcessInfo, error) {
    // Upper bound on reported processes, to keep the data volume in check.
    const maxProcesses = 100

    pids, err := process.Pids()
    if err != nil {
        return nil, err
    }

    var collected []*monitor.ProcessInfo

    for _, pid := range pids {
        if len(collected) >= maxProcesses {
            break
        }

        proc, err := process.NewProcess(pid)
        if err != nil {
            continue
        }

        info := &monitor.ProcessInfo{PID: pid}

        // Best-effort lookups; see the function comment for why errors are
        // dropped here.
        info.Name, _ = proc.Name()
        info.Status, _ = proc.Status()
        info.CPUPercent, _ = proc.CPUPercent()
        if memInfo, _ := proc.MemoryInfo(); memInfo != nil {
            info.MemoryRSS = memInfo.RSS
            info.MemoryVMS = memInfo.VMS
        }
        memPercent, _ := proc.MemoryPercent()
        info.MemoryPercent = float64(memPercent)
        info.CreateTime, _ = proc.CreateTime()
        info.NumThreads, _ = proc.NumThreads()
        info.NumFDs, _ = proc.NumFDs()
        info.Username, _ = proc.Username()
        info.Cmdline, _ = proc.Cmdline()

        collected = append(collected, info)
    }

    return collected, nil
}

// sendCPUMetrics converts a CPU snapshot into individual metrics (utilisation,
// user/system time and the three load averages) and publishes them.
//
// Publishing never blocks: when the metrics channel cannot accept a value the
// metric is dropped and the drop is logged.
func (c *SystemCollector) sendCPUMetrics(stats *monitor.CPUStats, timestamp time.Time) {
    // newMetric fills in the fields that are the same for every CPU metric.
    newMetric := func(name, unit string, value float64) monitor.Metric {
        return monitor.Metric{
            Type:      monitor.MetricCPU,
            Name:      name,
            Value:     value,
            Unit:      unit,
            Timestamp: timestamp,
            Host:      c.hostname,
        }
    }

    batch := []monitor.Metric{
        newMetric("cpu_usage_percent", "percent", stats.Usage),
        newMetric("cpu_user_time", "seconds", stats.User),
        newMetric("cpu_system_time", "seconds", stats.System),
        newMetric("load_average_1m", "load", stats.LoadAvg1),
        newMetric("load_average_5m", "load", stats.LoadAvg5),
        newMetric("load_average_15m", "load", stats.LoadAvg15),
    }

    for i := range batch {
        select {
        case c.metrics <- batch[i]:
        default:
            c.logger.Println("Metrics channel is full, dropping CPU metric")
        }
    }
}

// sendMemoryMetrics converts a memory snapshot into individual metrics (total,
// used, available, used percentage, swap total and swap used) and publishes
// them.
//
// Publishing never blocks: when the metrics channel cannot accept a value the
// metric is dropped and the drop is logged.
func (c *SystemCollector) sendMemoryMetrics(stats *monitor.MemoryStats, timestamp time.Time) {
    // newMetric fills in the fields that are the same for every memory metric.
    newMetric := func(name, unit string, value float64) monitor.Metric {
        return monitor.Metric{
            Type:      monitor.MetricMemory,
            Name:      name,
            Value:     value,
            Unit:      unit,
            Timestamp: timestamp,
            Host:      c.hostname,
        }
    }

    batch := []monitor.Metric{
        newMetric("memory_total", "bytes", float64(stats.Total)),
        newMetric("memory_used", "bytes", float64(stats.Used)),
        newMetric("memory_available", "bytes", float64(stats.Available)),
        newMetric("memory_used_percent", "percent", stats.UsedPercent),
        newMetric("swap_total", "bytes", float64(stats.SwapTotal)),
        newMetric("swap_used", "bytes", float64(stats.SwapUsed)),
    }

    for i := range batch {
        select {
        case c.metrics <- batch[i]:
        default:
            c.logger.Println("Metrics channel is full, dropping memory metric")
        }
    }
}

## 告警系统实现

// pkg/alert/alert_manager.go
package alert

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"

    "your-project/pkg/monitor"
)

// AlertLevel is the severity of an alert.
type AlertLevel string

// Supported severities, from least to most severe.
const (
    AlertInfo     AlertLevel = "info"
    AlertWarning  AlertLevel = "warning"
    AlertCritical AlertLevel = "critical"
)

// AlertRule describes when an alert should fire: a metric (matched by type and
// name) is compared against Threshold using Operator, and the condition has to
// hold for Duration before the alert is raised.
type AlertRule struct {
    // ID uniquely identifies the rule; the manager keys its tables by it.
    ID          string                 `json:"id"`
    // Name is the human-readable rule name used in messages.
    Name        string                 `json:"name"`
    // MetricType and MetricName select the metrics the rule applies to.
    MetricType  monitor.MetricType     `json:"metric_type"`
    MetricName  string                 `json:"metric_name"`
    Operator    string                 `json:"operator"` // >, <, >=, <=, ==, !=
    // Threshold is the value the metric is compared against.
    Threshold   float64                `json:"threshold"`
    // Duration is how long the condition must hold before the alert fires.
    Duration    time.Duration          `json:"duration"`
    // Level is the severity given to alerts raised by this rule.
    Level       AlertLevel             `json:"level"`
    // Labels are copied onto alerts raised by this rule.
    Labels      map[string]string      `json:"labels"`
    // Enabled switches the rule on; disabled rules are never evaluated.
    Enabled     bool                   `json:"enabled"`
    // CreatedAt and UpdatedAt are set by AlertManager.AddRule.
    CreatedAt   time.Time              `json:"created_at"`
    UpdatedAt   time.Time              `json:"updated_at"`
}

// Alert is a single alert event raised by a rule.
type Alert struct {
    // ID uniquely identifies the alert.
    ID          string            `json:"id"`
    // RuleID and RuleName identify the rule that raised the alert.
    RuleID      string            `json:"rule_id"`
    RuleName    string            `json:"rule_name"`
    // Level is the severity inherited from the rule.
    Level       AlertLevel        `json:"level"`
    // Message is the human-readable description of the breach.
    Message     string            `json:"message"`
    Labels      map[string]string `json:"labels"`
    // Value is the metric value that triggered the alert; Threshold is the
    // rule threshold it was compared against.
    Value       float64           `json:"value"`
    Threshold   float64           `json:"threshold"`
    // Host is the host the triggering metric came from.
    Host        string            `json:"host"`
    Status      string            `json:"status"` // firing, resolved
    // StartsAt is when the alert was raised.
    StartsAt    time.Time         `json:"starts_at"`
    // EndsAt is set once the alert is resolved and is nil while it is firing.
    EndsAt      *time.Time        `json:"ends_at,omitempty"`
    CreatedAt   time.Time         `json:"created_at"`
}

// AlertManager evaluates incoming metrics against the registered rules, keeps
// track of raised alerts and forwards them to the registered notifiers.
type AlertManager struct {
    // rules holds the registered rules, keyed by rule ID.
    rules       map[string]*AlertRule
    // alerts holds the alerts raised so far, keyed by alert ID.
    alerts      map[string]*Alert
    // ruleStates holds the evaluation state of each rule, keyed by rule ID.
    ruleStates  map[string]*RuleState
    // mu protects the maps above.
    mu          sync.RWMutex
    logger      *log.Logger
    // notifiers receive every alert that is raised or resolved.
    notifiers   []Notifier
}

// RuleState is the evaluation state the manager keeps for one rule.
type RuleState struct {
    // RuleID is the ID of the rule this state belongs to.
    RuleID      string
    // LastValue is the most recent metric value evaluated for the rule.
    LastValue   float64
    // LastCheck is the time of the most recent evaluation.
    LastCheck   time.Time
    // FiringStart is when the threshold was first breached in the current
    // streak; nil while the rule is not breached.
    FiringStart *time.Time
    // AlertID is the ID of the alert currently firing for the rule, or empty
    // when none is.
    AlertID     string
}

// Notifier delivers an alert to an external destination.
type Notifier interface {
    // Notify sends the alert and returns an error when delivery fails. The
    // manager calls it from a separate goroutine with a context that carries
    // a 30-second timeout.
    Notify(ctx context.Context, alert *Alert) error
}

// NewAlertManager returns an AlertManager with empty rule, alert and state
// tables and no notifiers. All messages are written to logger.
func NewAlertManager(logger *log.Logger) *AlertManager {
    am := new(AlertManager)
    am.logger = logger
    am.rules = map[string]*AlertRule{}
    am.alerts = map[string]*Alert{}
    am.ruleStates = map[string]*RuleState{}
    am.notifiers = []Notifier{}
    return am
}

// AddRule registers rule under rule.ID, replacing any rule already stored
// under that ID. The rule's UpdatedAt (and, for a new rule, CreatedAt) is set
// on the passed-in value.
//
// When a rule with the same ID already exists, its CreatedAt and its
// evaluation state are kept. Resetting the state would lose track of an alert
// that is currently firing, which could then never be resolved.
//
// A nil rule is logged and ignored.
func (am *AlertManager) AddRule(rule *AlertRule) {
    if rule == nil {
        am.logger.Println("Ignoring nil alert rule")
        return
    }

    am.mu.Lock()
    defer am.mu.Unlock()

    // One timestamp for the whole operation keeps the fields consistent.
    now := time.Now()

    rule.UpdatedAt = now
    if existing, ok := am.rules[rule.ID]; ok {
        rule.CreatedAt = existing.CreatedAt
    } else {
        rule.CreatedAt = now
    }
    am.rules[rule.ID] = rule

    // Initialise the evaluation state only for rules seen for the first time.
    if _, ok := am.ruleStates[rule.ID]; !ok {
        am.ruleStates[rule.ID] = &RuleState{
            RuleID:    rule.ID,
            LastCheck: now,
        }
    }

    am.logger.Printf("Alert rule added: %s", rule.Name)
}

// AddNotifier registers a notifier that will receive every alert raised or
// resolved from now on.
//
// The notifier list is read while metrics are being processed, so the append
// is done under the manager's mutex. A nil notifier is ignored because calling
// Notify on it during dispatch would panic.
func (am *AlertManager) AddNotifier(notifier Notifier) {
    if notifier == nil {
        return
    }

    am.mu.Lock()
    defer am.mu.Unlock()

    am.notifiers = append(am.notifiers, notifier)
}

// ProcessMetric evaluates metric against every enabled rule whose metric type
// and name match it.
//
// The exclusive lock is required, not the read lock: evaluating a rule updates
// the rule's state and may insert into or modify the alert table, so two
// concurrent calls under a read lock would race on those maps.
func (am *AlertManager) ProcessMetric(metric monitor.Metric) {
    am.mu.Lock()
    defer am.mu.Unlock()

    for _, rule := range am.rules {
        if !rule.Enabled {
            continue
        }
        if rule.MetricType != metric.Type || rule.MetricName != metric.Name {
            continue
        }
        am.evaluateRule(rule, metric)
    }
}

// evaluateRule updates the state of rule with the new metric value and raises
// or resolves the rule's alert as needed:
//
//   - while the threshold is breached, the start of the streak is recorded and,
//     once the streak has lasted at least rule.Duration, a single alert is
//     created and sent; a rule with a zero Duration fires on the first
//     breaching sample;
//   - when the threshold is no longer breached, a firing alert is marked
//     "resolved", given an end time and sent again, and the streak is cleared.
//
// Callers must hold am.mu for writing: this method mutates the rule state and
// the alert table.
func (am *AlertManager) evaluateRule(rule *AlertRule, metric monitor.Metric) {
    now := time.Now()

    state, ok := am.ruleStates[rule.ID]
    if !ok {
        // Defensive: a rule without state would otherwise cause a nil
        // dereference below.
        state = &RuleState{RuleID: rule.ID}
        am.ruleStates[rule.ID] = state
    }
    state.LastValue = metric.Value
    state.LastCheck = now

    if !am.checkThreshold(rule.Operator, metric.Value, rule.Threshold) {
        // Recovered: resolve the firing alert, if any, and end the streak.
        if state.AlertID != "" {
            if alert, exists := am.alerts[state.AlertID]; exists {
                endedAt := now
                alert.Status = "resolved"
                alert.EndsAt = &endedAt
                am.sendAlert(alert)
            }
            state.AlertID = ""
        }
        state.FiringStart = nil
        return
    }

    // Breached: remember when the streak started.
    if state.FiringStart == nil {
        startedAt := now
        state.FiringStart = &startedAt
    }

    // Checked on every breaching sample, including the first one, so that a
    // zero Duration does not need a second sample to fire.
    if state.AlertID != "" || now.Sub(*state.FiringStart) < rule.Duration {
        return
    }

    alert := am.createAlert(rule, metric)
    state.AlertID = alert.ID
    am.alerts[alert.ID] = alert
    am.sendAlert(alert)
}

// checkThreshold reports whether "value <operator> threshold" holds. The
// supported operators are >, <, >=, <=, == and !=; any other operator yields
// false.
func (am *AlertManager) checkThreshold(operator string, value, threshold float64) bool {
    var breached bool

    switch operator {
    case ">":
        breached = value > threshold
    case "<":
        breached = value < threshold
    case ">=":
        breached = value >= threshold
    case "<=":
        breached = value <= threshold
    case "==":
        breached = value == threshold
    case "!=":
        breached = value != threshold
    }

    return breached
}

// createAlert builds a new "firing" alert for rule from the metric that
// breached it. The alert is only constructed here; storing and sending it is
// left to the caller.
//
// A single timestamp is used for the ID, StartsAt and CreatedAt so the three
// always agree. The rule's labels are copied so that later changes to the rule
// cannot alter an alert that has already been handed to notifiers.
func (am *AlertManager) createAlert(rule *AlertRule, metric monitor.Metric) *Alert {
    now := time.Now()

    // Keep nil as nil so the JSON encoding of label-less alerts is unchanged.
    var labels map[string]string
    if rule.Labels != nil {
        labels = make(map[string]string, len(rule.Labels))
        for key, value := range rule.Labels {
            labels[key] = value
        }
    }

    return &Alert{
        ID:        fmt.Sprintf("alert_%d", now.UnixNano()),
        RuleID:    rule.ID,
        RuleName:  rule.Name,
        Level:     rule.Level,
        Message:   fmt.Sprintf("%s %s %.2f (threshold: %.2f)", rule.Name, rule.Operator, metric.Value, rule.Threshold),
        Labels:    labels,
        Value:     metric.Value,
        Threshold: rule.Threshold,
        Host:      metric.Host,
        Status:    "firing",
        StartsAt:  now,
        CreatedAt: now,
    }
}

// sendAlert dispatches alert to every registered notifier, each in its own
// goroutine with a 30-second timeout, and logs the alert. Delivery failures
// are logged and do not affect the other notifiers.
//
// Every notifier receives its own copy of the alert, taken at dispatch time.
// The manager keeps mutating the stored alert (for example when it is
// resolved), so sharing the pointer would let a slow notifier race with that
// update or report the wrong status.
func (am *AlertManager) sendAlert(alert *Alert) {
    for _, notifier := range am.notifiers {
        // *alert is evaluated here, synchronously, so the copy reflects the
        // alert as it is right now.
        go func(n Notifier, snapshot Alert) {
            ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
            defer cancel()

            if err := n.Notify(ctx, &snapshot); err != nil {
                am.logger.Printf("Failed to send alert via notifier: %v", err)
            }
        }(notifier, *alert)
    }

    am.logger.Printf("Alert %s: %s", alert.Status, alert.Message)
}

## 通知器实现

// pkg/alert/email_notifier.go
package alert

import (
    "context"
    "fmt"
    "mime"
    "net/smtp"
    "strings"
)

// EmailNotifier delivers alerts by e-mail through an SMTP server.
type EmailNotifier struct {
    // SMTPHost and SMTPPort address the SMTP server.
    SMTPHost     string
    SMTPPort     string
    // Username and Password are the credentials used for SMTP PLAIN auth.
    Username     string
    Password     string
    // From is the sender address.
    From         string
    // To lists the recipient addresses.
    To           []string
}

// NewEmailNotifier returns an EmailNotifier that sends through the SMTP server
// at host:port, authenticating with username/password, with from as sender and
// to as recipients. The to slice is stored as given, not copied.
func NewEmailNotifier(host, port, username, password, from string, to []string) *EmailNotifier {
    notifier := new(EmailNotifier)

    // Server and credentials.
    notifier.SMTPHost = host
    notifier.SMTPPort = port
    notifier.Username = username
    notifier.Password = password

    // Envelope.
    notifier.From = from
    notifier.To = to

    return notifier
}

// Notify sends alert as a plain-text e-mail to all configured recipients.
//
// It returns an error, without contacting the server, when alert is nil, when
// no recipients are configured, or when ctx is already done. net/smtp offers
// no context-aware API, so ctx cannot interrupt a delivery that has started.
// Any error from smtp.SendMail is returned unchanged.
//
// The message carries From, To, Subject and MIME headers that declare a UTF-8
// body. The subject is RFC 2047 encoded, which keeps non-ASCII rule names
// valid and stops CR/LF in a rule name from injecting extra headers.
func (en *EmailNotifier) Notify(ctx context.Context, alert *Alert) error {
    if alert == nil {
        return fmt.Errorf("email notifier: alert is nil")
    }
    if len(en.To) == 0 {
        return fmt.Errorf("email notifier: no recipients configured")
    }
    if err := ctx.Err(); err != nil {
        return err
    }

    const timeLayout = "2006-01-02 15:04:05"

    subject := mime.QEncoding.Encode("utf-8",
        fmt.Sprintf("[%s] %s", strings.ToUpper(string(alert.Level)), alert.RuleName))

    var body strings.Builder
    fmt.Fprintf(&body, `
Alert: %s
Level: %s
Host: %s
Message: %s
Value: %.2f
Threshold: %.2f
Status: %s
Time: %s
`, alert.RuleName, alert.Level, alert.Host, alert.Message,
        alert.Value, alert.Threshold, alert.Status, alert.StartsAt.Format(timeLayout))
    if alert.EndsAt != nil {
        // Only present once the alert has been resolved.
        fmt.Fprintf(&body, "Resolved: %s\n", alert.EndsAt.Format(timeLayout))
    }

    headers := []string{
        "From: " + en.From,
        "To: " + strings.Join(en.To, ","),
        "Subject: " + subject,
        "MIME-Version: 1.0",
        `Content-Type: text/plain; charset="utf-8"`,
        "Content-Transfer-Encoding: 8bit",
    }
    msg := strings.Join(headers, "\r\n") + "\r\n\r\n" + body.String()

    auth := smtp.PlainAuth("", en.Username, en.Password, en.SMTPHost)
    addr := fmt.Sprintf("%s:%s", en.SMTPHost, en.SMTPPort)

    return smtp.SendMail(addr, auth, en.From, en.To, []byte(msg))
}

// WebhookNotifier is meant to deliver alerts to an HTTP webhook.
type WebhookNotifier struct {
    // URL is the webhook endpoint.
    URL string
}

// NewWebhookNotifier returns a WebhookNotifier targeting url.
func NewWebhookNotifier(url string) *WebhookNotifier {
    notifier := new(WebhookNotifier)
    notifier.URL = url
    return notifier
}

// Notify is a placeholder: it does not contact wn.URL and always returns nil,
// so callers see every webhook delivery as successful.
// TODO: implement the delivery, e.g. an HTTP POST of the alert to wn.URL.
func (wn *WebhookNotifier) Notify(ctx context.Context, alert *Alert) error {
    // The webhook delivery logic goes here.
    // An HTTP POST request to the configured URL could be sent from this point.
    return nil
}

## Web API 实现

// pkg/api/server.go
package api

import (
    "net/http"
    "strconv"
    "time"

    "github.com/gin-gonic/gin"
    "your-project/pkg/monitor"
    "your-project/pkg/storage"
)

// Server is the HTTP API server of the monitor.
type Server struct {
    // storage is the metric store queried by the handlers.
    storage storage.Storage
    // router is the gin engine all routes are registered on.
    router  *gin.Engine
}

// NewServer returns a Server backed by the given storage, with a default gin
// engine and all routes already registered.
func NewServer(storage storage.Storage) *Server {
    srv := new(Server)
    srv.storage = storage
    srv.router = gin.Default()

    srv.setupRoutes()

    return srv
}

// setupRoutes registers every route of the server: the JSON API under
// /api/v1, the static assets under /static and the dashboard page at "/".
func (s *Server) setupRoutes() {
    v1 := s.router.Group("/api/v1")
    {
        // Metrics.
        v1.GET("/metrics", s.getMetrics)
        v1.GET("/metrics/:type", s.getMetricsByType)

        // System information.
        v1.GET("/system/info", s.getSystemInfo)
        v1.GET("/system/status", s.getSystemStatus)

        // Alerts and alert rules.
        v1.GET("/alerts", s.getAlerts)
        v1.GET("/alerts/:id", s.getAlert)
        v1.POST("/alerts/rules", s.createAlertRule)
        v1.PUT("/alerts/rules/:id", s.updateAlertRule)
        v1.DELETE("/alerts/rules/:id", s.deleteAlertRule)
    }

    // Static assets and the HTML dashboard. Both paths are relative to the
    // working directory of the process.
    s.router.Static("/static", "./web/static")
    s.router.LoadHTMLGlob("web/templates/*")
    s.router.GET("/", s.dashboard)
}

// getMetrics handles GET /api/v1/metrics.
//
// Query parameters:
//   - start: RFC 3339 start of the time range; defaults to one hour ago.
//   - end:   RFC 3339 end of the time range; defaults to now.
//   - host:  host name passed to the storage query; empty when omitted.
//
// It responds with 400 when a timestamp cannot be parsed or when end lies
// before start, with 500 when the storage query fails, and otherwise with 200
// and a JSON object holding the result under "metrics".
func (s *Server) getMetrics(c *gin.Context) {
    now := time.Now()

    start, err := parseTimeQuery(c.Query("start"), now.Add(-time.Hour))
    if err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": "invalid start time"})
        return
    }

    end, err := parseTimeQuery(c.Query("end"), now)
    if err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": "invalid end time"})
        return
    }

    // An inverted range can only be a client mistake; reject it instead of
    // handing it to the storage layer.
    if end.Before(start) {
        c.JSON(http.StatusBadRequest, gin.H{"error": "end time must not be before start time"})
        return
    }

    metrics, err := s.storage.QueryMetrics(start, end, c.Query("host"))
    if err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
        return
    }

    c.JSON(http.StatusOK, gin.H{"metrics": metrics})
}

// parseTimeQuery parses raw as an RFC 3339 timestamp, returning fallback when
// raw is empty.
func parseTimeQuery(raw string, fallback time.Time) (time.Time, error) {
    if raw == "" {
        return fallback, nil
    }
    return time.Parse(time.RFC3339, raw)
}

// dashboard renders the "dashboard.html" template with the page title.
func (s *Server) dashboard(c *gin.Context) {
    pageData := gin.H{"title": "System Monitor Dashboard"}
    c.HTML(http.StatusOK, "dashboard.html", pageData)
}

// Start serves the API on addr and blocks until the server stops, returning
// the error that stopped it.
//
// The router is served through an explicit http.Server so that read, write and
// idle timeouts are in place. Without them a slow or stalled client can hold a
// connection open indefinitely.
func (s *Server) Start(addr string) error {
    httpServer := &http.Server{
        Addr:              addr,
        Handler:           s.router,
        ReadHeaderTimeout: 10 * time.Second,
        ReadTimeout:       30 * time.Second,
        WriteTimeout:      30 * time.Second,
        IdleTimeout:       2 * time.Minute,
    }
    return httpServer.ListenAndServe()
}

## 主程序实现

// cmd/monitor/main.go
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "your-project/pkg/alert"
    "your-project/pkg/api"
    "your-project/pkg/collector"
    "your-project/pkg/monitor"
    "your-project/pkg/storage"
)

// main wires the monitor together: it creates the metric channel, the storage,
// the alert manager (with two example rules and an e-mail notifier), the
// system collector and the API server, starts them, and then waits for SIGINT
// or SIGTERM.
//
// Shutdown order matters. The collector is the only sender on the metric
// channel, so the channel is closed only after the collector has stopped
// (closing it earlier would make an in-flight send panic), and the process
// exits only after the processing goroutine has drained what is left.
func main() {
    logger := log.New(os.Stdout, "[MONITOR] ", log.LstdFlags)

    // Channel carrying metrics from the collector to the processing goroutine.
    metrics := make(chan monitor.Metric, 10000)

    // Metric storage.
    store := storage.NewTimeSeriesStorage()

    // Alert manager and example rules.
    alertManager := alert.NewAlertManager(logger)

    cpuRule := &alert.AlertRule{
        ID:         "cpu_high",
        Name:       "CPU使用率过高",
        MetricType: monitor.MetricCPU,
        MetricName: "cpu_usage_percent",
        Operator:   ">",
        Threshold:  80.0,
        Duration:   5 * time.Minute,
        Level:      alert.AlertWarning,
        Enabled:    true,
    }
    alertManager.AddRule(cpuRule)

    memoryRule := &alert.AlertRule{
        ID:         "memory_high",
        Name:       "内存使用率过高",
        MetricType: monitor.MetricMemory,
        MetricName: "memory_used_percent",
        Operator:   ">",
        Threshold:  90.0,
        Duration:   3 * time.Minute,
        Level:      alert.AlertCritical,
        Enabled:    true,
    }
    alertManager.AddRule(memoryRule)

    // E-mail notifier. The addresses and the password are placeholders; real
    // deployments should load them from configuration, not from source code.
    emailNotifier := alert.NewEmailNotifier(
        "smtp.example.com", "587",
        "monitor@example.com", "password",
        "monitor@example.com",
        []string{"admin@example.com"},
    )
    alertManager.AddNotifier(emailNotifier)

    // System collector.
    systemCollector, err := collector.NewSystemCollector(
        30*time.Second, metrics, logger)
    if err != nil {
        logger.Fatalf("Failed to create system collector: %v", err)
    }

    // API server.
    apiServer := api.NewServer(store)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Subscribe to signals before starting any work so that an early signal is
    // not missed.
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    // Metric processing: store every metric, then evaluate alert rules.
    processorDone := make(chan struct{})
    go func() {
        defer close(processorDone)
        for metric := range metrics {
            if err := store.StoreMetric(metric); err != nil {
                logger.Printf("Failed to store metric: %v", err)
            }
            alertManager.ProcessMetric(metric)
        }
    }()

    // Collector.
    collectorDone := make(chan struct{})
    go func() {
        defer close(collectorDone)
        // An error returned after cancellation is the normal shutdown path and
        // is not worth reporting.
        if err := systemCollector.Start(ctx); err != nil && ctx.Err() == nil {
            logger.Printf("System collector error: %v", err)
        }
    }()

    // API server.
    go func() {
        if err := apiServer.Start(":8080"); err != nil {
            logger.Printf("API server error: %v", err)
        }
    }()

    logger.Println("System monitor started")

    <-sigChan
    logger.Println("Shutting down system monitor...")

    cancel()
    <-collectorDone // no sender is left after this point
    close(metrics)
    <-processorDone // let the buffered metrics be stored before exiting
}

## 功能扩展

### 自定义指标采集

// CustomCollector is the extension point for user-defined metric sources.
type CustomCollector interface {
    // Name returns the name of the collector.
    Name() string
    // Collect gathers the collector's metrics, or returns an error when the
    // collection fails.
    Collect() ([]monitor.Metric, error)
}

// ApplicationCollector is an example CustomCollector for a single application.
type ApplicationCollector struct {
    // appName is the name of the monitored application.
    appName string
    // port is the port of the monitored application. It is not used by the
    // example code yet.
    port    int
}

// Name returns the application name. Without this method ApplicationCollector
// would not satisfy the CustomCollector interface.
func (ac *ApplicationCollector) Name() string {
    return ac.appName
}

// Collect is a placeholder for the application-specific collection logic; it
// currently returns no metrics and no error.
func (ac *ApplicationCollector) Collect() ([]monitor.Metric, error) {
    // Application-specific metric collection goes here.
    return nil, nil
}

### 数据可视化

<!-- web/templates/dashboard.html -->
<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8" />
    <title>{{.title}}</title>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
  </head>
  <body>
    <div id="cpu-chart">
      <canvas id="cpuChart"></canvas>
    </div>
    <div id="memory-chart">
      <canvas id="memoryChart"></canvas>
    </div>

    <script>
      // Time between two refreshes, in milliseconds.
      const REFRESH_INTERVAL_MS = 30000;

      // Fetches the latest CPU metrics and refreshes the charts. The metric
      // type is part of the path because the server registers the route as
      // /api/v1/metrics/:type; a "type" query parameter is not read by the
      // /api/v1/metrics handler.
      function updateCharts() {
        fetch("/api/v1/metrics/cpu")
          .then((response) => {
            if (!response.ok) {
              throw new Error("HTTP " + response.status);
            }
            return response.json();
          })
          .then((data) => {
            // Update the CPU chart here.
          })
          .catch((err) => {
            console.error("Failed to refresh metrics:", err);
          });
      }

      // Draw once right away instead of leaving the page empty until the first
      // interval has elapsed.
      updateCharts();
      setInterval(updateCharts, REFRESH_INTERVAL_MS);
    </script>
  </body>
</html>

通过本节的学习,我们搭建了一个系统监控工具的核心骨架,涵盖数据采集、告警、通知、Web API 和可视化等环节。需要注意的是,为了突出主线,文中省略了部分实现(例如 storage 包、磁盘/网络/进程指标的发送函数以及部分 API 处理函数),在实际使用前需要自行补全。这个系统展示了如何运用 Go 语言进行系统级编程,构建实用的运维工具。