# 4.8.2 系统监控工具
系统监控是运维工作的核心环节。本节将开发一个综合性的系统监控工具，实时收集系统资源使用情况、进程信息、网络状态等，并提供告警和可视化功能。
## 系统架构设计
### 整体架构
监控系统采用分层架构设计:
- 数据采集层:收集系统指标数据
- 数据处理层:数据聚合和分析
- 存储层:时序数据存储
- 告警层:阈值检测和通知
- 展示层:Web 界面和 API
// pkg/monitor/types.go
package monitor
import (
"time"
)
// MetricType names the category a metric belongs to.
type MetricType string
// Metric categories defined by this package. The string values are what
// appears in the "type" field of a serialized Metric.
const (
MetricCPU MetricType = "cpu"
MetricMemory MetricType = "memory"
MetricDisk MetricType = "disk"
MetricNetwork MetricType = "network"
MetricProcess MetricType = "process"
)
// Metric is one sampled value together with the metadata that identifies it.
// Every field is serialized to JSON under the name given in its tag.
type Metric struct {
// Type is the category of the metric (one of the MetricType constants).
Type MetricType `json:"type"`
// Name identifies the individual series within its category.
Name string `json:"name"`
// Value is the sampled reading, expressed in Unit.
Value float64 `json:"value"`
// Unit is a free-form description of what Value measures.
Unit string `json:"unit"`
// Labels carries optional key/value annotations for the sample.
Labels map[string]string `json:"labels"`
// Timestamp is the time the sample was taken.
Timestamp time.Time `json:"timestamp"`
// Host is the name of the machine the sample came from.
Host string `json:"host"`
}
// SystemInfo holds static facts about the monitored host.
type SystemInfo struct {
Hostname string `json:"hostname"`
OS string `json:"os"`
Architecture string `json:"architecture"`
// CPUCores is the number of CPU cores.
CPUCores int `json:"cpu_cores"`
// TotalMemory is the amount of installed memory; presumably in bytes — confirm with the producer.
TotalMemory uint64 `json:"total_memory"`
// NOTE(review): Uptime is a time.Time rather than a duration, so it
// presumably stores the boot instant — confirm with the code that fills it.
Uptime time.Time `json:"uptime"`
}
// CPUStats is a snapshot of processor utilisation and system load.
type CPUStats struct {
// Usage is the overall CPU utilisation as a percentage.
Usage float64 `json:"usage"`
// User, System, Idle and IOWait are CPU times spent in the respective
// state; the collector in this project publishes User and System with the
// unit "seconds".
User float64 `json:"user"`
System float64 `json:"system"`
Idle float64 `json:"idle"`
IOWait float64 `json:"iowait"`
// LoadAvg1, LoadAvg5 and LoadAvg15 are the 1-, 5- and 15-minute load averages.
LoadAvg1 float64 `json:"load_avg_1"`
LoadAvg5 float64 `json:"load_avg_5"`
LoadAvg15 float64 `json:"load_avg_15"`
}
// MemoryStats is a snapshot of physical and swap memory usage.
// The collector in this project publishes the size fields with the unit "bytes".
type MemoryStats struct {
Total uint64 `json:"total"`
Available uint64 `json:"available"`
Used uint64 `json:"used"`
Free uint64 `json:"free"`
// UsedPercent is the share of memory in use, as a percentage.
UsedPercent float64 `json:"used_percent"`
Buffers uint64 `json:"buffers"`
Cached uint64 `json:"cached"`
// Swap* fields describe swap space rather than physical memory.
SwapTotal uint64 `json:"swap_total"`
SwapUsed uint64 `json:"swap_used"`
SwapFree uint64 `json:"swap_free"`
}
// DiskStats describes space and inode usage of one mounted partition.
type DiskStats struct {
// Device, Mountpoint and Filesystem identify the partition.
Device string `json:"device"`
Mountpoint string `json:"mountpoint"`
Filesystem string `json:"filesystem"`
// Total, Used and Free describe capacity; presumably in bytes — confirm with the producer.
Total uint64 `json:"total"`
Used uint64 `json:"used"`
Free uint64 `json:"free"`
// UsedPercent is the share of capacity in use, as a percentage.
UsedPercent float64 `json:"used_percent"`
// Inodes* fields give the inode counts of the filesystem.
InodesTotal uint64 `json:"inodes_total"`
InodesUsed uint64 `json:"inodes_used"`
InodesFree uint64 `json:"inodes_free"`
}
// NetworkStats holds the traffic counters of one network interface.
// NOTE(review): the values look like cumulative counters rather than rates —
// confirm with the producer before graphing them directly.
type NetworkStats struct {
// Interface is the name of the network interface.
Interface string `json:"interface"`
BytesSent uint64 `json:"bytes_sent"`
BytesRecv uint64 `json:"bytes_recv"`
PacketsSent uint64 `json:"packets_sent"`
PacketsRecv uint64 `json:"packets_recv"`
// Errors*/Drops* count failed and discarded packets per direction.
ErrorsIn uint64 `json:"errors_in"`
ErrorsOut uint64 `json:"errors_out"`
DropsIn uint64 `json:"drops_in"`
DropsOut uint64 `json:"drops_out"`
}
// ProcessInfo is a snapshot of a single running process.
type ProcessInfo struct {
PID int32 `json:"pid"`
Name string `json:"name"`
Status string `json:"status"`
// CPUPercent and MemoryPercent are utilisation percentages.
CPUPercent float64 `json:"cpu_percent"`
// MemoryRSS and MemoryVMS are the resident and virtual memory sizes.
MemoryRSS uint64 `json:"memory_rss"`
MemoryVMS uint64 `json:"memory_vms"`
MemoryPercent float64 `json:"memory_percent"`
// CreateTime is the process start time as an integer timestamp;
// presumably milliseconds since the Unix epoch — confirm with the producer.
CreateTime int64 `json:"create_time"`
NumThreads int32 `json:"num_threads"`
// NumFDs is the number of open file descriptors.
NumFDs int32 `json:"num_fds"`
Username string `json:"username"`
// Cmdline is the full command line of the process.
Cmdline string `json:"cmdline"`
}
## 数据采集器实现
// pkg/collector/system_collector.go
package collector
import (
"context"
"fmt"
"log"
"runtime"
"time"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/host"
"github.com/shirou/gopsutil/v3/load"
"github.com/shirou/gopsutil/v3/mem"
"github.com/shirou/gopsutil/v3/net"
"github.com/shirou/gopsutil/v3/process"
"your-project/pkg/monitor"
)
// SystemCollector periodically samples system metrics and publishes them on
// a channel.
type SystemCollector struct {
// hostname is stamped on every metric the collector emits.
hostname string
// interval is the time between two sampling rounds.
interval time.Duration
logger *log.Logger
// metrics is the send-only channel samples are published on.
metrics chan<- monitor.Metric
}
// NewSystemCollector builds a SystemCollector that samples once per interval
// and publishes the resulting metrics on the given channel.
//
// The local hostname is looked up once through host.Info and reused for every
// metric; an error is returned when that lookup fails.
func NewSystemCollector(interval time.Duration, metrics chan<- monitor.Metric, logger *log.Logger) (*SystemCollector, error) {
	info, err := host.Info()
	if err != nil {
		return nil, fmt.Errorf("failed to get host info: %w", err)
	}

	sc := new(SystemCollector)
	sc.hostname = info.Hostname
	sc.interval = interval
	sc.logger = logger
	sc.metrics = metrics
	return sc, nil
}
// Start runs the collection loop until ctx is cancelled.
//
// One sampling round is executed per tick of the configured interval; the
// first round happens one interval after Start is called. Start blocks, so
// callers normally run it in its own goroutine.
//
// It returns ctx.Err() once the context is done, or an error immediately when
// the configured interval is not positive.
func (c *SystemCollector) Start(ctx context.Context) error {
	// time.NewTicker panics on a non-positive duration; surface the
	// misconfiguration as an error instead of crashing the process.
	if c.interval <= 0 {
		return fmt.Errorf("invalid collection interval %v: must be positive", c.interval)
	}

	ticker := time.NewTicker(c.interval)
	defer ticker.Stop()

	c.logger.Printf("System collector started with interval %v", c.interval)
	for {
		select {
		case <-ticker.C:
			c.collectMetrics()
		case <-ctx.Done():
			c.logger.Println("System collector stopped")
			return ctx.Err()
		}
	}
}
// collectMetrics performs one sampling round across all metric families.
//
// Every family is collected independently: a failure is logged and the
// remaining families are still sampled. All metrics of a round share the
// timestamp taken at the start of the round.
func (c *SystemCollector) collectMetrics() {
	sampledAt := time.Now()

	// CPU
	cpuStats, err := c.collectCPUStats()
	if err != nil {
		c.logger.Printf("Failed to collect CPU stats: %v", err)
	} else {
		c.sendCPUMetrics(cpuStats, sampledAt)
	}

	// Memory
	memStats, err := c.collectMemoryStats()
	if err != nil {
		c.logger.Printf("Failed to collect memory stats: %v", err)
	} else {
		c.sendMemoryMetrics(memStats, sampledAt)
	}

	// Disk
	diskStats, err := c.collectDiskStats()
	if err != nil {
		c.logger.Printf("Failed to collect disk stats: %v", err)
	} else {
		c.sendDiskMetrics(diskStats, sampledAt)
	}

	// Network
	netStats, err := c.collectNetworkStats()
	if err != nil {
		c.logger.Printf("Failed to collect network stats: %v", err)
	} else {
		c.sendNetworkMetrics(netStats, sampledAt)
	}

	// Processes
	procStats, err := c.collectProcessStats()
	if err != nil {
		c.logger.Printf("Failed to collect process stats: %v", err)
	} else {
		c.sendProcessMetrics(procStats, sampledAt)
	}
}
// collectCPUStats gathers aggregate CPU utilisation, CPU times and load
// averages.
//
// The utilisation is measured over a one-second window, so the call blocks
// for about that long. The first error from any of the three underlying
// queries is returned unchanged.
func (c *SystemCollector) collectCPUStats() (*monitor.CPUStats, error) {
	// Overall utilisation (percpu=false yields a single aggregate entry).
	percents, err := cpu.Percent(time.Second, false)
	if err != nil {
		return nil, err
	}

	// Aggregate CPU times.
	times, err := cpu.Times(false)
	if err != nil {
		return nil, err
	}

	// Load averages.
	avg, err := load.Avg()
	if err != nil {
		return nil, err
	}

	stats := &monitor.CPUStats{
		LoadAvg1:  avg.Load1,
		LoadAvg5:  avg.Load5,
		LoadAvg15: avg.Load15,
	}
	// Fields stay at zero when the platform returned no entries.
	if len(percents) > 0 {
		stats.Usage = percents[0]
	}
	if len(times) > 0 {
		total := times[0]
		stats.User = total.User
		stats.System = total.System
		stats.Idle = total.Idle
		stats.IOWait = total.Iowait
	}
	return stats, nil
}
// collectMemoryStats reads virtual-memory and swap usage and merges them into
// a single MemoryStats value. An error from either query is returned as is.
func (c *SystemCollector) collectMemoryStats() (*monitor.MemoryStats, error) {
	virt, err := mem.VirtualMemory()
	if err != nil {
		return nil, err
	}

	swp, err := mem.SwapMemory()
	if err != nil {
		return nil, err
	}

	var out monitor.MemoryStats

	// Physical memory.
	out.Total = virt.Total
	out.Available = virt.Available
	out.Used = virt.Used
	out.Free = virt.Free
	out.UsedPercent = virt.UsedPercent
	out.Buffers = virt.Buffers
	out.Cached = virt.Cached

	// Swap space.
	out.SwapTotal = swp.Total
	out.SwapUsed = swp.Used
	out.SwapFree = swp.Free

	return &out, nil
}
// collectDiskStats returns usage figures for every partition reported by
// disk.Partitions(false).
//
// Partitions whose usage cannot be read are skipped on a best-effort basis;
// only a failure to list the partitions is reported as an error.
func (c *SystemCollector) collectDiskStats() ([]*monitor.DiskStats, error) {
	parts, err := disk.Partitions(false)
	if err != nil {
		return nil, err
	}

	var result []*monitor.DiskStats
	for i := range parts {
		part := parts[i]

		u, usageErr := disk.Usage(part.Mountpoint)
		if usageErr != nil {
			continue
		}

		entry := new(monitor.DiskStats)
		entry.Device = part.Device
		entry.Mountpoint = part.Mountpoint
		entry.Filesystem = part.Fstype
		entry.Total = u.Total
		entry.Used = u.Used
		entry.Free = u.Free
		entry.UsedPercent = u.UsedPercent
		entry.InodesTotal = u.InodesTotal
		entry.InodesUsed = u.InodesUsed
		entry.InodesFree = u.InodesFree
		result = append(result, entry)
	}
	return result, nil
}
// collectNetworkStats returns the I/O counters of every network interface
// (net.IOCounters is queried per interface).
func (c *SystemCollector) collectNetworkStats() ([]*monitor.NetworkStats, error) {
	counters, err := net.IOCounters(true)
	if err != nil {
		return nil, err
	}

	var result []*monitor.NetworkStats
	for i := range counters {
		nic := counters[i]

		entry := new(monitor.NetworkStats)
		entry.Interface = nic.Name
		entry.BytesSent = nic.BytesSent
		entry.BytesRecv = nic.BytesRecv
		entry.PacketsSent = nic.PacketsSent
		entry.PacketsRecv = nic.PacketsRecv
		entry.ErrorsIn = nic.Errin
		entry.ErrorsOut = nic.Errout
		entry.DropsIn = nic.Dropin
		entry.DropsOut = nic.Dropout
		result = append(result, entry)
	}
	return result, nil
}
// collectProcessStats returns a snapshot of up to 100 running processes.
//
// Processes that can no longer be opened (for example because they exited
// between listing and inspection) are skipped. Individual attribute lookups
// are best-effort: an attribute that cannot be read, typically for lack of
// permission, is left at its zero value rather than failing the snapshot.
func (c *SystemCollector) collectProcessStats() ([]*monitor.ProcessInfo, error) {
	// Cap the snapshot so a busy host does not flood the pipeline.
	const maxProcesses = 100

	pids, err := process.Pids()
	if err != nil {
		return nil, err
	}

	var processStats []*monitor.ProcessInfo
	for _, pid := range pids {
		if len(processStats) >= maxProcesses {
			break
		}

		proc, err := process.NewProcess(pid)
		if err != nil {
			continue
		}

		// Errors are deliberately ignored below; see the function comment.
		name, _ := proc.Name()
		statuses, _ := proc.Status()
		cpuPercent, _ := proc.CPUPercent()
		memInfo, _ := proc.MemoryInfo()
		memPercent, _ := proc.MemoryPercent()
		createTime, _ := proc.CreateTime()
		numThreads, _ := proc.NumThreads()
		numFDs, _ := proc.NumFDs()
		username, _ := proc.Username()
		cmdline, _ := proc.Cmdline()

		// gopsutil v3 reports the status as a slice of state codes; the
		// first entry is used as the representative state.
		var status string
		if len(statuses) > 0 {
			status = statuses[0]
		}

		var memRSS, memVMS uint64
		if memInfo != nil {
			memRSS = memInfo.RSS
			memVMS = memInfo.VMS
		}

		processStats = append(processStats, &monitor.ProcessInfo{
			PID:           pid,
			Name:          name,
			Status:        status,
			CPUPercent:    cpuPercent,
			MemoryRSS:     memRSS,
			MemoryVMS:     memVMS,
			MemoryPercent: float64(memPercent),
			CreateTime:    createTime,
			NumThreads:    numThreads,
			NumFDs:        numFDs,
			Username:      username,
			Cmdline:       cmdline,
		})
	}
	return processStats, nil
}
// sendCPUMetrics publishes the CPU readings in stats as individual metrics,
// all stamped with timestamp and the collector's hostname.
//
// Sends never block: when the metrics channel is full the sample is dropped
// and a log line is written instead.
func (c *SystemCollector) sendCPUMetrics(stats *monitor.CPUStats, timestamp time.Time) {
	samples := []struct {
		name  string
		value float64
		unit  string
	}{
		{"cpu_usage_percent", stats.Usage, "percent"},
		{"cpu_user_time", stats.User, "seconds"},
		{"cpu_system_time", stats.System, "seconds"},
		{"load_average_1m", stats.LoadAvg1, "load"},
		{"load_average_5m", stats.LoadAvg5, "load"},
		{"load_average_15m", stats.LoadAvg15, "load"},
	}

	for _, s := range samples {
		m := monitor.Metric{
			Type:      monitor.MetricCPU,
			Name:      s.name,
			Value:     s.value,
			Unit:      s.unit,
			Timestamp: timestamp,
			Host:      c.hostname,
		}
		select {
		case c.metrics <- m:
		default:
			c.logger.Println("Metrics channel is full, dropping CPU metric")
		}
	}
}
// sendMemoryMetrics publishes the memory and swap readings in stats as
// individual metrics, all stamped with timestamp and the collector's hostname.
//
// Sends never block: when the metrics channel is full the sample is dropped
// and a log line is written instead.
func (c *SystemCollector) sendMemoryMetrics(stats *monitor.MemoryStats, timestamp time.Time) {
	samples := []struct {
		name  string
		value float64
		unit  string
	}{
		{"memory_total", float64(stats.Total), "bytes"},
		{"memory_used", float64(stats.Used), "bytes"},
		{"memory_available", float64(stats.Available), "bytes"},
		{"memory_used_percent", stats.UsedPercent, "percent"},
		{"swap_total", float64(stats.SwapTotal), "bytes"},
		{"swap_used", float64(stats.SwapUsed), "bytes"},
	}

	for _, s := range samples {
		m := monitor.Metric{
			Type:      monitor.MetricMemory,
			Name:      s.name,
			Value:     s.value,
			Unit:      s.unit,
			Timestamp: timestamp,
			Host:      c.hostname,
		}
		select {
		case c.metrics <- m:
		default:
			c.logger.Println("Metrics channel is full, dropping memory metric")
		}
	}
}
## 告警系统实现
// pkg/alert/alert_manager.go
package alert
import (
"context"
"fmt"
"log"
"sync"
"time"
"your-project/pkg/monitor"
)
// AlertLevel is the severity attached to an alert rule and to the alerts it
// produces.
type AlertLevel string
// Supported severities.
const (
AlertInfo AlertLevel = "info"
AlertWarning AlertLevel = "warning"
AlertCritical AlertLevel = "critical"
)
// AlertRule describes a threshold condition on one metric series.
type AlertRule struct {
// ID is the key the rule is stored under in the AlertManager.
ID string `json:"id"`
Name string `json:"name"`
// MetricType and MetricName select the metrics the rule applies to; both
// must match for the rule to be evaluated.
MetricType monitor.MetricType `json:"metric_type"`
MetricName string `json:"metric_name"`
// Operator compares the metric value (left) against Threshold (right).
Operator string `json:"operator"` // >, <, >=, <=, ==, !=
Threshold float64 `json:"threshold"`
// Duration is how long the condition must hold before an alert is raised.
Duration time.Duration `json:"duration"`
Level AlertLevel `json:"level"`
Labels map[string]string `json:"labels"`
// Enabled turns evaluation of the rule on or off.
Enabled bool `json:"enabled"`
// CreatedAt and UpdatedAt are set by AlertManager.AddRule.
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// Alert is one alert event raised by a rule.
type Alert struct {
ID string `json:"id"`
// RuleID and RuleName identify the rule that raised the alert.
RuleID string `json:"rule_id"`
RuleName string `json:"rule_name"`
Level AlertLevel `json:"level"`
// Message is a human-readable summary of the violated condition.
Message string `json:"message"`
Labels map[string]string `json:"labels"`
// Value is the metric value that triggered the alert; Threshold is the
// rule threshold it was compared against.
Value float64 `json:"value"`
Threshold float64 `json:"threshold"`
// Host is the host of the metric that triggered the alert.
Host string `json:"host"`
Status string `json:"status"` // firing, resolved
StartsAt time.Time `json:"starts_at"`
// EndsAt is set when the alert is resolved and omitted from JSON until then.
EndsAt *time.Time `json:"ends_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
// AlertManager evaluates incoming metrics against the registered rules and
// dispatches the resulting alerts to its notifiers.
type AlertManager struct {
// rules, alerts and ruleStates are keyed by rule ID, alert ID and rule ID
// respectively.
rules map[string]*AlertRule
alerts map[string]*Alert
ruleStates map[string]*RuleState
// mu is taken by AddRule (write) and ProcessMetric (read) around access
// to the maps above.
mu sync.RWMutex
logger *log.Logger
// notifiers receive every alert that is sent.
notifiers []Notifier
}
// RuleState is the evaluation state the AlertManager keeps per rule.
type RuleState struct {
RuleID string
// LastValue and LastCheck record the most recently evaluated metric value
// and when it was evaluated.
LastValue float64
LastCheck time.Time
// FiringStart is when the threshold was first seen violated in the current
// streak; nil while the condition does not hold.
FiringStart *time.Time
// AlertID is the ID of the alert currently firing for the rule, or "" if
// none is.
AlertID string
}
// Notifier delivers an alert to an external channel.
type Notifier interface {
// Notify sends alert and returns an error when delivery failed.
// The AlertManager calls it from a separate goroutine with a context that
// carries a timeout.
Notify(ctx context.Context, alert *Alert) error
}
// NewAlertManager returns an AlertManager with no rules, alerts or notifiers
// that logs through logger.
func NewAlertManager(logger *log.Logger) *AlertManager {
	am := &AlertManager{logger: logger}
	am.rules = map[string]*AlertRule{}
	am.alerts = map[string]*Alert{}
	am.ruleStates = map[string]*RuleState{}
	am.notifiers = []Notifier{}
	return am
}
// AddRule registers rule under rule.ID, replacing any rule already stored
// under that ID, and resets the evaluation state for it.
//
// The rule is stored by pointer and its CreatedAt/UpdatedAt fields are
// overwritten with the registration time. A nil rule is ignored with a log
// line instead of panicking.
func (am *AlertManager) AddRule(rule *AlertRule) {
	if rule == nil {
		am.logger.Println("Ignoring nil alert rule")
		return
	}

	// One timestamp keeps CreatedAt, UpdatedAt and LastCheck consistent.
	now := time.Now()

	am.mu.Lock()
	defer am.mu.Unlock()

	rule.CreatedAt = now
	rule.UpdatedAt = now
	am.rules[rule.ID] = rule

	// Start the rule from a clean state.
	am.ruleStates[rule.ID] = &RuleState{
		RuleID:    rule.ID,
		LastCheck: now,
	}

	am.logger.Printf("Alert rule added: %s", rule.Name)
}
// AddNotifier registers notifier to receive every alert sent from now on.
//
// The notifier list is read while metrics are being processed, so the append
// is done under the manager's lock. A nil notifier is ignored, because
// calling Notify on it later would panic.
func (am *AlertManager) AddNotifier(notifier Notifier) {
	if notifier == nil {
		return
	}

	am.mu.Lock()
	defer am.mu.Unlock()

	am.notifiers = append(am.notifiers, notifier)
}
// ProcessMetric evaluates metric against every enabled rule whose metric type
// and name match it.
//
// The exclusive lock is taken rather than the read lock: rule evaluation
// updates the per-rule state and inserts into the alerts map, which is not
// safe under a shared lock when several goroutines feed metrics concurrently.
func (am *AlertManager) ProcessMetric(metric monitor.Metric) {
	am.mu.Lock()
	defer am.mu.Unlock()

	for _, rule := range am.rules {
		if !rule.Enabled {
			continue
		}
		if rule.MetricType != metric.Type || rule.MetricName != metric.Name {
			continue
		}
		am.evaluateRule(rule, metric)
	}
}
// evaluateRule applies rule to metric and advances the rule's state machine.
//
// While the threshold is violated, the start of the streak is remembered; an
// alert is created and sent once the streak has lasted at least rule.Duration
// (immediately when the duration is zero). When the threshold is no longer
// violated, a firing alert is marked "resolved", given an end time and sent
// again, and the streak is cleared.
//
// The method mutates the rule state and the alerts map, so the caller needs
// exclusive access to the manager while it runs.
func (am *AlertManager) evaluateRule(rule *AlertRule, metric monitor.Metric) {
	now := time.Now()

	// Create the state on demand so a rule without one cannot cause a nil
	// dereference.
	state := am.ruleStates[rule.ID]
	if state == nil {
		state = &RuleState{RuleID: rule.ID}
		am.ruleStates[rule.ID] = state
	}
	state.LastValue = metric.Value
	state.LastCheck = now

	if !am.checkThreshold(rule.Operator, metric.Value, rule.Threshold) {
		// Condition cleared: resolve a firing alert, if any.
		if state.AlertID != "" {
			if alert, exists := am.alerts[state.AlertID]; exists {
				endedAt := now
				alert.Status = "resolved"
				alert.EndsAt = &endedAt
				am.sendAlert(alert)
			}
			state.AlertID = ""
		}
		state.FiringStart = nil
		return
	}

	// Condition holds: remember when the streak began.
	if state.FiringStart == nil {
		startedAt := now
		state.FiringStart = &startedAt
	}

	// Already alerting, or the streak is still too short. Checking the
	// duration on the first violating sample as well lets zero-duration
	// rules fire without waiting for a second sample.
	if state.AlertID != "" || now.Sub(*state.FiringStart) < rule.Duration {
		return
	}

	alert := am.createAlert(rule, metric)
	state.AlertID = alert.ID
	am.alerts[alert.ID] = alert
	am.sendAlert(alert)
}
// checkThreshold reports whether "value <operator> threshold" holds.
//
// Supported operators are >, <, >=, <=, == and !=. Any other operator yields
// false.
func (am *AlertManager) checkThreshold(operator string, value, threshold float64) bool {
	above := value > threshold
	below := value < threshold
	same := value == threshold

	var holds bool
	switch operator {
	case ">":
		holds = above
	case "<":
		holds = below
	case ">=":
		holds = above || same
	case "<=":
		holds = below || same
	case "==":
		holds = same
	case "!=":
		holds = !same
	}
	return holds
}
// createAlert builds a new "firing" alert for rule from the metric that
// violated it.
//
// A single timestamp is used for the ID, StartsAt and CreatedAt so the three
// always agree. The rule's labels are copied, so later edits to the rule do
// not alter alerts that were already raised.
func (am *AlertManager) createAlert(rule *AlertRule, metric monitor.Metric) *Alert {
	now := time.Now()

	var labels map[string]string
	if rule.Labels != nil {
		labels = make(map[string]string, len(rule.Labels))
		for k, v := range rule.Labels {
			labels[k] = v
		}
	}

	return &Alert{
		ID:        fmt.Sprintf("alert_%d", now.UnixNano()),
		RuleID:    rule.ID,
		RuleName:  rule.Name,
		Level:     rule.Level,
		Message:   fmt.Sprintf("%s %s %.2f (threshold: %.2f)", rule.Name, rule.Operator, metric.Value, rule.Threshold),
		Labels:    labels,
		Value:     metric.Value,
		Threshold: rule.Threshold,
		Host:      metric.Host,
		Status:    "firing",
		StartsAt:  now,
		CreatedAt: now,
	}
}
// sendAlert hands alert to every registered notifier and logs it.
//
// Each notifier runs in its own goroutine with a 30-second timeout, so a slow
// channel cannot stall metric processing. Every notifier receives a private
// copy of the alert taken at call time: the manager keeps mutating the
// original (for example when it is resolved), and a notifier that runs later
// must still report the state the alert had when it was sent. Delivery
// errors are logged, not returned.
func (am *AlertManager) sendAlert(alert *Alert) {
	if alert == nil {
		return
	}

	snapshot := *alert
	if alert.EndsAt != nil {
		endsAt := *alert.EndsAt
		snapshot.EndsAt = &endsAt
	}

	for _, notifier := range am.notifiers {
		if notifier == nil {
			continue
		}
		go func(n Notifier, payload Alert) {
			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
			defer cancel()

			if err := n.Notify(ctx, &payload); err != nil {
				am.logger.Printf("Failed to send alert via notifier: %v", err)
			}
		}(notifier, snapshot)
	}

	am.logger.Printf("Alert %s: %s", snapshot.Status, snapshot.Message)
}
## 通知器实现
// pkg/alert/email_notifier.go
package alert
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"mime"
	"net/http"
	"net/smtp"
	"strings"
)
// EmailNotifier sends alerts as e-mail through an SMTP server.
type EmailNotifier struct {
// SMTPHost and SMTPPort are joined as "host:port" to form the server address.
SMTPHost string
SMTPPort string
// Username and Password are the credentials used for SMTP PLAIN authentication.
Username string
Password string
// From is the sender address; To lists the recipient addresses.
From string
To []string
}
// NewEmailNotifier returns an EmailNotifier for the given SMTP server,
// credentials, sender address and recipient list. The recipient slice is
// stored as passed, not copied.
func NewEmailNotifier(host, port, username, password, from string, to []string) *EmailNotifier {
	en := new(EmailNotifier)
	en.SMTPHost, en.SMTPPort = host, port
	en.Username, en.Password = username, password
	en.From = from
	en.To = to
	return en
}
// Notify sends alert as a plain-text e-mail to all configured recipients.
//
// It returns ctx.Err() without contacting the server when ctx is already
// done, and an error when alert is nil or no recipient is configured.
// net/smtp offers no way to abort a send in progress, so cancellation is only
// honoured before the send starts.
//
// The message carries a From header (required by RFC 5322), declares its
// body as UTF-8, and encodes the subject as an RFC 2047 encoded-word whenever
// it contains non-ASCII or control characters, so rule names in any language
// arrive intact and cannot inject extra header lines.
func (en *EmailNotifier) Notify(ctx context.Context, alert *Alert) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if alert == nil {
		return fmt.Errorf("email notifier: nil alert")
	}
	if len(en.To) == 0 {
		return fmt.Errorf("email notifier: no recipients configured")
	}

	subject := fmt.Sprintf("[%s] %s", strings.ToUpper(string(alert.Level)), alert.RuleName)
	body := fmt.Sprintf(`
Alert: %s
Level: %s
Host: %s
Message: %s
Value: %.2f
Threshold: %.2f
Status: %s
Time: %s
`, alert.RuleName, alert.Level, alert.Host, alert.Message,
		alert.Value, alert.Threshold, alert.Status, alert.StartsAt.Format("2006-01-02 15:04:05"))

	headers := []string{
		"From: " + en.From,
		"To: " + strings.Join(en.To, ","),
		"Subject: " + mime.QEncoding.Encode("utf-8", subject),
		"MIME-Version: 1.0",
		"Content-Type: text/plain; charset=UTF-8",
		"Content-Transfer-Encoding: 8bit",
	}
	msg := strings.Join(headers, "\r\n") + "\r\n\r\n" + body

	auth := smtp.PlainAuth("", en.Username, en.Password, en.SMTPHost)
	addr := fmt.Sprintf("%s:%s", en.SMTPHost, en.SMTPPort)
	return smtp.SendMail(addr, auth, en.From, en.To, []byte(msg))
}
// WebhookNotifier delivers alerts to an HTTP endpoint as JSON.
type WebhookNotifier struct {
	// URL is the endpoint alerts are POSTed to.
	URL string
}

// NewWebhookNotifier returns a WebhookNotifier that posts to url.
func NewWebhookNotifier(url string) *WebhookNotifier {
	return &WebhookNotifier{URL: url}
}

// Notify POSTs the JSON encoding of alert to the configured URL.
//
// The request is bound to ctx, so the caller's deadline or cancellation
// aborts it. An error is returned when alert is nil, the URL is empty, the
// request cannot be built or sent, or the endpoint answers with a non-2xx
// status.
func (wn *WebhookNotifier) Notify(ctx context.Context, alert *Alert) error {
	if alert == nil {
		return fmt.Errorf("webhook notifier: nil alert")
	}
	if wn.URL == "" {
		return fmt.Errorf("webhook notifier: empty URL")
	}

	payload, err := json.Marshal(alert)
	if err != nil {
		return fmt.Errorf("webhook notifier: encoding alert: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, wn.URL, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("webhook notifier: building request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("webhook notifier: posting alert: %w", err)
	}
	defer resp.Body.Close()
	// Drain the body so the underlying connection can be reused.
	_, _ = io.Copy(io.Discard, resp.Body)

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("webhook notifier: unexpected status %s", resp.Status)
	}
	return nil
}
## Web API 实现
// pkg/api/server.go
package api
import (
"net/http"
"strconv"
"time"
"github.com/gin-gonic/gin"
"your-project/pkg/monitor"
"your-project/pkg/storage"
)
// Server exposes the monitor over HTTP: a JSON API and the dashboard page.
type Server struct {
// storage is the backend metric queries are answered from.
storage storage.Storage
// router is the gin engine all routes are registered on.
router *gin.Engine
}
// NewServer returns a Server backed by storage, with a default gin engine and
// all routes already registered.
func NewServer(storage storage.Storage) *Server {
	srv := new(Server)
	srv.storage = storage
	srv.router = gin.Default()
	srv.setupRoutes()
	return srv
}
// setupRoutes registers the JSON API under /api/v1, the static asset
// directory, the HTML templates and the dashboard page.
func (s *Server) setupRoutes() {
	v1 := s.router.Group("/api/v1")

	// Read-only endpoints: metrics, system information and alerts.
	reads := []struct {
		path    string
		handler gin.HandlerFunc
	}{
		{"/metrics", s.getMetrics},
		{"/metrics/:type", s.getMetricsByType},
		{"/system/info", s.getSystemInfo},
		{"/system/status", s.getSystemStatus},
		{"/alerts", s.getAlerts},
		{"/alerts/:id", s.getAlert},
	}
	for _, r := range reads {
		v1.GET(r.path, r.handler)
	}

	// Alert rule management.
	v1.POST("/alerts/rules", s.createAlertRule)
	v1.PUT("/alerts/rules/:id", s.updateAlertRule)
	v1.DELETE("/alerts/rules/:id", s.deleteAlertRule)

	// Static assets, templates and the dashboard itself.
	s.router.Static("/static", "./web/static")
	s.router.LoadHTMLGlob("web/templates/*")
	s.router.GET("/", s.dashboard)
}
// getMetrics handles GET /api/v1/metrics.
//
// Query parameters:
//   - start, end: RFC 3339 timestamps bounding the query; they default to one
//     hour ago and now respectively.
//   - host: passed through to the storage query as is.
//
// It answers 400 when a timestamp cannot be parsed, 500 when the storage
// query fails, and otherwise 200 with {"metrics": ...}.
func (s *Server) getMetrics(c *gin.Context) {
	rawStart := c.DefaultQuery("start", "")
	rawEnd := c.DefaultQuery("end", "")
	host := c.DefaultQuery("host", "")

	var from, to time.Time

	switch rawStart {
	case "":
		from = time.Now().Add(-1 * time.Hour)
	default:
		parsed, err := time.Parse(time.RFC3339, rawStart)
		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": "invalid start time"})
			return
		}
		from = parsed
	}

	switch rawEnd {
	case "":
		to = time.Now()
	default:
		parsed, err := time.Parse(time.RFC3339, rawEnd)
		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": "invalid end time"})
			return
		}
		to = parsed
	}

	metrics, err := s.storage.QueryMetrics(from, to, host)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	c.JSON(http.StatusOK, gin.H{"metrics": metrics})
}
// dashboard renders the dashboard.html template with the page title.
func (s *Server) dashboard(c *gin.Context) {
	data := gin.H{"title": "System Monitor Dashboard"}
	c.HTML(http.StatusOK, "dashboard.html", data)
}
// Start serves HTTP on addr and blocks until the underlying gin engine stops,
// returning whatever error it reports.
func (s *Server) Start(addr string) error {
	if err := s.router.Run(addr); err != nil {
		return err
	}
	return nil
}
## 主程序实现
// cmd/monitor/main.go
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"your-project/pkg/alert"
"your-project/pkg/api"
"your-project/pkg/collector"
"your-project/pkg/monitor"
"your-project/pkg/storage"
)
// main wires the monitor together: it creates the storage, the alert manager
// with two sample rules and an e-mail notifier, the system collector and the
// API server, then runs until SIGINT or SIGTERM arrives.
//
// Shutdown is ordered: the collector is stopped and awaited before the
// metrics channel is closed (closing it while the collector can still send
// would panic), and the processing goroutine is awaited so metrics already
// queued are stored and evaluated before the process exits.
func main() {
	logger := log.New(os.Stdout, "[MONITOR] ", log.LstdFlags)

	// Metric pipeline between the collector and the consumers below.
	metrics := make(chan monitor.Metric, 10000)

	store := storage.NewTimeSeriesStorage()
	alertManager := alert.NewAlertManager(logger)

	// Sample alert rules.
	cpuRule := &alert.AlertRule{
		ID:         "cpu_high",
		Name:       "CPU使用率过高",
		MetricType: monitor.MetricCPU,
		MetricName: "cpu_usage_percent",
		Operator:   ">",
		Threshold:  80.0,
		Duration:   5 * time.Minute,
		Level:      alert.AlertWarning,
		Enabled:    true,
	}
	alertManager.AddRule(cpuRule)

	memoryRule := &alert.AlertRule{
		ID:         "memory_high",
		Name:       "内存使用率过高",
		MetricType: monitor.MetricMemory,
		MetricName: "memory_used_percent",
		Operator:   ">",
		Threshold:  90.0,
		Duration:   3 * time.Minute,
		Level:      alert.AlertCritical,
		Enabled:    true,
	}
	alertManager.AddRule(memoryRule)

	// E-mail notifier. The server, credentials and addresses are
	// placeholders; load real values from configuration, never from source.
	emailNotifier := alert.NewEmailNotifier(
		"smtp.example.com", "587",
		"monitor@example.com", "password",
		"monitor@example.com",
		[]string{"admin@example.com"},
	)
	alertManager.AddNotifier(emailNotifier)

	systemCollector, err := collector.NewSystemCollector(
		30*time.Second, metrics, logger)
	if err != nil {
		log.Fatalf("Failed to create system collector: %v", err)
	}

	apiServer := api.NewServer(store)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Consumer: store every metric and evaluate the alert rules against it.
	processorDone := make(chan struct{})
	go func() {
		defer close(processorDone)
		for metric := range metrics {
			if err := store.StoreMetric(metric); err != nil {
				logger.Printf("Failed to store metric: %v", err)
			}
			alertManager.ProcessMetric(metric)
		}
	}()

	// Producer: the system collector.
	collectorDone := make(chan struct{})
	go func() {
		defer close(collectorDone)
		// Returning because the context was cancelled is the normal
		// shutdown path, not an error worth reporting.
		if err := systemCollector.Start(ctx); err != nil && ctx.Err() == nil {
			logger.Printf("System collector error: %v", err)
		}
	}()

	// HTTP API and dashboard.
	go func() {
		if err := apiServer.Start(":8080"); err != nil {
			logger.Printf("API server error: %v", err)
		}
	}()

	logger.Println("System monitor started")

	// Block until asked to terminate.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	logger.Println("Shutting down system monitor...")
	cancel()

	// Only the sender side may close the channel, and only once it has
	// stopped sending.
	<-collectorDone
	close(metrics)

	// Let the consumer drain what is still queued.
	<-processorDone
}
## 功能扩展
### 自定义指标采集
// CustomCollector is the extension point for user-defined metric sources.
type CustomCollector interface {
// Name returns the collector's name.
Name() string
// Collect gathers the collector's current metrics, or returns an error
// when collection failed.
Collect() ([]monitor.Metric, error)
}
// 应用指标采集器示例
type ApplicationCollector struct {
appName string
port int
}
func (ac *ApplicationCollector) Collect() ([]monitor.Metric, error) {
// 实现应用特定的指标采集逻辑
return nil, nil
}
### 数据可视化
<!-- web/templates/dashboard.html -->
<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8" />
    <title>{{.title}}</title>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
  </head>
  <body>
    <div id="cpu-chart">
      <canvas id="cpuChart"></canvas>
    </div>
    <div id="memory-chart">
      <canvas id="memoryChart"></canvas>
    </div>
    <script>
      // Fetches the latest metrics and refreshes the charts.
      // Chart rendering itself is still to be implemented.
      function updateCharts() {
        fetch("/api/v1/metrics?type=cpu")
          .then((response) => {
            // fetch only rejects on network failure; surface HTTP errors too.
            if (!response.ok) {
              throw new Error("HTTP " + response.status);
            }
            return response.json();
          })
          .then((data) => {
            // TODO: update the CPU chart from the response payload.
          })
          .catch((err) => {
            console.error("Failed to refresh metrics:", err);
          });
      }
      // Populate the charts immediately instead of waiting for the first tick.
      updateCharts();
      setInterval(updateCharts, 30000);
    </script>
  </body>
</html>
通过本节的学习，我们实现了一个功能完整的系统监控工具，包括数据采集、存储、告警和可视化等核心功能。这个系统展示了如何运用 Go 语言进行系统级编程，构建实用的运维工具。