4.6.4 告警与通知

4.6.4 告警与通知

告警与通知系统是监控体系的重要组成部分,它能够在系统出现异常时及时通知相关人员,确保问题能够得到快速响应和处理。本节将详细介绍如何设计和实现一个完整的告警与通知系统。

告警系统架构

系统组件

一个完整的告警系统通常包含以下核心组件:

  • 规则引擎(Rule Engine):定义和评估告警规则
  • 告警管理器(Alert Manager):管理告警的生命周期
  • 通知器(Notifier):负责发送告警通知
  • 静默器(Silencer)与抑制器(Inhibitor):静默器在指定时间窗口内屏蔽标签匹配的告警,抑制器在相关告警已触发时抑制其他告警
  • 聚合器(Aggregator):聚合相似的告警
  • 存储器(Storage):存储告警历史和配置

告警状态模型

package alerting

import (
    "time"
)

// AlertState is the lifecycle state of an alert. The numeric values come from
// iota, so the order of the constants below is significant: it is the integer
// stored in Alert.State (no custom JSON marshaller is defined in this section).
type AlertState int

const (
    AlertStateInactive AlertState = iota // the rule condition is not met
    AlertStatePending                    // condition met, waiting for the rule's Duration to elapse
    AlertStateFiring                     // firing
    AlertStateResolved                   // was firing, condition no longer met
    AlertStateSilenced                   // muted by a matching Silence
)

// String returns the lower-case name of the state ("inactive", "pending",
// "firing", "resolved", "silenced"), or "unknown" for any other value.
func (s AlertState) String() string {
    names := [...]string{
        AlertStateInactive: "inactive",
        AlertStatePending:  "pending",
        AlertStateFiring:   "firing",
        AlertStateResolved: "resolved",
        AlertStateSilenced: "silenced",
    }
    if s < 0 || int(s) >= len(names) {
        return "unknown"
    }
    return names[s]
}

// Alert is a single alert instance. RuleEngine.createAlert builds it, the
// engine keeps updating the same object on later evaluations, and the
// AlertManager and notifiers consume it.
//
// NOTE(review): encoding/json's omitempty has no effect on struct types such
// as time.Time, so "ends_at" is always emitted (as the zero time while the
// alert is open). State and Severity have no JSON marshaller in this section
// and are emitted as integers.
type Alert struct {
    ID          string                 `json:"id"`
    Name        string                 `json:"name"`
    Description string                 `json:"description"`
    Labels      map[string]string      `json:"labels"`
    Annotations map[string]string      `json:"annotations"`
    State       AlertState             `json:"state"`
    Value       float64                `json:"value"`     // latest query result
    Threshold   float64                `json:"threshold"` // copied from the rule's Condition
    StartsAt    time.Time              `json:"starts_at"` // when the condition was first met (start of pending)
    EndsAt      time.Time              `json:"ends_at,omitempty"` // zero until the alert is resolved
    UpdatedAt   time.Time              `json:"updated_at"`
    Severity    Severity               `json:"severity"`
    Source      string                 `json:"source"` // "rule_engine" for alerts built by RuleEngine
    RuleID      string                 `json:"rule_id"`
}

// Severity ranks how serious an alert is, from SeverityInfo (lowest) to
// SeverityCritical (highest). Values come from iota, so the constant order is
// significant.
type Severity int

const (
    SeverityInfo Severity = iota
    SeverityWarning
    SeverityError
    SeverityCritical
)

// String renders the severity as a lower-case label ("info", "warning",
// "error", "critical"); any other value yields "unknown".
func (s Severity) String() string {
    if s < SeverityInfo || s > SeverityCritical {
        return "unknown"
    }
    return [...]string{
        SeverityInfo:     "info",
        SeverityWarning:  "warning",
        SeverityError:    "error",
        SeverityCritical: "critical",
    }[s]
}

// IsActive reports whether the alert is still in play: pending or firing.
func (a *Alert) IsActive() bool {
    switch a.State {
    case AlertStateFiring, AlertStatePending:
        return true
    }
    return false
}

// Duration returns how long the alert has lasted: EndsAt-StartsAt once it has
// an end time, otherwise the time elapsed since StartsAt.
func (a *Alert) Duration() time.Duration {
    end := a.EndsAt
    if end.IsZero() {
        end = time.Now()
    }
    return end.Sub(a.StartsAt)
}

告警规则引擎

规则定义

import (
    "fmt"
    "regexp"
    "strconv"
    "strings"
    "time"
)

// Rule is an alerting rule. Evaluate queries Query on every round; once
// Condition has held continuously for Duration the rule fires an alert that
// carries the rule's Name, Description, Labels, Annotations and Severity.
// Rules with Enabled == false are skipped by Evaluate. AddRule fills in ID
// (when empty), CreatedAt and UpdatedAt.
type Rule struct {
    ID          string            `json:"id"`
    Name        string            `json:"name"`
    Description string            `json:"description"`
    Query       string            `json:"query"` // parsed by Evaluator.parseQuery: metric_name{label="value",...}
    Condition   Condition         `json:"condition"`
    Duration    time.Duration     `json:"duration"` // time the condition must hold while pending; JSON-encoded as integer nanoseconds
    Labels      map[string]string `json:"labels"`
    Annotations map[string]string `json:"annotations"`
    Severity    Severity          `json:"severity"`
    Enabled     bool              `json:"enabled"`
    CreatedAt   time.Time         `json:"created_at"`
    UpdatedAt   time.Time         `json:"updated_at"`
}

// Condition compares a rule's query result against Threshold using Operator
// ("value <Operator> Threshold"). Unknown operators never match
// (see RuleEngine.evaluateCondition).
type Condition struct {
    Operator  string  `json:"operator"`  // gt, lt, eq, ne, gte, lte
    Threshold float64 `json:"threshold"`
}

// RuleEngine holds the registered rules and one state tracker per rule, which
// Evaluate advances on every round. All fields are accessed under mutex.
type RuleEngine struct {
    rules     map[string]*Rule
    evaluator *Evaluator
    // Per-rule state machine keyed by Rule.ID. The element type must be
    // *AlertStateTracker: NewRuleEngine and AddRule store trackers here, and
    // evaluateRule reads tracker fields.
    alertStates map[string]*AlertStateTracker
    mutex       sync.RWMutex
}

// AlertStateTracker is the rule engine's per-rule state-machine record; one
// is created per rule by AddRule and stored in RuleEngine.alertStates.
type AlertStateTracker struct {
    Rule        *Rule
    LastValue   float64   // value returned by the most recent query
    LastEval    time.Time // time of the most recent evaluation
    FiringAt    time.Time // when the condition was first met (entry into pending); cleared when pending falls back to inactive
    State       AlertState
    Alert       *Alert    // alert created on entering firing; updated in place by later evaluations
}

