4.6.4 告警与通知 #
告警与通知系统是监控体系的重要组成部分,它能够在系统出现异常时及时通知相关人员,确保问题能够得到快速响应和处理。本节将详细介绍如何设计和实现一个完整的告警与通知系统。
告警系统架构 #
系统组件 #
一个完整的告警系统通常包含以下核心组件:
- 规则引擎(Rule Engine):定义和评估告警规则
- 告警管理器(Alert Manager):管理告警的生命周期
- 通知器(Notifier):负责发送告警通知
- 静默器(Silencer)与抑制器(Inhibitor):静默器在指定时间窗口内屏蔽标签匹配的告警,抑制器在存在其他相关活跃告警时屏蔽次要告警
- 聚合器(Aggregator):聚合相似的告警
- 存储器(Storage):存储告警历史和配置
告警状态模型 #
package alerting
import (
"time"
)
// AlertState is the lifecycle state of an alert.
type AlertState int

// Alert lifecycle states. The values come from iota; encoding/json emits the
// integer value for Alert.State.
const (
	AlertStateInactive AlertState = iota // condition not met
	AlertStatePending                    // condition met, waiting for the rule's duration
	AlertStateFiring                     // condition held for the rule's full duration
	AlertStateResolved                   // was firing, condition no longer met
	AlertStateSilenced                   // silenced
)

// alertStateNames holds the display name of each AlertState, indexed by value.
var alertStateNames = [...]string{
	AlertStateInactive: "inactive",
	AlertStatePending:  "pending",
	AlertStateFiring:   "firing",
	AlertStateResolved: "resolved",
	AlertStateSilenced: "silenced",
}

// String returns the lower-case name of the state, or "unknown" for a value
// outside the declared range.
func (s AlertState) String() string {
	if s < 0 || int(s) >= len(alertStateNames) {
		return "unknown"
	}
	return alertStateNames[s]
}
// Alert is one alert instance. RuleEngine.createAlert builds it from a Rule,
// and the rule engine keeps updating the same instance (Value, State, EndsAt,
// UpdatedAt) on later evaluations until it resolves.
type Alert struct {
	ID          string            `json:"id"`
	Name        string            `json:"name"`
	Description string            `json:"description"`
	Labels      map[string]string `json:"labels"`
	Annotations map[string]string `json:"annotations"`
	State       AlertState        `json:"state"`     // encoded as its integer value
	Value       float64           `json:"value"`     // last observed metric value
	Threshold   float64           `json:"threshold"` // threshold of the rule's condition
	StartsAt    time.Time         `json:"starts_at"` // when the condition was first seen met
	// EndsAt stays the zero time until the alert resolves. "omitempty" never
	// omits a struct value such as time.Time, so without "omitzero" the zero
	// time would be encoded as "0001-01-01T00:00:00Z". encoding/json honors
	// "omitzero" from Go 1.24; older toolchains ignore the unknown option.
	EndsAt    time.Time `json:"ends_at,omitempty,omitzero"`
	UpdatedAt time.Time `json:"updated_at"`
	Severity  Severity  `json:"severity"` // encoded as its integer value
	Source    string    `json:"source"`
	RuleID    string    `json:"rule_id"`
}
// Severity ranks an alert. Numeric values increase from SeverityInfo to
// SeverityCritical.
type Severity int

// Severity levels.
const (
	SeverityInfo Severity = iota
	SeverityWarning
	SeverityError
	SeverityCritical
)

// severityNames holds the display name of every declared Severity.
var severityNames = map[Severity]string{
	SeverityInfo:     "info",
	SeverityWarning:  "warning",
	SeverityError:    "error",
	SeverityCritical: "critical",
}