// NewRuleEngine returns an engine with no rules that resolves rule queries
// through evaluator.
func NewRuleEngine(evaluator *Evaluator) *RuleEngine {
    re := &RuleEngine{evaluator: evaluator}
    re.rules = make(map[string]*Rule)
    re.alertStates = make(map[string]*AlertStateTracker)
    return re
}

// AddRule registers rule, replacing any rule with the same ID, and gives it
// a fresh tracker in the inactive state. An empty ID is filled in with
// generateID(); CreatedAt and UpdatedAt are both set to the registration time.
// It returns an error if rule is nil.
//
// NOTE(review): replacing a rule that is currently firing discards its
// tracker, so the old alert is never resolved.
func (re *RuleEngine) AddRule(rule *Rule) error {
    if rule == nil {
        return fmt.Errorf("rule must not be nil")
    }

    re.mutex.Lock()
    defer re.mutex.Unlock()

    if rule.ID == "" {
        rule.ID = generateID()
    }

    // Use a single timestamp so a newly added rule has CreatedAt == UpdatedAt.
    now := time.Now()
    rule.CreatedAt = now
    rule.UpdatedAt = now

    re.rules[rule.ID] = rule
    re.alertStates[rule.ID] = &AlertStateTracker{
        Rule:  rule,
        State: AlertStateInactive,
    }

    return nil
}

// RemoveRule deletes the rule with the given ID together with its state
// tracker. Unknown IDs are ignored.
func (re *RuleEngine) RemoveRule(ruleID string) {
    re.mutex.Lock()
    delete(re.rules, ruleID)
    delete(re.alertStates, ruleID)
    re.mutex.Unlock()
}

// GetRule looks up a rule by ID. The boolean is false when no such rule is
// registered. The returned *Rule is shared with the engine.
func (re *RuleEngine) GetRule(ruleID string) (*Rule, bool) {
    re.mutex.RLock()
    defer re.mutex.RUnlock()

    if r, ok := re.rules[ruleID]; ok {
        return r, true
    }
    return nil, false
}

// ListRules returns a new slice holding every registered rule, in unspecified
// (map iteration) order. The *Rule values themselves are shared with the engine.
func (re *RuleEngine) ListRules() []*Rule {
    re.mutex.RLock()
    defer re.mutex.RUnlock()

    out := make([]*Rule, len(re.rules))
    i := 0
    for _, r := range re.rules {
        out[i] = r
        i++
    }
    return out
}

// Evaluate runs every enabled rule once and returns the alerts produced in
// this round: newly firing, still firing (refreshed) or just resolved.
// A rule whose evaluation fails is printed and skipped. The returned error is
// currently always nil.
//
// It takes the write lock because evaluateRule mutates the per-rule trackers.
func (re *RuleEngine) Evaluate() ([]*Alert, error) {
    re.mutex.Lock()
    defer re.mutex.Unlock()

    var alerts []*Alert
    now := time.Now()

    // The map key is not needed (rule.ID carries it). Binding it as
    // `ruleID, rule :=` left ruleID unused, which Go rejects at compile time.
    for _, rule := range re.rules {
        if !rule.Enabled {
            continue
        }

        alert, err := re.evaluateRule(rule, now)
        if err != nil {
            fmt.Printf("Error evaluating rule %s: %v\n", rule.Name, err)
            continue
        }

        if alert != nil {
            alerts = append(alerts, alert)
        }
    }

    return alerts, nil
}

// evaluateRule queries the rule's metric, advances the rule's state machine
// and returns the alert to hand to the AlertManager, or nil when this round
// produced nothing to report.
//
// Transitions (met = rule.Condition holds for the queried value):
//
//   - inactive --met--> pending (FiringAt = now)
//   - pending --met for >= rule.Duration--> firing (new alert returned)
//   - pending --not met--> inactive
//   - firing --met--> firing (existing alert refreshed and returned)
//   - firing --not met--> resolved (alert closed and returned)
//   - resolved --met--> pending
//
// The caller must hold re.mutex for writing.
func (re *RuleEngine) evaluateRule(rule *Rule, now time.Time) (*Alert, error) {
    value, err := re.evaluator.Query(rule.Query)
    if err != nil {
        return nil, err
    }

    state, ok := re.alertStates[rule.ID]
    if !ok {
        // AddRule always creates a tracker; report a missing one instead of
        // dereferencing nil.
        return nil, fmt.Errorf("no alert state for rule %s", rule.ID)
    }
    state.LastValue = value
    state.LastEval = now

    conditionMet := re.evaluateCondition(value, rule.Condition)

    switch state.State {
    case AlertStateInactive:
        if conditionMet {
            state.State = AlertStatePending
            state.FiringAt = now
        }

    case AlertStatePending:
        if !conditionMet {
            state.State = AlertStateInactive
            state.FiringAt = time.Time{}
            break
        }
        // Fire only once the condition has held for the rule's Duration.
        if now.Sub(state.FiringAt) >= rule.Duration {
            state.State = AlertStateFiring
            return re.createAlert(rule, state, now), nil
        }

    case AlertStateFiring:
        if !conditionMet {
            state.State = AlertStateResolved
            if state.Alert != nil {
                state.Alert.State = AlertStateResolved
                state.Alert.EndsAt = now
                state.Alert.UpdatedAt = now
                return state.Alert, nil
            }
            break
        }
        if state.Alert != nil {
            // Reassert firing. AlertManager.processAlert overwrites State with
            // AlertStateSilenced on this same *Alert while a silence matches.
            // Without this reset, the alert would stay "silenced" after the
            // silence expired and would never be notified as firing.
            state.Alert.State = AlertStateFiring
            state.Alert.Value = value
            state.Alert.UpdatedAt = now
            return state.Alert, nil
        }

    case AlertStateResolved:
        if conditionMet {
            state.State = AlertStatePending
            state.FiringAt = now
        }
    }

    return nil, nil
}

// evaluateCondition reports whether "value <Operator> Threshold" holds.
// Recognised operators: gt (>), gte (>=), lt (<), lte (<=), eq (==), ne (!=).
// Any other operator yields false. These are plain float64 comparisons, so a
// NaN value satisfies only "ne".
func (re *RuleEngine) evaluateCondition(value float64, condition Condition) bool {
    limit := condition.Threshold
    met := false
    switch condition.Operator {
    case "gt":
        met = value > limit
    case "gte":
        met = value >= limit
    case "lt":
        met = value < limit
    case "lte":
        met = value <= limit
    case "eq":
        met = value == limit
    case "ne":
        met = value != limit
    }
    return met
}

// createAlert builds a new firing Alert for rule from the tracker's latest
// evaluation and records it in state.Alert. Later evaluations update, and
// eventually resolve, this same object. StartsAt is state.FiringAt, i.e. when
// the condition was first met (start of pending), not when firing began.
func (re *RuleEngine) createAlert(rule *Rule, state *AlertStateTracker, now time.Time) *Alert {
    // Give each alert its own label/annotation maps. Previously the alert
    // shared the rule's maps, so a change to one silently changed the other,
    // and notifier goroutines reading them could race with such writes.
    // A nil map stays nil.
    copyMap := func(src map[string]string) map[string]string {
        if src == nil {
            return nil
        }
        dst := make(map[string]string, len(src))
        for k, v := range src {
            dst[k] = v
        }
        return dst
    }

    alert := &Alert{
        ID:          generateID(),
        Name:        rule.Name,
        Description: rule.Description,
        Labels:      copyMap(rule.Labels),
        Annotations: copyMap(rule.Annotations),
        State:       AlertStateFiring,
        Value:       state.LastValue,
        Threshold:   rule.Condition.Threshold,
        StartsAt:    state.FiringAt,
        UpdatedAt:   now,
        Severity:    rule.Severity,
        Source:      "rule_engine",
        RuleID:      rule.ID,
    }

    state.Alert = alert
    return alert
}

查询评估器

// Evaluator turns a rule query string into a single float64 by looking the
// referenced metric up in a metrics Registry.
type Evaluator struct {
    metricsRegistry *Registry
}

// NewEvaluator returns an Evaluator backed by registry.
func NewEvaluator(registry *Registry) *Evaluator {
    e := new(Evaluator)
    e.metricsRegistry = registry
    return e
}

// Query parses query (see parseQuery), finds the metric and reduces it to a
// number:
//   - counter:   the current count, as float64
//   - gauge:     the current value
//   - histogram: the observation count
//
// NOTE(review): "rate" on a histogram is not implemented; it returns the raw
// count like the plain form. It returns errors for malformed queries, unknown
// metrics and unsupported metric types.
func (e *Evaluator) Query(query string) (float64, error) {
    parsed, err := e.parseQuery(query)
    if err != nil {
        return 0, err
    }

    metric, found := e.metricsRegistry.Get(parsed.MetricName, parsed.Labels)
    if !found {
        return 0, fmt.Errorf("metric not found: %s", parsed.MetricName)
    }

    switch metric.Type() {
    case CounterType:
        return float64(metric.(*Counter).Get()), nil
    case GaugeType:
        return metric.(*Gauge).Get(), nil
    case HistogramType:
        // The plain and the "rate" form both yield the observation count.
        return float64(metric.(*Histogram).Count()), nil
    }
    return 0, fmt.Errorf("unsupported metric type: %v", metric.Type())
}

// QueryResult is the parsed form of a rule query (see Evaluator.parseQuery).
// NOTE(review): parseQuery never sets Function or Aggregation. Query consults
// Function only for histograms, where it makes no difference.
type QueryResult struct {
    MetricName string
    Labels     map[string]string // exact-match label selectors from {k="v",...}
    Function   string
    Aggregation string
}

// parseQuery parses the simplified selector syntax
//
//	metric_name
//	metric_name{label1="value1",label2="value2"}
//
// into a QueryResult. Surrounding double quotes on values are stripped.
// Values may contain '=', but not ',' (pairs are split on every comma).
// Pairs without '=' or with an empty label name are skipped, and anything
// after the closing '}' is ignored. It returns an error when '{' has no
// matching '}'.
func (e *Evaluator) parseQuery(query string) (*QueryResult, error) {
    result := &QueryResult{
        Labels: make(map[string]string),
    }

    labelStart := strings.Index(query, "{")
    if labelStart == -1 {
        result.MetricName = strings.TrimSpace(query)
        return result, nil
    }
    result.MetricName = strings.TrimSpace(query[:labelStart])

    // Search for '}' only after '{'. Searching the whole string let a stray
    // earlier '}' produce labelEnd < labelStart and a slice-bounds panic.
    closing := strings.Index(query[labelStart:], "}")
    if closing == -1 {
        return nil, fmt.Errorf("invalid query format: missing closing brace")
    }
    labelStr := query[labelStart+1 : labelStart+closing]

    for _, pair := range strings.Split(labelStr, ",") {
        // Split on the first '=' only, so values like path="/a=b" survive.
        kv := strings.SplitN(pair, "=", 2)
        if len(kv) != 2 {
            continue
        }
        key := strings.TrimSpace(kv[0])
        if key == "" {
            continue
        }
        result.Labels[key] = strings.Trim(strings.TrimSpace(kv[1]), `"`)
    }

    return result, nil
}

告警管理器

告警管理

// AlertManager receives alerts from the RuleEngine (ProcessAlerts), applies
// silences and inhibitors, persists alerts through storage and fans
// notifications out to the registered notifiers. Its methods access every
// field under mutex.
type AlertManager struct {
    alerts      map[string]*Alert   // alerts that passed silencing/inhibition and are not resolved, keyed by Alert.ID
    rules       *RuleEngine         // NOTE(review): stored but not read by any method in this section
    notifiers   []Notifier
    silences    map[string]*Silence // keyed by Silence.ID
    inhibitors  []Inhibitor
    storage     AlertStorage
    mutex       sync.RWMutex
}

// NewAlertManager returns a manager with no notifiers, silences or inhibitors
// that persists processed alerts to storage.
func NewAlertManager(rules *RuleEngine, storage AlertStorage) *AlertManager {
    am := &AlertManager{rules: rules, storage: storage}
    am.alerts = make(map[string]*Alert)
    am.silences = make(map[string]*Silence)
    am.notifiers = []Notifier{}
    am.inhibitors = []Inhibitor{}
    return am
}

// AddNotifier registers notifier. Every notification sent afterwards is also
// delivered through it.
func (am *AlertManager) AddNotifier(notifier Notifier) {
    am.mutex.Lock()
    am.notifiers = append(am.notifiers, notifier)
    am.mutex.Unlock()
}

// AddInhibitor registers inhibitor. It is consulted for every alert that is
// not silenced.
func (am *AlertManager) AddInhibitor(inhibitor Inhibitor) {
    am.mutex.Lock()
    am.inhibitors = append(am.inhibitors, inhibitor)
    am.mutex.Unlock()
}