// String returns "info", "warning", "error" or "critical", or "unknown" for
// any other value.
func (s Severity) String() string {
	if name, ok := severityNames[s]; ok {
		return name
	}
	return "unknown"
}
// IsActive reports whether the alert is pending or firing. Inactive,
// resolved and silenced alerts are not active.
func (a *Alert) IsActive() bool {
	switch a.State {
	case AlertStatePending, AlertStateFiring:
		return true
	default:
		return false
	}
}
// Duration returns how long the alert has lasted: from StartsAt to EndsAt
// when EndsAt is set, otherwise from StartsAt to now.
func (a *Alert) Duration() time.Duration {
	end := a.EndsAt
	if end.IsZero() {
		end = time.Now()
	}
	return end.Sub(a.StartsAt)
}
告警规则引擎 #
规则定义 #
import (
"fmt"
"regexp"
"strconv"
"strings"
"time"
)
// Rule describes when an alert should fire and what the resulting alert
// carries.
type Rule struct {
	ID          string            `json:"id"`          // generated by AddRule when empty
	Name        string            `json:"name"`        // copied to Alert.Name
	Description string            `json:"description"` // copied to Alert.Description
	Query       string            `json:"query"`       // metric selector, see Evaluator.parseQuery
	Condition   Condition         `json:"condition"`   // compared against the queried value
	Duration    time.Duration     `json:"duration"`    // how long the condition must hold before firing; JSON value is integer nanoseconds
	Labels      map[string]string `json:"labels"`      // attached to alerts; matched by silences and storage filters
	Annotations map[string]string `json:"annotations"` // attached to alerts
	Severity    Severity          `json:"severity"`
	Enabled     bool              `json:"enabled"`    // disabled rules are skipped by Evaluate
	CreatedAt   time.Time         `json:"created_at"` // set by AddRule
	UpdatedAt   time.Time         `json:"updated_at"` // set by AddRule
}
// Condition compares a queried value against Threshold using Operator.
// Supported operators are "gt", "gte", "lt", "lte", "eq" and "ne"; any other
// operator never matches. "eq" and "ne" use exact float64 comparison.
type Condition struct {
	Operator  string  `json:"operator"` // gt, lt, eq, ne, gte, lte
	Threshold float64 `json:"threshold"`
}
// RuleEngine holds the alert rules and the per-rule evaluation state. mutex
// guards both maps.
type RuleEngine struct {
	rules     map[string]*Rule
	evaluator *Evaluator
	// alertStates is keyed by rule ID and holds each rule's state-machine
	// tracker; NewRuleEngine, AddRule and evaluateRule all use
	// *AlertStateTracker values.
	alertStates map[string]*AlertStateTracker
	mutex       sync.RWMutex
}
// AlertStateTracker is the per-rule evaluation state kept by RuleEngine; AddRule
// creates one for every rule.
type AlertStateTracker struct {
	Rule      *Rule
	LastValue float64    // value returned by the most recent query
	LastEval  time.Time  // time of the most recent evaluation
	FiringAt  time.Time  // when the condition was first seen met; becomes Alert.StartsAt
	State     AlertState // current position in the rule's state machine
	Alert     *Alert     // alert created on the last transition to firing, if any
}
// NewRuleEngine returns a RuleEngine with no rules that resolves rule
// queries through evaluator.
func NewRuleEngine(evaluator *Evaluator) *RuleEngine {
	re := &RuleEngine{evaluator: evaluator}
	re.rules = make(map[string]*Rule)
	re.alertStates = make(map[string]*AlertStateTracker)
	return re
}
// AddRule registers rule and resets its evaluation state to inactive. A
// rule without an ID gets a generated one. A rule whose ID is already
// registered is replaced. CreatedAt and UpdatedAt are both set to the same
// registration time. It returns an error only when rule is nil.
func (re *RuleEngine) AddRule(rule *Rule) error {
	if rule == nil {
		return fmt.Errorf("rule must not be nil")
	}

	re.mutex.Lock()
	defer re.mutex.Unlock()

	if rule.ID == "" {
		rule.ID = generateID()
	}
	// One timestamp so a freshly added rule has CreatedAt == UpdatedAt.
	now := time.Now()
	rule.CreatedAt = now
	rule.UpdatedAt = now

	re.rules[rule.ID] = rule
	re.alertStates[rule.ID] = &AlertStateTracker{
		Rule:  rule,
		State: AlertStateInactive,
	}
	return nil
}
// RemoveRule deletes the rule with the given ID together with its
// evaluation state. Unknown IDs are ignored.
func (re *RuleEngine) RemoveRule(ruleID string) {
	re.mutex.Lock()
	delete(re.alertStates, ruleID)
	delete(re.rules, ruleID)
	re.mutex.Unlock()
}
// GetRule returns the rule registered under ruleID and whether it exists.
func (re *RuleEngine) GetRule(ruleID string) (*Rule, bool) {
	re.mutex.RLock()
	defer re.mutex.RUnlock()
	if rule, ok := re.rules[ruleID]; ok {
		return rule, true
	}
	return nil, false
}
// ListRules returns all registered rules in unspecified (map iteration)
// order. The slice is new, but the *Rule values are shared with the engine.
func (re *RuleEngine) ListRules() []*Rule {
	re.mutex.RLock()
	defer re.mutex.RUnlock()
	out := make([]*Rule, len(re.rules))
	i := 0
	for _, r := range re.rules {
		out[i] = r
		i++
	}
	return out
}
// Evaluate runs every enabled rule once and returns the alerts that need
// reporting: newly firing, still firing (value refreshed) or just resolved.
// Returned *Alert values are the instances tracked by the engine and are
// mutated again on later evaluations. A rule whose query fails is printed
// and skipped; the returned error is currently always nil.
func (re *RuleEngine) Evaluate() ([]*Alert, error) {
	// Write lock: evaluateRule mutates the per-rule trackers.
	re.mutex.Lock()
	defer re.mutex.Unlock()

	var alerts []*Alert
	now := time.Now()
	// Only the rule is needed; the map key duplicates rule.ID.
	for _, rule := range re.rules {
		if !rule.Enabled {
			continue
		}
		alert, err := re.evaluateRule(rule, now)
		if err != nil {
			fmt.Printf("Error evaluating rule %s: %v\n", rule.Name, err)
			continue
		}
		if alert != nil {
			alerts = append(alerts, alert)
		}
	}
	return alerts, nil
}
// evaluateRule queries rule's metric once, advances the rule's tracker
// through its state machine and returns the alert to report, if any.
//
// Transitions (FiringAt records when the condition was first seen met):
//
//	Inactive -> Pending    condition met; FiringAt = now
//	Pending  -> Firing     condition still met and now-FiringAt >= rule.Duration;
//	                       a new Alert is created and returned
//	Pending  -> Inactive   condition no longer met; FiringAt cleared
//	Firing   -> Resolved   condition no longer met; the tracked Alert is marked
//	                       resolved (EndsAt = now) and returned
//	Firing   (stays)       condition still met; the tracked Alert's Value and
//	                       UpdatedAt are refreshed and it is returned again
//	Resolved -> Pending    condition met again; FiringAt = now
//
// The step into Pending never fires immediately, so an alert needs at least
// two evaluations to fire, even when rule.Duration is 0. While firing and on
// resolution, the same *Alert instance is mutated and returned each time.
//
// It returns (nil, nil) when there is nothing to report and (nil, err) when
// the query fails. It is called from Evaluate with re.mutex held for
// writing, and it assumes AddRule created a tracker for rule.ID.
func (re *RuleEngine) evaluateRule(rule *Rule, now time.Time) (*Alert, error) {
	// Fetch the current metric value.
	value, err := re.evaluator.Query(rule.Query)
	if err != nil {
		return nil, err
	}
	state := re.alertStates[rule.ID]
	state.LastValue = value
	state.LastEval = now
	// Check the rule's condition against the fresh value.
	conditionMet := re.evaluateCondition(value, rule.Condition)
	switch state.State {
	case AlertStateInactive:
		if conditionMet {
			state.State = AlertStatePending
			state.FiringAt = now
		}
	case AlertStatePending:
		if conditionMet {
			// Fire only once the condition has held for the rule's duration.
			if now.Sub(state.FiringAt) >= rule.Duration {
				state.State = AlertStateFiring
				return re.createAlert(rule, state, now), nil
			}
		} else {
			state.State = AlertStateInactive
			state.FiringAt = time.Time{}
		}
	case AlertStateFiring:
		if !conditionMet {
			state.State = AlertStateResolved
			if state.Alert != nil {
				state.Alert.State = AlertStateResolved
				state.Alert.EndsAt = now
				state.Alert.UpdatedAt = now
				return state.Alert, nil
			}
		} else {
			// Still firing: refresh the existing alert in place.
			if state.Alert != nil {
				state.Alert.Value = value
				state.Alert.UpdatedAt = now
				return state.Alert, nil
			}
		}
	case AlertStateResolved:
		if conditionMet {
			state.State = AlertStatePending
			state.FiringAt = now
		}
	}
	return nil, nil
}
// conditionOps maps each supported Condition.Operator to its comparison of
// value v against threshold t.
var conditionOps = map[string]func(v, t float64) bool{
	"gt":  func(v, t float64) bool { return v > t },
	"gte": func(v, t float64) bool { return v >= t },
	"lt":  func(v, t float64) bool { return v < t },
	"lte": func(v, t float64) bool { return v <= t },
	"eq":  func(v, t float64) bool { return v == t },
	"ne":  func(v, t float64) bool { return v != t },
}