// ProcessAlerts runs each alert through processAlert while holding the
// manager lock. A failure is printed and does not stop the rest of the batch.
// The returned error is always nil.
func (am *AlertManager) ProcessAlerts(alerts []*Alert) error {
    am.mutex.Lock()
    defer am.mutex.Unlock()

    for i := range alerts {
        a := alerts[i]
        err := am.processAlert(a)
        if err == nil {
            continue
        }
        fmt.Printf("Error processing alert %s: %v\n", a.ID, err)
    }
    return nil
}

// processAlert applies silences and inhibitors to alert, persists it, keeps
// the in-memory set of known alerts current and sends notifications:
//   - silenced: State is set to AlertStateSilenced; nothing is stored or sent
//   - inhibited: dropped without notification
//   - resolved: exactly one "resolved" notification; removed from the set
//   - first seen while firing: "firing"
//   - known alert whose state changed: "state_change"
//
// The caller must hold am.mutex for writing.
func (am *AlertManager) processAlert(alert *Alert) error {
    if am.isSilenced(alert) {
        alert.State = AlertStateSilenced
        return nil
    }

    if am.isInhibited(alert) {
        return nil
    }

    if err := am.storage.Store(alert); err != nil {
        return err
    }

    previous, known := am.alerts[alert.ID]

    if alert.State == AlertStateResolved {
        // Send a single notification for a resolve. Previously a known alert
        // whose state differed got both "state_change" and "resolved".
        am.sendNotification(alert, "resolved")
        delete(am.alerts, alert.ID)
        return nil
    }

    switch {
    case !known:
        if alert.State == AlertStateFiring {
            am.sendNotification(alert, "firing")
        }
    case previous.State != alert.State:
        am.sendNotification(alert, "state_change")
    }

    // Keep a private copy. The RuleEngine updates the same *Alert in place on
    // every evaluation, so storing that pointer made the comparison above
    // always see identical states.
    snapshot := *alert
    am.alerts[alert.ID] = &snapshot

    return nil
}

// isSilenced reports whether any currently active silence matches alert.
// It does not lock; callers in this file hold am.mutex.
func (am *AlertManager) isSilenced(alert *Alert) bool {
    for _, s := range am.silences {
        if !s.IsActive() {
            continue
        }
        if s.Matches(alert) {
            return true
        }
    }
    return false
}

// isInhibited reports whether any registered inhibitor suppresses alert,
// given the currently active (pending or firing) known alerts.
// It does not lock; callers in this file hold am.mutex.
func (am *AlertManager) isInhibited(alert *Alert) bool {
    if len(am.inhibitors) == 0 {
        return false
    }

    // Build the active-alert list once. It was previously rebuilt for every
    // inhibitor, although it cannot change while the inhibitors are consulted.
    active := am.getActiveAlerts()
    for _, inhibitor := range am.inhibitors {
        if inhibitor.ShouldInhibit(alert, active) {
            return true
        }
    }
    return false
}

// getActiveAlerts returns the known alerts that are pending or firing, in map
// iteration order, or nil when there are none. It does not lock; callers in
// this file hold am.mutex.
func (am *AlertManager) getActiveAlerts() []*Alert {
    var out []*Alert
    for id := range am.alerts {
        if a := am.alerts[id]; a.IsActive() {
            out = append(out, a)
        }
    }
    return out
}

// sendNotification builds one Notification for alert and hands it to every
// registered notifier, each in its own goroutine. Delivery is fire-and-forget:
// errors are only printed. eventType is "firing", "resolved" or "state_change".
func (am *AlertManager) sendNotification(alert *Alert, eventType string) {
    // Notify with a copy of the alert. The RuleEngine keeps mutating the
    // original *Alert (Value, UpdatedAt, State) on later evaluations, which
    // raced with the notifier goroutines rendering it. Label and annotation
    // maps are still shared.
    snapshot := *alert
    notification := &Notification{
        Alert:     &snapshot,
        EventType: eventType,
        Timestamp: time.Now(),
    }

    for _, notifier := range am.notifiers {
        go func(n Notifier) {
            if err := n.Send(notification); err != nil {
                fmt.Printf("Error sending notification via %s: %v\n",
                    n.Name(), err)
            }
        }(notifier)
    }
}

静默管理

// Silence mutes alerts whose labels satisfy all of its Matchers while the
// current time lies strictly between StartsAt and EndsAt.
type Silence struct {
    ID        string            `json:"id"`
    Matchers  []Matcher         `json:"matchers"`
    StartsAt  time.Time         `json:"starts_at"`
    EndsAt    time.Time         `json:"ends_at"`
    CreatedBy string            `json:"created_by"`
    Comment   string            `json:"comment"`
    CreatedAt time.Time         `json:"created_at"`
}

// Matcher is one label condition of a Silence. Value is an exact string, or a
// regular expression when IsRegex is set. Regex matching is unanchored
// (regexp MatchString), so "web" also matches "webserver". IsEqual selects a
// positive (true) or negated (false) match.
type Matcher struct {
    Name    string `json:"name"`
    Value   string `json:"value"`
    IsRegex bool   `json:"is_regex"`
    IsEqual bool   `json:"is_equal"`
}

// IsActive reports whether now is strictly after StartsAt and strictly
// before EndsAt.
func (s *Silence) IsActive() bool {
    now := time.Now()
    if !now.After(s.StartsAt) {
        return false
    }
    return now.Before(s.EndsAt)
}

// Matches reports whether alert satisfies every matcher of the silence.
// NOTE(review): a silence with no matchers matches every alert.
func (s *Silence) Matches(alert *Alert) bool {
    for i := range s.Matchers {
        if !s.matchLabel(alert, s.Matchers[i]) {
            return false
        }
    }
    return true
}

// matchLabel evaluates one matcher against alert's labels. A missing label
// satisfies only negated matchers. An invalid regular expression never
// matches, even for a negated matcher.
// NOTE(review): the regex is recompiled on every call.
func (s *Silence) matchLabel(alert *Alert, matcher Matcher) bool {
    value, ok := alert.Labels[matcher.Name]
    if !ok {
        return !matcher.IsEqual
    }

    var hit bool
    if matcher.IsRegex {
        re, err := regexp.Compile(matcher.Value)
        if err != nil {
            return false
        }
        hit = re.MatchString(value)
    } else {
        hit = value == matcher.Value
    }
    return hit == matcher.IsEqual
}

// AddSilence validates silence and registers it. It assigns an ID from
// generateID() when none is set and stamps CreatedAt with the current time.
//
// It returns an error, and registers nothing, when silence is nil, has no
// matchers, or does not end after it starts. A silence without matchers is
// rejected because Silence.Matches returns true for an empty matcher list,
// so it would mute every alert.
func (am *AlertManager) AddSilence(silence *Silence) error {
    switch {
    case silence == nil:
        return fmt.Errorf("silence must not be nil")
    case len(silence.Matchers) == 0:
        return fmt.Errorf("silence must have at least one matcher")
    case !silence.EndsAt.After(silence.StartsAt):
        return fmt.Errorf("silence must end after it starts")
    }

    am.mutex.Lock()
    defer am.mutex.Unlock()

    if silence.ID == "" {
        silence.ID = generateID()
    }
    silence.CreatedAt = time.Now()

    am.silences[silence.ID] = silence
    return nil
}

// RemoveSilence deletes the silence with the given ID; unknown IDs are ignored.
func (am *AlertManager) RemoveSilence(silenceID string) {
    am.mutex.Lock()
    delete(am.silences, silenceID)
    am.mutex.Unlock()
}

// ListSilences returns every registered silence, active or not, in
// unspecified order. The *Silence values are shared with the manager.
func (am *AlertManager) ListSilences() []*Silence {
    am.mutex.RLock()
    defer am.mutex.RUnlock()

    out := make([]*Silence, len(am.silences))
    n := 0
    for _, s := range am.silences {
        out[n] = s
        n++
    }
    return out
}

通知系统

通知器接口

// Notifier delivers alert notifications to one channel (email, webhook,
// Slack, ...). AlertManager.sendNotification calls Send in a new goroutine
// per notifier and only prints a returned error. The implementations in this
// file return nil from Send while disabled.
// NOTE(review): nothing cancels a slow Send, so implementations should bound
// their own run time (e.g. with an HTTP client timeout).
type Notifier interface {
    Name() string
    Send(notification *Notification) error
    IsEnabled() bool
    SetEnabled(enabled bool)
}

// Notification is the message handed to each Notifier. WebhookNotifier sends
// it JSON-encoded as the request body.
type Notification struct {
    Alert     *Alert    `json:"alert"`
    EventType string    `json:"event_type"` // firing, resolved, state_change
    Timestamp time.Time `json:"timestamp"`  // when AlertManager created the notification
}

// BaseNotifier carries the name and on/off switch shared by the concrete
// notifiers, which embed it by pointer. enabled is guarded by mutex, so
// IsEnabled and SetEnabled may be called concurrently. name is set once by
// NewBaseNotifier and read without locking.
type BaseNotifier struct {
    name    string
    enabled bool
    mutex   sync.RWMutex
}

// NewBaseNotifier returns a BaseNotifier called name that starts enabled.
func NewBaseNotifier(name string) *BaseNotifier {
    bn := new(BaseNotifier)
    bn.name = name
    bn.enabled = true
    return bn
}

// Name returns the notifier's name, e.g. "email", "webhook" or "slack".
func (bn *BaseNotifier) Name() string { return bn.name }

// IsEnabled reports whether the notifier is currently switched on.
func (bn *BaseNotifier) IsEnabled() bool {
    bn.mutex.RLock()
    on := bn.enabled
    bn.mutex.RUnlock()
    return on
}

// SetEnabled switches the notifier on or off.
func (bn *BaseNotifier) SetEnabled(enabled bool) {
    bn.mutex.Lock()
    bn.enabled = enabled
    bn.mutex.Unlock()
}

邮件通知器

import (
    "bytes"
    "fmt"
    "net/smtp"
    "text/template"
)

// EmailNotifier sends alert notifications as HTML email over SMTP with PLAIN
// authentication. It is built by NewEmailNotifier from an EmailConfig, and
// template holds the parsed emailTemplate.
// NOTE(review): template is a text/template, which performs no HTML
// escaping by itself.
type EmailNotifier struct {
    *BaseNotifier
    smtpHost     string
    smtpPort     int
    username     string
    password     string
    from         string   // envelope sender passed to smtp.SendMail
    to           []string // all recipients
    template     *template.Template
}

// EmailConfig configures an EmailNotifier.
// NOTE(review): Password has a normal JSON tag, so marshalling the config
// writes the password in plain text.
type EmailConfig struct {
    SMTPHost string   `json:"smtp_host"`
    SMTPPort int      `json:"smtp_port"`
    Username string   `json:"username"`
    Password string   `json:"password"`
    From     string   `json:"from"`
    To       []string `json:"to"`
}

// NewEmailNotifier parses emailTemplate and returns an enabled EmailNotifier
// configured from config. It fails only if the template does not parse.
func NewEmailNotifier(config *EmailConfig) (*EmailNotifier, error) {
    tmpl, err := template.New("email").Parse(emailTemplate)
    if err != nil {
        return nil, err
    }

    n := &EmailNotifier{
        BaseNotifier: NewBaseNotifier("email"),
        template:     tmpl,
    }
    n.smtpHost, n.smtpPort = config.SMTPHost, config.SMTPPort
    n.username, n.password = config.Username, config.Password
    n.from, n.to = config.From, config.To
    return n, nil
}

// Send renders notification with the email template and delivers it via SMTP
// to every configured recipient. A disabled notifier does nothing. Template
// rendering and SMTP errors are returned.
func (en *EmailNotifier) Send(notification *Notification) error {
    if !en.IsEnabled() {
        return nil
    }

    var body bytes.Buffer
    if err := en.template.Execute(&body, notification); err != nil {
        return err
    }

    subject := fmt.Sprintf("[%s] %s - %s",
        notification.Alert.Severity.String(),
        notification.EventType,
        notification.Alert.Name)

    // Header values must be single-line. Replace CR/LF so an alert name (or a
    // misconfigured address) cannot inject extra headers or end the header
    // block early.
    // NOTE(review): non-ASCII subjects should also be RFC 2047-encoded
    // (e.g. mime.QEncoding).
    headerSafe := strings.NewReplacer("\r", " ", "\n", " ")

    msg := fmt.Sprintf("From: %s\r\n"+
        "To: %s\r\n"+
        "Subject: %s\r\n"+
        "MIME-Version: 1.0\r\n"+
        "Content-Type: text/html; charset=UTF-8\r\n"+
        "\r\n"+
        "%s\r\n",
        headerSafe.Replace(en.from),
        headerSafe.Replace(strings.Join(en.to, ",")),
        headerSafe.Replace(subject),
        body.String())

    auth := smtp.PlainAuth("", en.username, en.password, en.smtpHost)
    addr := fmt.Sprintf("%s:%d", en.smtpHost, en.smtpPort)

    return smtp.SendMail(addr, auth, en.from, en.to, []byte(msg))
}