// evaluateCondition reports whether value satisfies condition. Unknown
// operators never match.
func (re *RuleEngine) evaluateCondition(value float64, condition Condition) bool {
	compare, ok := conditionOps[condition.Operator]
	if !ok {
		return false
	}
	return compare(value, condition.Threshold)
}
// createAlert builds a new firing Alert for rule from the tracker's latest
// value and FiringAt, and records it as state.Alert so later evaluations
// update or resolve the same instance.
//
// Labels and Annotations are copied, so later edits to the rule's maps do not
// retroactively change alerts already handed to the alert manager, storage
// or notifier goroutines.
func (re *RuleEngine) createAlert(rule *Rule, state *AlertStateTracker, now time.Time) *Alert {
	alert := &Alert{
		ID:          generateID(),
		Name:        rule.Name,
		Description: rule.Description,
		Labels:      cloneStringMap(rule.Labels),
		Annotations: cloneStringMap(rule.Annotations),
		State:       AlertStateFiring,
		Value:       state.LastValue,
		Threshold:   rule.Condition.Threshold,
		StartsAt:    state.FiringAt,
		UpdatedAt:   now,
		Severity:    rule.Severity,
		Source:      "rule_engine",
		RuleID:      rule.ID,
	}
	state.Alert = alert
	return alert
}