// emailTemplate is the HTML body used by EmailNotifier. It is executed with
// text/template, which does not escape its output, so every alert-controlled
// string (name, event type, description, labels, annotations) is piped
// through the built-in html function. This prevents alert content from
// injecting markup into the email.
const emailTemplate = `
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Alert Notification</title>
    <style>
        body { font-family: Arial, sans-serif; }
        .alert { padding: 20px; margin: 10px 0; border-radius: 5px; }
        .critical { background-color: #ffebee; border-left: 5px solid #f44336; }
        .error { background-color: #fff3e0; border-left: 5px solid #ff9800; }
        .warning { background-color: #fffde7; border-left: 5px solid #ffeb3b; }
        .info { background-color: #e3f2fd; border-left: 5px solid #2196f3; }
        .resolved { background-color: #e8f5e8; border-left: 5px solid #4caf50; }
        .labels { margin: 10px 0; }
        .label { display: inline-block; background: #f5f5f5; padding: 2px 8px; margin: 2px; border-radius: 3px; }
    </style>
</head>
<body>
    <div class="alert {{.Alert.Severity.String}}">
        <h2>{{.Alert.Name | html}}</h2>
        <p><strong>Status:</strong> {{.EventType | html}}</p>
        <p><strong>Severity:</strong> {{.Alert.Severity.String}}</p>
        <p><strong>Description:</strong> {{.Alert.Description | html}}</p>
        <p><strong>Value:</strong> {{.Alert.Value}}</p>
        <p><strong>Threshold:</strong> {{.Alert.Threshold}}</p>
        <p><strong>Started At:</strong> {{.Alert.StartsAt.Format "2006-01-02 15:04:05"}}</p>
        {{if not .Alert.EndsAt.IsZero}}
        <p><strong>Ended At:</strong> {{.Alert.EndsAt.Format "2006-01-02 15:04:05"}}</p>
        {{end}}

        {{if .Alert.Labels}}
        <div class="labels">
            <strong>Labels:</strong><br>
            {{range $key, $value := .Alert.Labels}}
            <span class="label">{{$key | html}}: {{$value | html}}</span>
            {{end}}
        </div>
        {{end}}

        {{if .Alert.Annotations}}
        <div class="annotations">
            <strong>Annotations:</strong><br>
            {{range $key, $value := .Alert.Annotations}}
            <p><strong>{{$key | html}}:</strong> {{$value | html}}</p>
            {{end}}
        </div>
        {{end}}
    </div>
</body>
</html>
`

Webhook 通知器

import (
    "bytes"
    "encoding/json"
    "net/http"
    "time"
)

// WebhookNotifier POSTs each Notification as JSON to a fixed URL.
// timeout mirrors client.Timeout; NewWebhookNotifier sets both from the config.
type WebhookNotifier struct {
    *BaseNotifier
    url     string
    timeout time.Duration
    headers map[string]string // extra request headers, applied after Content-Type
    client  *http.Client
}

// WebhookConfig configures a WebhookNotifier. A zero Timeout is replaced by
// a 10-second default in NewWebhookNotifier. encoding/json represents
// time.Duration as integer nanoseconds.
type WebhookConfig struct {
    URL     string            `json:"url"`
    Timeout time.Duration     `json:"timeout"`
    Headers map[string]string `json:"headers"`
}

// NewWebhookNotifier builds an enabled WebhookNotifier for config.URL.
// A non-positive config.Timeout is replaced by a 10-second default, which
// bounds every HTTP request. config itself is not modified.
func NewWebhookNotifier(config *WebhookConfig) *WebhookNotifier {
    // Work on a local value. The original wrote the default back into the
    // caller's config. It also let a negative timeout through, which
    // http.Client treats as "no timeout".
    timeout := config.Timeout
    if timeout <= 0 {
        timeout = 10 * time.Second
    }

    return &WebhookNotifier{
        BaseNotifier: NewBaseNotifier("webhook"),
        url:          config.URL,
        timeout:      timeout,
        headers:      config.Headers,
        client: &http.Client{
            Timeout: timeout,
        },
    }
}

// Send POSTs notification, JSON-encoded, to the configured URL. It sets
// Content-Type to application/json, then applies the configured headers,
// which may override it. A disabled notifier does nothing. An HTTP status
// >= 400 is returned as an error; the response body is not read.
func (wn *WebhookNotifier) Send(notification *Notification) error {
    if !wn.IsEnabled() {
        return nil
    }

    payload, err := json.Marshal(notification)
    if err != nil {
        return err
    }

    req, err := http.NewRequest(http.MethodPost, wn.url, bytes.NewReader(payload))
    if err != nil {
        return err
    }
    header := req.Header
    header.Set("Content-Type", "application/json")
    for name, v := range wn.headers {
        header.Set(name, v)
    }

    resp, err := wn.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode < 400 {
        return nil
    }
    return fmt.Errorf("webhook returned status %d", resp.StatusCode)
}

Slack 通知器

// SlackNotifier posts notifications to a Slack incoming-webhook URL as a
// SlackMessage with a single attachment.
type SlackNotifier struct {
    *BaseNotifier
    webhookURL string
    channel    string // omitted from the message JSON when empty
    username   string // omitted from the message JSON when empty
    client     *http.Client
}

// SlackConfig configures a SlackNotifier. WebhookURL is required; Channel and
// Username are optional.
type SlackConfig struct {
    WebhookURL string `json:"webhook_url"`
    Channel    string `json:"channel"`
    Username   string `json:"username"`
}

// SlackMessage is the JSON body posted to the Slack webhook. Channel and
// Username are dropped from the JSON when empty (omitempty).
type SlackMessage struct {
    Channel     string            `json:"channel,omitempty"`
    Username    string            `json:"username,omitempty"`
    Text        string            `json:"text"`
    Attachments []SlackAttachment `json:"attachments,omitempty"`
}

// SlackAttachment is one colored attachment of a SlackMessage. Timestamp is
// serialized as "ts"; SlackNotifier.Send fills it with Alert.StartsAt in
// Unix seconds.
type SlackAttachment struct {
    Color     string       `json:"color"`
    Title     string       `json:"title"`
    Text      string       `json:"text"`
    Fields    []SlackField `json:"fields"`
    Timestamp int64        `json:"ts"`
}

// SlackField is a title/value pair shown inside an attachment. SlackNotifier
// sets Short for the scalar fields and leaves it false for the labels list.
type SlackField struct {
    Title string `json:"title"`
    Value string `json:"value"`
    Short bool   `json:"short"`
}

// NewSlackNotifier builds an enabled SlackNotifier from config. Its HTTP
// client uses a fixed 10-second timeout.
func NewSlackNotifier(config *SlackConfig) *SlackNotifier {
    sn := &SlackNotifier{BaseNotifier: NewBaseNotifier("slack")}
    sn.webhookURL = config.WebhookURL
    sn.channel = config.Channel
    sn.username = config.Username
    sn.client = &http.Client{Timeout: 10 * time.Second}
    return sn
}

// Send posts notification to the Slack webhook as one message. The message
// has a single attachment colored by getColor, with Severity, Status, Value
// and Threshold fields. When the alert has labels, a "Labels" field lists them
// one "key: value" per line, in map iteration order. A disabled notifier does
// nothing. Marshal and transport errors, and any HTTP status >= 400, are
// returned.
func (sn *SlackNotifier) Send(notification *Notification) error {
    if !sn.IsEnabled() {
        return nil
    }

    a := notification.Alert
    status := notification.EventType

    fields := make([]SlackField, 0, 5)
    fields = append(fields,
        SlackField{Title: "Severity", Value: a.Severity.String(), Short: true},
        SlackField{Title: "Status", Value: status, Short: true},
        SlackField{Title: "Value", Value: fmt.Sprintf("%.2f", a.Value), Short: true},
        SlackField{Title: "Threshold", Value: fmt.Sprintf("%.2f", a.Threshold), Short: true},
    )

    if len(a.Labels) != 0 {
        lines := make([]string, 0, len(a.Labels))
        for name, val := range a.Labels {
            lines = append(lines, fmt.Sprintf("%s: %s", name, val))
        }
        fields = append(fields, SlackField{Title: "Labels", Value: strings.Join(lines, "\n")})
    }

    payload, err := json.Marshal(&SlackMessage{
        Channel:  sn.channel,
        Username: sn.username,
        Text:     fmt.Sprintf("Alert: %s", a.Name),
        Attachments: []SlackAttachment{{
            Color:     sn.getColor(a.Severity, status),
            Title:     a.Name,
            Text:      a.Description,
            Fields:    fields,
            Timestamp: a.StartsAt.Unix(),
        }},
    })
    if err != nil {
        return err
    }

    resp, err := sn.client.Post(sn.webhookURL, "application/json", bytes.NewReader(payload))
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if code := resp.StatusCode; code >= 400 {
        return fmt.Errorf("slack returned status %d", code)
    }
    return nil
}

// getColor picks the attachment color. Resolved notifications are always
// "good". Otherwise the color follows severity: critical "danger", error
// "warning", warning "#ffeb3b", info "#2196f3", anything else "#cccccc".
func (sn *SlackNotifier) getColor(severity Severity, eventType string) string {
    if eventType == "resolved" {
        return "good"
    }

    color := "#cccccc" // fallback for unknown severities
    switch severity {
    case SeverityCritical:
        color = "danger"
    case SeverityError:
        color = "warning"
    case SeverityWarning:
        color = "#ffeb3b"
    case SeverityInfo:
        color = "#2196f3"
    }
    return color
}

告警存储

存储接口

// AlertStorage persists alerts. AlertManager.processAlert calls Store for
// every alert that passes silencing and inhibition. The filters accepted by
// List and Count are implementation-defined: MemoryAlertStorage understands
// "state", "severity", "name" (substring) and treats any other key as an
// exact label match.
type AlertStorage interface {
    Store(alert *Alert) error
    Get(id string) (*Alert, error)
    List(filters map[string]string, limit, offset int) ([]*Alert, error)
    Delete(id string) error
    Count(filters map[string]string) (int, error)
}

// MemoryAlertStorage is an in-memory AlertStorage keyed by Alert.ID. It keeps
// the *Alert pointers it is given rather than copies, so a caller mutating
// an alert after Store also changes what Get and List return. All methods
// take mutex and are safe for concurrent use.
type MemoryAlertStorage struct {
    alerts map[string]*Alert
    mutex  sync.RWMutex
}

// NewMemoryAlertStorage returns an empty MemoryAlertStorage.
func NewMemoryAlertStorage() *MemoryAlertStorage {
    return &MemoryAlertStorage{
        alerts: make(map[string]*Alert),
    }
}

// Store saves alert under its ID, replacing any alert with the same ID.
func (mas *MemoryAlertStorage) Store(alert *Alert) error {
    mas.mutex.Lock()
    defer mas.mutex.Unlock()

    mas.alerts[alert.ID] = alert
    return nil
}

// Get returns the alert with the given ID, or an error if none is stored.
func (mas *MemoryAlertStorage) Get(id string) (*Alert, error) {
    mas.mutex.RLock()
    defer mas.mutex.RUnlock()

    alert, exists := mas.alerts[id]
    if !exists {
        return nil, fmt.Errorf("alert not found: %s", id)
    }

    return alert, nil
}

// List returns the alerts matching filters (see matchesFilters). Results are
// ordered newest StartsAt first, with ties broken by ID so pages are stable
// across calls. The first offset results are skipped and at most limit are
// returned. A negative offset is treated as 0 and a negative limit means
// "no limit"; limit 0 yields no alerts.
func (mas *MemoryAlertStorage) List(filters map[string]string, limit, offset int) ([]*Alert, error) {
    mas.mutex.RLock()
    defer mas.mutex.RUnlock()

    var alerts []*Alert
    for _, alert := range mas.alerts {
        if mas.matchesFilters(alert, filters) {
            alerts = append(alerts, alert)
        }
    }

    sort.Slice(alerts, func(i, j int) bool {
        a, b := alerts[i], alerts[j]
        if !a.StartsAt.Equal(b.StartsAt) {
            return a.StartsAt.After(b.StartsAt)
        }
        // Tie-break on ID. Map iteration order is random, so equal
        // timestamps would otherwise reorder between calls and make
        // pagination skip or repeat alerts.
        return a.ID < b.ID
    })

    // Clamp the window. A negative offset or limit used to produce a negative
    // index or end < start and panic.
    start := offset
    if start < 0 {
        start = 0
    }
    if start > len(alerts) {
        start = len(alerts)
    }
    // Compare against the remaining length rather than computing start+limit,
    // which could overflow for very large limits.
    end := len(alerts)
    if limit >= 0 && limit < end-start {
        end = start + limit
    }

    return alerts[start:end], nil
}

// Delete removes the alert with the given ID; unknown IDs are not an error.
func (mas *MemoryAlertStorage) Delete(id string) error {
    mas.mutex.Lock()
    defer mas.mutex.Unlock()

    delete(mas.alerts, id)
    return nil
}