// cloneStringMap returns a shallow copy of m, or nil when m is nil, so JSON
// output ("null" vs "{}") is unchanged.
func cloneStringMap(m map[string]string) map[string]string {
	if m == nil {
		return nil
	}
	out := make(map[string]string, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}
查询评估器 #
// Evaluator resolves rule query strings to metric values by looking the
// metric up in a metrics registry.
type Evaluator struct {
	metricsRegistry *Registry
}
// NewEvaluator returns an Evaluator that resolves queries against registry.
func NewEvaluator(registry *Registry) *Evaluator {
	e := new(Evaluator)
	e.metricsRegistry = registry
	return e
}
// Query parses query (see parseQuery for the accepted syntax), looks up the
// matching metric in the registry and returns its value as float64.
// Counters and gauges report their Get() value; histograms report their
// observation count.
//
// If a metric's Type() disagrees with its concrete Go type, Query returns an
// error instead of panicking on the type assertion. One misregistered metric
// therefore cannot crash the evaluation loop.
func (e *Evaluator) Query(query string) (float64, error) {
	parsed, err := e.parseQuery(query)
	if err != nil {
		return 0, err
	}

	metric, exists := e.metricsRegistry.Get(parsed.MetricName, parsed.Labels)
	if !exists {
		return 0, fmt.Errorf("metric not found: %s", parsed.MetricName)
	}

	switch metric.Type() {
	case CounterType:
		if c, ok := metric.(*Counter); ok {
			return float64(c.Get()), nil
		}
	case GaugeType:
		if g, ok := metric.(*Gauge); ok {
			return g.Get(), nil
		}
	case HistogramType:
		// No rate computation exists yet, and parseQuery never sets
		// Function, so every histogram query reports the raw count.
		if h, ok := metric.(*Histogram); ok {
			return float64(h.Count()), nil
		}
	default:
		return 0, fmt.Errorf("unsupported metric type: %v", metric.Type())
	}
	return 0, fmt.Errorf("metric %s: type %v does not match concrete type %T",
		parsed.MetricName, metric.Type(), metric)
}
// QueryResult is the parsed form of a query string. parseQuery fills in
// MetricName and Labels only; Function and Aggregation are never set by it.
type QueryResult struct {
	MetricName  string
	Labels      map[string]string
	Function    string
	Aggregation string
}
// parseQuery parses the simplified query syntax
//
//	metric_name
//	metric_name{label1="value1",label2="value2"}
//
// Surrounding double quotes are stripped from label values. Only the first
// '=' in a pair separates key from value, so a value may contain '='. A
// value may not contain ',', because pairs are split on commas. Pairs without
// '=' are skipped, and anything after the closing brace is ignored.
//
// It returns an error when the opening brace has no closing brace after it.
func (e *Evaluator) parseQuery(query string) (*QueryResult, error) {
	result := &QueryResult{
		Labels: make(map[string]string),
	}

	labelStart := strings.Index(query, "{")
	if labelStart == -1 {
		result.MetricName = strings.TrimSpace(query)
		return result, nil
	}
	result.MetricName = strings.TrimSpace(query[:labelStart])

	// Look for the closing brace only after the opening one. A stray '}'
	// before '{' (e.g. "a}{") would otherwise give an inverted slice and panic.
	closeOffset := strings.Index(query[labelStart+1:], "}")
	if closeOffset == -1 {
		return nil, fmt.Errorf("invalid query format: missing closing brace")
	}
	labelStr := query[labelStart+1 : labelStart+1+closeOffset]
	if labelStr == "" {
		return result, nil
	}

	for _, pair := range strings.Split(labelStr, ",") {
		kv := strings.SplitN(pair, "=", 2)
		if len(kv) != 2 {
			continue
		}
		key := strings.TrimSpace(kv[0])
		value := strings.Trim(strings.TrimSpace(kv[1]), `"`)
		result.Labels[key] = value
	}
	return result, nil
}
告警管理器 #
告警管理 #
// AlertManager receives alerts, applies silences and inhibitors, persists
// the remaining alerts and dispatches notifications. Every method locks
// mutex, which guards all fields below.
type AlertManager struct {
	alerts     map[string]*Alert   // non-suppressed alerts not yet resolved, keyed by Alert.ID
	rules      *RuleEngine         // set by NewAlertManager; not read by the methods in this section
	notifiers  []Notifier          // every notification is fanned out to all of these
	silences   map[string]*Silence // keyed by Silence.ID
	inhibitors []Inhibitor         // consulted for every alert that is not silenced
	storage    AlertStorage        // every non-suppressed alert is stored here
	mutex      sync.RWMutex
}
// NewAlertManager returns an AlertManager with no notifiers, silences or
// inhibitors that persists alerts to storage.
func NewAlertManager(rules *RuleEngine, storage AlertStorage) *AlertManager {
	am := &AlertManager{rules: rules, storage: storage}
	am.alerts = make(map[string]*Alert)
	am.silences = make(map[string]*Silence)
	am.notifiers = []Notifier{}
	am.inhibitors = []Inhibitor{}
	return am
}
// AddNotifier registers notifier. Every notification sent afterwards is
// dispatched to it as well.
func (am *AlertManager) AddNotifier(notifier Notifier) {
	am.mutex.Lock()
	am.notifiers = append(am.notifiers, notifier)
	am.mutex.Unlock()
}
// AddInhibitor registers inhibitor. It is consulted for every subsequently
// processed alert that is not silenced.
func (am *AlertManager) AddInhibitor(inhibitor Inhibitor) {
	am.mutex.Lock()
	am.inhibitors = append(am.inhibitors, inhibitor)
	am.mutex.Unlock()
}
// ProcessAlerts runs each alert through processAlert under the manager's
// write lock. A failure on one alert is printed and does not stop the batch.
// The returned error is currently always nil.
func (am *AlertManager) ProcessAlerts(alerts []*Alert) error {
	am.mutex.Lock()
	defer am.mutex.Unlock()
	for i := range alerts {
		a := alerts[i]
		err := am.processAlert(a)
		if err == nil {
			continue
		}
		fmt.Printf("Error processing alert %s: %v\n", a.ID, err)
	}
	return nil
}
// processAlert applies silences and inhibitions to alert, persists it, sends
// the matching notifications and updates the in-memory set of known alerts.
// It is called from ProcessAlerts with am.mutex held.
//
// Notifications sent:
//   - "firing" for an alert ID not seen before whose state is firing
//   - "state_change" when a known alert ID arrives with a different state
//   - "resolved" whenever the alert's state is resolved; the alert is then
//     dropped from memory
//
// Suppressed (silenced or inhibited) alerts are neither stored nor notified.
func (am *AlertManager) processAlert(alert *Alert) error {
	if am.isSilenced(alert) || am.isInhibited(alert) {
		// alert.State is deliberately left untouched. The rule engine hands
		// out the same *Alert on every evaluation. Overwriting its state with
		// "silenced" would leave it stuck after the silence expires, so it
		// would never be announced as firing. A resolved alert is still
		// forgotten, so it does not linger in memory.
		if alert.State == AlertStateResolved {
			delete(am.alerts, alert.ID)
		}
		return nil
	}

	// Persist first, so a storage failure leaves the in-memory state unchanged.
	if err := am.storage.Store(alert); err != nil {
		return err
	}

	// NOTE: RuleEngine returns the same *Alert instance on each evaluation, so
	// for its alerts existing == alert and no state_change is emitted; the
	// resolved branch below reports resolution instead.
	if existing, known := am.alerts[alert.ID]; known {
		if existing.State != alert.State {
			am.sendNotification(alert, "state_change")
		}
	} else if alert.State == AlertStateFiring {
		am.sendNotification(alert, "firing")
	}
	am.alerts[alert.ID] = alert

	if alert.State == AlertStateResolved {
		am.sendNotification(alert, "resolved")
		delete(am.alerts, alert.ID)
	}
	return nil
}
// isSilenced reports whether any currently active silence matches alert.
func (am *AlertManager) isSilenced(alert *Alert) bool {
	matched := false
	for _, s := range am.silences {
		if !s.IsActive() {
			continue
		}
		if s.Matches(alert) {
			matched = true
			break
		}
	}
	return matched
}
// isInhibited reports whether any registered inhibitor suppresses alert,
// given the manager's currently active alerts. The active set is computed
// once per call and shared by all inhibitors, rather than rebuilt for each
// one.
func (am *AlertManager) isInhibited(alert *Alert) bool {
	if len(am.inhibitors) == 0 {
		return false
	}
	active := am.getActiveAlerts()
	for _, inhibitor := range am.inhibitors {
		if inhibitor.ShouldInhibit(alert, active) {
			return true
		}
	}
	return false
}
// getActiveAlerts returns the known alerts that are pending or firing, in
// unspecified (map iteration) order. Callers in this file hold am.mutex.
func (am *AlertManager) getActiveAlerts() []*Alert {
	var result []*Alert
	for id := range am.alerts {
		if a := am.alerts[id]; a.IsActive() {
			result = append(result, a)
		}
	}
	return result
}
// sendNotification fans a notification for alert out to every registered
// notifier, each in its own goroutine. It returns without waiting for them;
// delivery errors are printed.
//
// The goroutines get a copy of the alert taken now. The original *Alert is
// shared with the rule engine, which keeps updating Value, State, EndsAt and
// UpdatedAt on later evaluations under its own lock. Reading it from these
// goroutines would therefore be a data race and could report a later
// evaluation's values. The copy still shares the Labels and Annotations maps;
// nothing in this file modifies them after the alert is created.
func (am *AlertManager) sendNotification(alert *Alert, eventType string) {
	snapshot := *alert
	notification := &Notification{
		Alert:     &snapshot,
		EventType: eventType,
		Timestamp: time.Now(),
	}
	for _, notifier := range am.notifiers {
		go func(n Notifier) {
			if err := n.Send(notification); err != nil {
				fmt.Printf("Error sending notification via %s: %v\n",
					n.Name(), err)
			}
		}(notifier)
	}
}
静默管理 #
// Silence suppresses alerts whose labels satisfy all Matchers while the
// current time is strictly between StartsAt and EndsAt.
type Silence struct {
	ID        string    `json:"id"` // generated by AddSilence when empty
	Matchers  []Matcher `json:"matchers"`
	StartsAt  time.Time `json:"starts_at"`
	EndsAt    time.Time `json:"ends_at"`
	CreatedBy string    `json:"created_by"`
	Comment   string    `json:"comment"`
	CreatedAt time.Time `json:"created_at"` // set by AddSilence
}
// Matcher tests one alert label. When the label is missing, the matcher is
// satisfied only if IsEqual is false.
type Matcher struct {
	Name    string `json:"name"`     // label name
	Value   string `json:"value"`    // literal value, or a Go regexp when IsRegex is set
	IsRegex bool   `json:"is_regex"` // unanchored regexp match; an invalid pattern never matches
	IsEqual bool   `json:"is_equal"` // true: label must match Value; false: label must not match
}
// IsActive reports whether the current time lies strictly after StartsAt and
// strictly before EndsAt.
func (s *Silence) IsActive() bool {
	now := time.Now()
	if !now.After(s.StartsAt) {
		return false
	}
	return now.Before(s.EndsAt)
}
// Matches reports whether alert satisfies every matcher of the silence. A
// silence without matchers matches every alert.
func (s *Silence) Matches(alert *Alert) bool {
	for i := range s.Matchers {
		if ok := s.matchLabel(alert, s.Matchers[i]); !ok {
			return false
		}
	}
	return true
}
// matchLabel evaluates one matcher against alert's labels.
//
// A missing label counts as a non-match, so it satisfies only negative
// (IsEqual == false) matchers. Regex matchers use Go regexp syntax and are
// not anchored: the pattern may match any substring of the label value. An
// invalid pattern never matches, whatever IsEqual says. The pattern is
// compiled on every call.
func (s *Silence) matchLabel(alert *Alert, matcher Matcher) bool {
	value, ok := alert.Labels[matcher.Name]
	if !ok {
		return !matcher.IsEqual
	}
	var matched bool
	if matcher.IsRegex {
		re, err := regexp.Compile(matcher.Value)
		if err != nil {
			return false
		}
		matched = re.MatchString(value)
	} else {
		matched = value == matcher.Value
	}
	return matched == matcher.IsEqual
}
// AddSilence registers silence. It generates an ID when the silence has none
// and stamps CreatedAt with the current time. A silence with an existing ID
// replaces the previous one. It always returns nil.
func (am *AlertManager) AddSilence(silence *Silence) error {
	am.mutex.Lock()
	defer am.mutex.Unlock()
	id := silence.ID
	if id == "" {
		id = generateID()
		silence.ID = id
	}
	silence.CreatedAt = time.Now()
	am.silences[id] = silence
	return nil
}
// RemoveSilence deletes the silence with the given ID. Unknown IDs are
// ignored.
func (am *AlertManager) RemoveSilence(silenceID string) {
	am.mutex.Lock()
	delete(am.silences, silenceID)
	am.mutex.Unlock()
}
// ListSilences returns every registered silence, including expired and
// not-yet-started ones, in unspecified order.
func (am *AlertManager) ListSilences() []*Silence {
	am.mutex.RLock()
	defer am.mutex.RUnlock()
	out := make([]*Silence, len(am.silences))
	i := 0
	for _, s := range am.silences {
		out[i] = s
		i++
	}
	return out
}
通知系统 #
通知器接口 #
// Notifier delivers notifications to one channel (email, webhook, Slack...).
// AlertManager.sendNotification calls Send from a separate goroutine for
// each notification, so implementations must be safe for concurrent use.
type Notifier interface {
	// Name identifies the notifier in log messages.
	Name() string
	// Send delivers one notification. The implementations in this file
	// return nil without sending when the notifier is disabled.
	Send(notification *Notification) error
	IsEnabled() bool
	SetEnabled(enabled bool)
}
// Notification is the payload handed to every Notifier.
type Notification struct {
	Alert     *Alert    `json:"alert"`
	EventType string    `json:"event_type"` // firing, resolved, state_change
	Timestamp time.Time `json:"timestamp"`  // when the notification was created
}
// BaseNotifier provides Name, IsEnabled and SetEnabled and is embedded by the
// concrete notifiers. mutex guards enabled; name is only set by
// NewBaseNotifier.
type BaseNotifier struct {
	name    string
	enabled bool
	mutex   sync.RWMutex
}
// NewBaseNotifier returns an enabled BaseNotifier with the given name.
func NewBaseNotifier(name string) *BaseNotifier {
	bn := &BaseNotifier{name: name}
	bn.enabled = true
	return bn
}
// Name returns the name given to NewBaseNotifier. It takes no lock because
// nothing modifies the name after construction.
func (bn *BaseNotifier) Name() string {
	name := bn.name
	return name
}
// IsEnabled reports whether the notifier currently sends notifications.
func (bn *BaseNotifier) IsEnabled() bool {
	bn.mutex.RLock()
	enabled := bn.enabled
	bn.mutex.RUnlock()
	return enabled
}
// SetEnabled turns the notifier on or off.
func (bn *BaseNotifier) SetEnabled(enabled bool) {
	bn.mutex.Lock()
	bn.enabled = enabled
	bn.mutex.Unlock()
}
邮件通知器 #
import (
"bytes"
"fmt"
"net/smtp"
"text/template"
)
// EmailNotifier sends HTML alert emails over SMTP with PLAIN auth.
//
// NOTE(review): template is a text/template, which performs no HTML
// escaping. Alert names, descriptions, labels and annotations are inserted
// into the HTML body verbatim; consider html/template if those values can
// carry untrusted text.
type EmailNotifier struct {
	*BaseNotifier
	smtpHost string
	smtpPort int
	username string
	password string
	from     string   // envelope sender passed to smtp.SendMail
	to       []string // recipients
	template *template.Template
}
// EmailConfig configures an EmailNotifier.
//
// NOTE(review): Password carries a JSON tag, so marshaling the config
// exposes it.
type EmailConfig struct {
	SMTPHost string   `json:"smtp_host"`
	SMTPPort int      `json:"smtp_port"`
	Username string   `json:"username"` // PLAIN auth user
	Password string   `json:"password"` // PLAIN auth password
	From     string   `json:"from"`
	To       []string `json:"to"`
}
// NewEmailNotifier parses the built-in HTML email template and returns an
// enabled notifier named "email" that sends through the SMTP server in
// config. It fails only if the template does not parse.
func NewEmailNotifier(config *EmailConfig) (*EmailNotifier, error) {
	tmpl, err := template.New("email").Parse(emailTemplate)
	if err != nil {
		return nil, err
	}
	n := &EmailNotifier{
		BaseNotifier: NewBaseNotifier("email"),
		template:     tmpl,
	}
	n.smtpHost, n.smtpPort = config.SMTPHost, config.SMTPPort
	n.username, n.password = config.Username, config.Password
	n.from, n.to = config.From, config.To
	return n, nil
}
// Send renders notification with the HTML template and mails it to all
// configured recipients. It does nothing when the notifier is disabled.
//
// The message carries From, To, Subject, MIME-Version and Content-Type
// headers. CR and LF are replaced in the subject, so an alert name with line
// breaks cannot inject extra headers or end the header section early.
//
// NOTE(review): a non-ASCII subject (e.g. a Chinese alert name) is sent
// raw; RFC 2047 encoding (mime.QEncoding) would be more interoperable.
func (en *EmailNotifier) Send(notification *Notification) error {
	if !en.IsEnabled() {
		return nil
	}

	// Render the HTML body.
	var body bytes.Buffer
	if err := en.template.Execute(&body, notification); err != nil {
		return err
	}

	subject := fmt.Sprintf("[%s] %s - %s",
		notification.Alert.Severity.String(),
		notification.EventType,
		notification.Alert.Name)
	subject = strings.NewReplacer("\r", " ", "\n", " ").Replace(subject)

	msg := fmt.Sprintf("From: %s\r\n"+
		"To: %s\r\n"+
		"Subject: %s\r\n"+
		"MIME-Version: 1.0\r\n"+
		"Content-Type: text/html; charset=UTF-8\r\n"+
		"\r\n"+
		"%s\r\n",
		en.from, strings.Join(en.to, ","), subject, body.String())

	auth := smtp.PlainAuth("", en.username, en.password, en.smtpHost)
	addr := fmt.Sprintf("%s:%d", en.smtpHost, en.smtpPort)
	return smtp.SendMail(addr, auth, en.from, en.to, []byte(msg))
}
// emailTemplate is the text/template source of EmailNotifier's HTML body. It
// is executed with a *Notification. The outer div's CSS class is the
// severity name, which selects the .info/.warning/.error/.critical style.
// The .resolved class is defined but never referenced by the markup.
const emailTemplate = `
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Alert Notification</title>
<style>
body { font-family: Arial, sans-serif; }
.alert { padding: 20px; margin: 10px 0; border-radius: 5px; }
.critical { background-color: #ffebee; border-left: 5px solid #f44336; }
.error { background-color: #fff3e0; border-left: 5px solid #ff9800; }
.warning { background-color: #fffde7; border-left: 5px solid #ffeb3b; }
.info { background-color: #e3f2fd; border-left: 5px solid #2196f3; }
.resolved { background-color: #e8f5e8; border-left: 5px solid #4caf50; }
.labels { margin: 10px 0; }
.label { display: inline-block; background: #f5f5f5; padding: 2px 8px; margin: 2px; border-radius: 3px; }
</style>
</head>
<body>
<div class="alert {{.Alert.Severity.String}}">
<h2>{{.Alert.Name}}</h2>
<p><strong>Status:</strong> {{.EventType}}</p>
<p><strong>Severity:</strong> {{.Alert.Severity.String}}</p>
<p><strong>Description:</strong> {{.Alert.Description}}</p>
<p><strong>Value:</strong> {{.Alert.Value}}</p>
<p><strong>Threshold:</strong> {{.Alert.Threshold}}</p>
<p><strong>Started At:</strong> {{.Alert.StartsAt.Format "2006-01-02 15:04:05"}}</p>
{{if not .Alert.EndsAt.IsZero}}
<p><strong>Ended At:</strong> {{.Alert.EndsAt.Format "2006-01-02 15:04:05"}}</p>
{{end}}
{{if .Alert.Labels}}
<div class="labels">
<strong>Labels:</strong><br>
{{range $key, $value := .Alert.Labels}}
<span class="label">{{$key}}: {{$value}}</span>
{{end}}
</div>
{{end}}
{{if .Alert.Annotations}}
<div class="annotations">
<strong>Annotations:</strong><br>
{{range $key, $value := .Alert.Annotations}}
<p><strong>{{$key}}:</strong> {{$value}}</p>
{{end}}
</div>
{{end}}
</div>
</body>
</html>
`
Webhook 通知器 #
import (
"bytes"
"encoding/json"
"net/http"
"time"
)
// WebhookNotifier POSTs each notification as JSON to a fixed URL.
type WebhookNotifier struct {
	*BaseNotifier
	url     string
	timeout time.Duration     // applied through client.Timeout; not read by Send
	headers map[string]string // extra request headers, set after Content-Type
	client  *http.Client
}
// WebhookConfig configures a WebhookNotifier.
type WebhookConfig struct {
	URL     string            `json:"url"`
	Timeout time.Duration     `json:"timeout"` // zero means 10s; the JSON value is integer nanoseconds
	Headers map[string]string `json:"headers"`
}
// NewWebhookNotifier returns an enabled notifier named "webhook" that POSTs
// JSON to config.URL. A zero config.Timeout defaults to 10 seconds.
//
// config is not modified. The headers map is copied, so later changes by the
// caller neither affect the notifier nor race with in-flight sends.
func NewWebhookNotifier(config *WebhookConfig) *WebhookNotifier {
	timeout := config.Timeout
	if timeout == 0 {
		timeout = 10 * time.Second
	}

	var headers map[string]string
	if config.Headers != nil {
		headers = make(map[string]string, len(config.Headers))
		for k, v := range config.Headers {
			headers[k] = v
		}
	}

	return &WebhookNotifier{
		BaseNotifier: NewBaseNotifier("webhook"),
		url:          config.URL,
		timeout:      timeout,
		headers:      headers,
		client: &http.Client{
			Timeout: timeout,
		},
	}
}
// Send POSTs notification as JSON to the configured URL. The request carries
// "Content-Type: application/json" followed by the configured headers, which
// may override it. A status code >= 400 is returned as an error. It does
// nothing when the notifier is disabled.
//
// NOTE(review): the response body is closed without being read, which may
// prevent keep-alive connection reuse.
func (wn *WebhookNotifier) Send(notification *Notification) error {
	if !wn.IsEnabled() {
		return nil
	}

	payload, err := json.Marshal(notification)
	if err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodPost, wn.url, bytes.NewBuffer(payload))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	for name, val := range wn.headers {
		req.Header.Set(name, val)
	}

	resp, err := wn.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code >= 400 {
		return fmt.Errorf("webhook returned status %d", code)
	}
	return nil
}
Slack 通知器 #
// SlackNotifier posts alert notifications to a Slack incoming-webhook URL.
type SlackNotifier struct {
	*BaseNotifier
	webhookURL string
	channel    string // sent as the message's "channel" field; omitted when empty
	username   string // sent as the message's "username" field; omitted when empty
	client     *http.Client
}
// SlackConfig configures a SlackNotifier.
type SlackConfig struct {
	WebhookURL string `json:"webhook_url"`
	Channel    string `json:"channel"`
	Username   string `json:"username"`
}
// SlackMessage is the JSON body posted to the Slack webhook.
type SlackMessage struct {
	Channel     string            `json:"channel,omitempty"`
	Username    string            `json:"username,omitempty"`
	Text        string            `json:"text"`
	Attachments []SlackAttachment `json:"attachments,omitempty"`
}
// SlackAttachment is one attachment of a SlackMessage.
type SlackAttachment struct {
	Color     string       `json:"color"` // Slack color keyword or hex code, see SlackNotifier.getColor
	Title     string       `json:"title"`
	Text      string       `json:"text"`
	Fields    []SlackField `json:"fields"`
	Timestamp int64        `json:"ts"` // Unix seconds; Send uses the alert's StartsAt
}
// SlackField is one title/value pair shown in a SlackAttachment.
type SlackField struct {
	Title string `json:"title"`
	Value string `json:"value"`
	Short bool   `json:"short"` // true for fields meant to be laid out side by side
}
// NewSlackNotifier returns an enabled notifier named "slack" that posts to
// config.WebhookURL with a 10-second HTTP timeout.
func NewSlackNotifier(config *SlackConfig) *SlackNotifier {
	client := &http.Client{Timeout: 10 * time.Second}
	return &SlackNotifier{
		BaseNotifier: NewBaseNotifier("slack"),
		client:       client,
		webhookURL:   config.WebhookURL,
		channel:      config.Channel,
		username:     config.Username,
	}
}
// Send posts notification to the Slack webhook as a single attachment.
//
// The attachment color comes from getColor. Its fields show severity,
// status, value and threshold (two decimals), plus a "Labels" field with one
// "key: value" line per label. Label order follows map iteration and is not
// stable. The attachment timestamp is the alert's StartsAt. A status code
// >= 400 is returned as an error. It does nothing when the notifier is
// disabled.
func (sn *SlackNotifier) Send(notification *Notification) error {
	if !sn.IsEnabled() {
		return nil
	}
	a := notification.Alert

	fields := []SlackField{
		{Title: "Severity", Value: a.Severity.String(), Short: true},
		{Title: "Status", Value: notification.EventType, Short: true},
		{Title: "Value", Value: fmt.Sprintf("%.2f", a.Value), Short: true},
		{Title: "Threshold", Value: fmt.Sprintf("%.2f", a.Threshold), Short: true},
	}
	if len(a.Labels) != 0 {
		lines := make([]string, 0, len(a.Labels))
		for name, val := range a.Labels {
			lines = append(lines, fmt.Sprintf("%s: %s", name, val))
		}
		fields = append(fields, SlackField{
			Title: "Labels",
			Value: strings.Join(lines, "\n"),
		})
	}

	attachment := SlackAttachment{
		Color:     sn.getColor(a.Severity, notification.EventType),
		Title:     a.Name,
		Text:      a.Description,
		Fields:    fields,
		Timestamp: a.StartsAt.Unix(),
	}
	msg := &SlackMessage{
		Channel:     sn.channel,
		Username:    sn.username,
		Text:        fmt.Sprintf("Alert: %s", a.Name),
		Attachments: []SlackAttachment{attachment},
	}

	payload, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	resp, err := sn.client.Post(sn.webhookURL, "application/json", bytes.NewBuffer(payload))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 400 {
		return fmt.Errorf("slack returned status %d", resp.StatusCode)
	}
	return nil
}
// getColor picks the Slack attachment color. Resolved events are always
// "good"; otherwise the color depends on severity, with "#cccccc" for
// unknown severities.
func (sn *SlackNotifier) getColor(severity Severity, eventType string) string {
	var color string
	switch {
	case eventType == "resolved":
		color = "good"
	case severity == SeverityCritical:
		color = "danger"
	case severity == SeverityError:
		color = "warning"
	case severity == SeverityWarning:
		color = "#ffeb3b"
	case severity == SeverityInfo:
		color = "#2196f3"
	default:
		color = "#cccccc"
	}
	return color
}
告警存储 #
存储接口 #
// AlertStorage persists alerts and supports filtered, paginated queries.
type AlertStorage interface {
	// Store persists alert.
	Store(alert *Alert) error
	// Get returns the alert with the given ID.
	Get(id string) (*Alert, error)
	// List returns up to limit alerts matching filters, skipping the first offset.
	List(filters map[string]string, limit, offset int) ([]*Alert, error)
	// Delete removes the alert with the given ID.
	Delete(id string) error
	// Count returns how many alerts match filters.
	Count(filters map[string]string) (int, error)
}
// MemoryAlertStorage is an AlertStorage backed by a map keyed by Alert.ID.
// It keeps the *Alert pointers it is given rather than copies, so later
// mutations of those alerts are visible through it. Entries are only removed
// by Delete. mutex guards alerts.
type MemoryAlertStorage struct {
	alerts map[string]*Alert
	mutex  sync.RWMutex
}
// NewMemoryAlertStorage returns an empty in-memory store.
func NewMemoryAlertStorage() *MemoryAlertStorage {
	s := new(MemoryAlertStorage)
	s.alerts = make(map[string]*Alert)
	return s
}
// Store saves alert under its ID, replacing any previous alert with that ID.
// It always returns nil.
func (mas *MemoryAlertStorage) Store(alert *Alert) error {
	id := alert.ID
	mas.mutex.Lock()
	defer mas.mutex.Unlock()
	mas.alerts[id] = alert
	return nil
}
// Get returns the stored alert with the given ID, or an error when none
// exists.
func (mas *MemoryAlertStorage) Get(id string) (*Alert, error) {
	mas.mutex.RLock()
	alert, ok := mas.alerts[id]
	mas.mutex.RUnlock()
	if !ok {
		return nil, fmt.Errorf("alert not found: %s", id)
	}
	return alert, nil
}
// List returns the stored alerts that satisfy filters (see matchesFilters),
// newest StartsAt first. Ties are broken by ID, so pages are stable across
// calls. It skips the first offset matches and returns at most limit of the
// rest.
//
// A negative offset is treated as 0, and a negative limit means "no limit".
// A limit of 0 yields an empty result. The error is always nil.
func (mas *MemoryAlertStorage) List(filters map[string]string, limit, offset int) ([]*Alert, error) {
	mas.mutex.RLock()
	defer mas.mutex.RUnlock()

	var alerts []*Alert
	for _, alert := range mas.alerts {
		if mas.matchesFilters(alert, filters) {
			alerts = append(alerts, alert)
		}
	}

	// Newest first; the ID tie-breaker removes map-iteration randomness.
	sort.Slice(alerts, func(i, j int) bool {
		a, b := alerts[i], alerts[j]
		if !a.StartsAt.Equal(b.StartsAt) {
			return a.StartsAt.After(b.StartsAt)
		}
		return a.ID < b.ID
	})

	// Clamp the window. A negative offset, a negative limit or a limit large
	// enough to overflow offset+limit would otherwise yield an invalid slice
	// expression and panic.
	if offset < 0 {
		offset = 0
	}
	if offset > len(alerts) {
		offset = len(alerts)
	}
	end := len(alerts)
	if limit >= 0 && limit < end-offset {
		end = offset + limit
	}
	return alerts[offset:end], nil
}
// Delete removes the alert with the given ID. Deleting an unknown ID is not
// an error; it always returns nil.
func (mas *MemoryAlertStorage) Delete(id string) error {
	mas.mutex.Lock()
	delete(mas.alerts, id)
	mas.mutex.Unlock()
	return nil
}
// Count returns how many stored alerts satisfy filters. The error is always
// nil.
func (mas *MemoryAlertStorage) Count(filters map[string]string) (int, error) {
	mas.mutex.RLock()
	defer mas.mutex.RUnlock()
	n := 0
	for id := range mas.alerts {
		if !mas.matchesFilters(mas.alerts[id], filters) {
			continue
		}
		n++
	}
	return n, nil
}
// matchesFilters reports whether alert satisfies every key/value pair in
// filters. An empty or nil filter set matches everything.
//
//   - "state" and "severity" must equal the String() form exactly
//   - "name" is a substring match on Name
//   - any other key must be a label with exactly that value
//
// Because those three keys are reserved, a label literally called "state",
// "severity" or "name" cannot be filtered on.
func (mas *MemoryAlertStorage) matchesFilters(alert *Alert, filters map[string]string) bool {
	for key, want := range filters {
		var ok bool
		switch key {
		case "state":
			ok = alert.State.String() == want
		case "severity":
			ok = alert.Severity.String() == want
		case "name":
			ok = strings.Contains(alert.Name, want)
		default:
			got, exists := alert.Labels[key]
			ok = exists && got == want
		}
		if !ok {
			return false
		}
	}
	return true
}
使用示例 #
完整示例 #
package main
import (
"context"
"fmt"
"time"
)
// main wires up the alerting pipeline with two demo rules and simulated
// CPU/memory gauges. It evaluates rules every 30 seconds and runs for ten
// minutes.
func main() {
	// Metric registry, query evaluator, rule engine, storage and manager.
	registry := NewRegistry()
	evaluator := NewEvaluator(registry)
	ruleEngine := NewRuleEngine(evaluator)
	storage := NewMemoryAlertStorage()
	alertManager := NewAlertManager(ruleEngine, storage)

	// Notifiers. All credentials, addresses and URLs below are placeholders.
	emailConfig := &EmailConfig{
		SMTPHost: "smtp.gmail.com",
		SMTPPort: 587,
		Username: "your-email@example.com",
		Password: "your-password",
		From:     "your-email@example.com",
		To:       []string{"admin@example.com"},
	}
	// On error NewEmailNotifier returns a nil *EmailNotifier. Registering it
	// would store a non-nil Notifier interface wrapping a nil pointer, which
	// panics on the first Send, so the channel is skipped instead.
	if emailNotifier, err := NewEmailNotifier(emailConfig); err != nil {
		fmt.Printf("Email notifier disabled: %v\n", err)
	} else {
		alertManager.AddNotifier(emailNotifier)
	}

	slackConfig := &SlackConfig{
		WebhookURL: "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
		Channel:    "#alerts",
		Username:   "AlertBot",
	}
	slackNotifier := NewSlackNotifier(slackConfig)
	alertManager.AddNotifier(slackNotifier)

	// Alert rules.
	cpuRule := &Rule{
		Name:        "High CPU Usage",
		Description: "CPU usage is above 80%",
		Query:       "system_cpu_usage_percent",
		Condition: Condition{
			Operator:  "gt",
			Threshold: 80.0,
		},
		Duration: 5 * time.Minute,
		Labels: map[string]string{
			"team":     "infrastructure",
			"severity": "warning",
		},
		Annotations: map[string]string{
			"summary":     "High CPU usage detected",
			"description": "CPU usage has been above 80% for more than 5 minutes",
		},
		Severity: SeverityWarning,
		Enabled:  true,
	}
	if err := ruleEngine.AddRule(cpuRule); err != nil {
		fmt.Printf("Error adding rule %s: %v\n", cpuRule.Name, err)
	}

	memoryRule := &Rule{
		Name:        "High Memory Usage",
		Description: "Memory usage is above 90%",
		Query:       "system_memory_usage_percent",
		Condition: Condition{
			Operator:  "gt",
			Threshold: 90.0,
		},
		Duration: 2 * time.Minute,
		Labels: map[string]string{
			"team":     "infrastructure",
			"severity": "critical",
		},
		Annotations: map[string]string{
			"summary":     "Critical memory usage",
			"description": "Memory usage has been above 90% for more than 2 minutes",
		},
		Severity: SeverityCritical,
		Enabled:  true,
	}
	if err := ruleEngine.AddRule(memoryRule); err != nil {
		fmt.Printf("Error adding rule %s: %v\n", memoryRule.Name, err)
	}

	// Demo metrics driven by the simulator goroutine below.
	cpuGauge := RegisterGauge("system_cpu_usage_percent", nil)
	memoryGauge := RegisterGauge("system_memory_usage_percent", nil)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Evaluation loop: evaluate rules and hand the results to the manager.
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				alerts, err := ruleEngine.Evaluate()
				if err != nil {
					fmt.Printf("Error evaluating rules: %v\n", err)
					continue
				}
				if len(alerts) > 0 {
					fmt.Printf("Processing %d alerts\n", len(alerts))
					if err := alertManager.ProcessAlerts(alerts); err != nil {
						fmt.Printf("Error processing alerts: %v\n", err)
					}
				}
			}
		}
	}()

	// Metric simulator: values sweep 70-99% (CPU) and 85-104% (memory).
	go func() {
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				cpuUsage := 70.0 + float64(time.Now().Unix()%30)
				cpuGauge.Set(cpuUsage)
				memoryUsage := 85.0 + float64(time.Now().Unix()%20)
				memoryGauge.Set(memoryUsage)
				fmt.Printf("CPU: %.1f%%, Memory: %.1f%%\n", cpuUsage, memoryUsage)
			}
		}
	}()

	// Let the demo run for a while; the deferred cancel stops both goroutines.
	time.Sleep(10 * time.Minute)
}
小结 #
本节详细介绍了告警与通知系统的设计和实现,包括:
- 告警模型:定义了完整的告警状态和数据结构
- 规则引擎:实现了灵活的告警规则定义和评估机制
- 告警管理:提供了告警的生命周期管理和处理流程
- 通知系统:支持多种通知方式,包括邮件、Webhook 和 Slack
- 静默管理:实现了告警的静默和抑制功能
- 存储系统:提供了告警历史的存储和查询功能
通过这个告警系统,我们可以实现全面的监控告警功能,确保系统异常能够及时发现和处理,提高系统的可靠性和稳定性。