// Count returns how many stored alerts match filters.
func (mas *MemoryAlertStorage) Count(filters map[string]string) (int, error) {
    mas.mutex.RLock()
    defer mas.mutex.RUnlock()

    count := 0
    for _, alert := range mas.alerts {
        if mas.matchesFilters(alert, filters) {
            count++
        }
    }

    return count, nil
}

// matchesFilters reports whether alert satisfies every filter:
//   - "state":    alert.State.String() must equal the value
//   - "severity": alert.Severity.String() must equal the value
//   - "name":     alert.Name must contain the value
//   - any other key: the alert must have that label with exactly that value
//
// A nil or empty filters map matches every alert.
func (mas *MemoryAlertStorage) matchesFilters(alert *Alert, filters map[string]string) bool {
    for key, value := range filters {
        switch key {
        case "state":
            if alert.State.String() != value {
                return false
            }
        case "severity":
            if alert.Severity.String() != value {
                return false
            }
        case "name":
            if !strings.Contains(alert.Name, value) {
                return false
            }
        default:
            if labelValue, exists := alert.Labels[key]; !exists || labelValue != value {
                return false
            }
        }
    }
    return true
}

使用示例

完整示例

package main

import (
    "context"
    "fmt"
    "time"
)

// main wires the alerting pipeline together: metric registry -> evaluator ->
// rule engine -> alert manager (with in-memory storage) -> email and Slack
// notifiers. It registers two sample rules, then evaluates rules every 30s in
// one goroutine while another feeds synthetic CPU/memory values every 10s.
// It exits after 10 minutes; the deferred cancel stops both goroutines.
//
// All SMTP/Slack settings below are placeholders; replace them before running.
func main() {
    // Metrics registry and the evaluator that resolves rule queries against it.
    registry := NewRegistry()
    evaluator := NewEvaluator(registry)

    // Rule engine, alert storage and the manager that ties them together.
    ruleEngine := NewRuleEngine(evaluator)
    storage := NewMemoryAlertStorage()
    alertManager := NewAlertManager(ruleEngine, storage)

    // Notifiers.
    emailConfig := &EmailConfig{
        SMTPHost: "smtp.gmail.com",
        SMTPPort: 587,
        Username: "alerts@example.com",
        Password: "your-password",
        From:     "alerts@example.com",
        To:       []string{"oncall@example.com"},
    }
    emailNotifier, err := NewEmailNotifier(emailConfig)
    if err != nil {
        // On error emailNotifier is nil. Registering it would make every
        // notification goroutine panic inside Send.
        fmt.Printf("Error creating email notifier: %v\n", err)
        return
    }
    alertManager.AddNotifier(emailNotifier)

    slackConfig := &SlackConfig{
        WebhookURL: "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
        Channel:    "#alerts",
        Username:   "AlertBot",
    }
    slackNotifier := NewSlackNotifier(slackConfig)
    alertManager.AddNotifier(slackNotifier)

    // Alert rules.
    cpuRule := &Rule{
        Name:        "High CPU Usage",
        Description: "CPU usage is above 80%",
        Query:       "system_cpu_usage_percent",
        Condition: Condition{
            Operator:  "gt",
            Threshold: 80.0,
        },
        Duration: 5 * time.Minute,
        Labels: map[string]string{
            "team":     "infrastructure",
            "severity": "warning",
        },
        Annotations: map[string]string{
            "summary":     "High CPU usage detected",
            "description": "CPU usage has been above 80% for more than 5 minutes",
        },
        Severity: SeverityWarning,
        Enabled:  true,
    }
    if err := ruleEngine.AddRule(cpuRule); err != nil {
        fmt.Printf("Error adding rule %q: %v\n", cpuRule.Name, err)
        return
    }

    memoryRule := &Rule{
        Name:        "High Memory Usage",
        Description: "Memory usage is above 90%",
        Query:       "system_memory_usage_percent",
        Condition: Condition{
            Operator:  "gt",
            Threshold: 90.0,
        },
        Duration: 2 * time.Minute,
        Labels: map[string]string{
            "team":     "infrastructure",
            "severity": "critical",
        },
        Annotations: map[string]string{
            "summary":     "Critical memory usage",
            "description": "Memory usage has been above 90% for more than 2 minutes",
        },
        Severity: SeverityCritical,
        Enabled:  true,
    }
    if err := ruleEngine.AddRule(memoryRule); err != nil {
        fmt.Printf("Error adding rule %q: %v\n", memoryRule.Name, err)
        return
    }

    // Gauges queried by the rules above.
    cpuGauge := RegisterGauge("system_cpu_usage_percent", nil)
    memoryGauge := RegisterGauge("system_memory_usage_percent", nil)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Evaluation loop: every 30s, evaluate all rules and hand the produced
    // alerts to the alert manager.
    go func() {
        ticker := time.NewTicker(30 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                alerts, err := ruleEngine.Evaluate()
                if err != nil {
                    fmt.Printf("Error evaluating rules: %v\n", err)
                    continue
                }

                if len(alerts) > 0 {
                    fmt.Printf("Processing %d alerts\n", len(alerts))
                    if err := alertManager.ProcessAlerts(alerts); err != nil {
                        fmt.Printf("Error processing alerts: %v\n", err)
                    }
                }
            }
        }
    }()

    // Synthetic load: every 10s, set CPU to 70-99% and memory to 85-104%,
    // so both rules cross their thresholds at times.
    go func() {
        ticker := time.NewTicker(10 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                cpuUsage := 70.0 + float64(time.Now().Unix()%30)
                cpuGauge.Set(cpuUsage)

                memoryUsage := 85.0 + float64(time.Now().Unix()%20)
                memoryGauge.Set(memoryUsage)

                fmt.Printf("CPU: %.1f%%, Memory: %.1f%%\n", cpuUsage, memoryUsage)
            }
        }
    }()

    // Let the demo run for a while; the deferred cancel then stops both loops.
    time.Sleep(10 * time.Minute)
}

小结

本节详细介绍了告警与通知系统的设计和实现,包括:

  1. 告警模型:定义了完整的告警状态和数据结构
  2. 规则引擎:实现了灵活的告警规则定义和评估机制
  3. 告警管理:提供了告警的生命周期管理和处理流程
  4. 通知系统:支持多种通知方式,包括邮件、Webhook 和 Slack
  5. 静默管理:实现了基于标签匹配的告警静默,并在告警管理器中接入了抑制器(Inhibitor)接口
  6. 存储系统:定义了告警存储接口并提供内存实现,支持按状态、严重程度、名称和标签过滤及分页查询

通过这个告警系统,我们可以实现全面的监控告警功能,确保系统异常能够及时发现和处理,提高系统的可靠性和稳定性。