4.7.3 插件架构设计

4.7.3 插件架构设计 #

插件架构是一种软件设计模式,它允许应用程序在运行时动态加载和卸载功能模块。这种架构提供了极大的灵活性和可扩展性,使得应用程序可以根据需要添加新功能而无需重新编译整个程序。本节将详细介绍如何在 Go 中设计和实现插件架构。

插件架构概述 #

插件架构的优势 #

  1. 模块化:将功能分解为独立的模块
  2. 可扩展性:可以在不修改核心代码的情况下添加新功能
  3. 热插拔:支持运行时加载和卸载插件
  4. 第三方开发:允许第三方开发者扩展应用功能
  5. 版本管理:不同版本的插件可以独立管理

插件架构的挑战 #

  1. 接口设计:需要设计稳定的插件接口
  2. 版本兼容性:处理插件与主程序的版本兼容问题
  3. 安全性:确保插件不会危害系统安全
  4. 性能:动态加载可能带来性能开销
  5. 调试复杂性:增加了调试和错误处理的复杂性

插件接口设计 #

基础插件接口 #

package plugin

import (
    "context"
    "fmt"
)

// PluginInfo 插件信息
type PluginInfo struct {
    Name        string            `json:"name"`
    Version     string            `json:"version"`
    Description string            `json:"description"`
    Author      string            `json:"author"`
    License     string            `json:"license"`
    Tags        []string          `json:"tags"`
    Metadata    map[string]string `json:"metadata"`
}

// PluginStatus 插件状态
type PluginStatus int

const (
    StatusUnloaded PluginStatus = iota
    StatusLoaded
    StatusActive
    StatusError
    StatusDisabled
)

func (s PluginStatus) String() string {
    switch s {
    case StatusUnloaded:
        return "unloaded"
    case StatusLoaded:
        return "loaded"
    case StatusActive:
        return "active"
    case StatusError:
        return "error"
    case StatusDisabled:
        return "disabled"
    default:
        return "unknown"
    }
}

// Plugin 基础插件接口
type Plugin interface {
    // 获取插件信息
    Info() PluginInfo

    // 初始化插件
    Initialize(ctx context.Context, config map[string]interface{}) error

    // 启动插件
    Start(ctx context.Context) error

    // 停止插件
    Stop(ctx context.Context) error

    // 清理插件资源
    Cleanup() error

    // 获取插件状态
    Status() PluginStatus

    // 健康检查
    HealthCheck() error
}

// ConfigurablePlugin 可配置插件接口
type ConfigurablePlugin interface {
    Plugin

    // 获取配置模式
    ConfigSchema() map[string]interface{}

    // 更新配置
    UpdateConfig(config map[string]interface{}) error

    // 获取当前配置
    GetConfig() map[string]interface{}
}

// EventPlugin 事件处理插件接口
type EventPlugin interface {
    Plugin

    // 处理事件
    HandleEvent(ctx context.Context, event Event) error

    // 获取支持的事件类型
    SupportedEvents() []string
}

// ServicePlugin 服务插件接口
type ServicePlugin interface {
    Plugin

    // 提供服务
    Serve(ctx context.Context, request ServiceRequest) (ServiceResponse, error)

    // 获取服务描述
    ServiceDescription() ServiceDescriptor
}

// Event 事件结构
type Event struct {
    Type      string                 `json:"type"`
    Source    string                 `json:"source"`
    Timestamp int64                  `json:"timestamp"`
    Data      map[string]interface{} `json:"data"`
    Metadata  map[string]string      `json:"metadata"`
}

// ServiceRequest 服务请求
type ServiceRequest struct {
    Method    string                 `json:"method"`
    Path      string                 `json:"path"`
    Headers   map[string]string      `json:"headers"`
    Body      []byte                 `json:"body"`
    Params    map[string]interface{} `json:"params"`
    Context   context.Context        `json:"-"`
}

// ServiceResponse 服务响应
type ServiceResponse struct {
    StatusCode int               `json:"status_code"`
    Headers    map[string]string `json:"headers"`
    Body       []byte            `json:"body"`
    Error      string            `json:"error,omitempty"`
}

// ServiceDescriptor 服务描述符
type ServiceDescriptor struct {
    Name        string            `json:"name"`
    Version     string            `json:"version"`
    Endpoints   []EndpointInfo    `json:"endpoints"`
    Schema      map[string]interface{} `json:"schema"`
    Metadata    map[string]string `json:"metadata"`
}

// EndpointInfo 端点信息
type EndpointInfo struct {
    Method      string            `json:"method"`
    Path        string            `json:"path"`
    Description string            `json:"description"`
    Parameters  []ParameterInfo   `json:"parameters"`
    Responses   []ResponseInfo    `json:"responses"`
}

// ParameterInfo 参数信息
type ParameterInfo struct {
    Name        string `json:"name"`
    Type        string `json:"type"`
    Required    bool   `json:"required"`
    Description string `json:"description"`
}

// ResponseInfo 响应信息
type ResponseInfo struct {
    StatusCode  int    `json:"status_code"`
    Description string `json:"description"`
    Schema      string `json:"schema"`
}

基础插件实现 #

package plugin

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// BasePlugin 基础插件实现
type BasePlugin struct {
    info     PluginInfo
    status   PluginStatus
    config   map[string]interface{}
    mutex    sync.RWMutex
    startTime time.Time
    stopTime  time.Time
}

// NewBasePlugin 创建基础插件
func NewBasePlugin(info PluginInfo) *BasePlugin {
    return &BasePlugin{
        info:   info,
        status: StatusUnloaded,
        config: make(map[string]interface{}),
    }
}

// Info 获取插件信息
func (bp *BasePlugin) Info() PluginInfo {
    bp.mutex.RLock()
    defer bp.mutex.RUnlock()
    return bp.info
}

// Initialize 初始化插件
func (bp *BasePlugin) Initialize(ctx context.Context, config map[string]interface{}) error {
    bp.mutex.Lock()
    defer bp.mutex.Unlock()

    if bp.status != StatusUnloaded {
        return fmt.Errorf("plugin already initialized")
    }

    bp.config = config
    bp.status = StatusLoaded

    return nil
}

// Start 启动插件
func (bp *BasePlugin) Start(ctx context.Context) error {
    bp.mutex.Lock()
    defer bp.mutex.Unlock()

    if bp.status != StatusLoaded {
        return fmt.Errorf("plugin not loaded or already started")
    }

    bp.status = StatusActive
    bp.startTime = time.Now()

    return nil
}

// Stop 停止插件
func (bp *BasePlugin) Stop(ctx context.Context) error {
    bp.mutex.Lock()
    defer bp.mutex.Unlock()

    if bp.status != StatusActive {
        return fmt.Errorf("plugin not active")
    }

    bp.status = StatusLoaded
    bp.stopTime = time.Now()

    return nil
}

// Cleanup 清理插件资源
func (bp *BasePlugin) Cleanup() error {
    bp.mutex.Lock()
    defer bp.mutex.Unlock()

    bp.status = StatusUnloaded
    bp.config = make(map[string]interface{})

    return nil
}

// Status 获取插件状态
func (bp *BasePlugin) Status() PluginStatus {
    bp.mutex.RLock()
    defer bp.mutex.RUnlock()
    return bp.status
}

// HealthCheck 健康检查
func (bp *BasePlugin) HealthCheck() error {
    bp.mutex.RLock()
    defer bp.mutex.RUnlock()

    if bp.status == StatusError {
        return fmt.Errorf("plugin is in error state")
    }

    return nil
}

// SetStatus 设置插件状态
func (bp *BasePlugin) SetStatus(status PluginStatus) {
    bp.mutex.Lock()
    defer bp.mutex.Unlock()
    bp.status = status
}

// GetConfig 获取配置
func (bp *BasePlugin) GetConfig() map[string]interface{} {
    bp.mutex.RLock()
    defer bp.mutex.RUnlock()

    result := make(map[string]interface{})
    for k, v := range bp.config {
        result[k] = v
    }
    return result
}

// UpdateConfig 更新配置
func (bp *BasePlugin) UpdateConfig(config map[string]interface{}) error {
    bp.mutex.Lock()
    defer bp.mutex.Unlock()

    for k, v := range config {
        bp.config[k] = v
    }

    return nil
}

// Uptime 获取运行时间
func (bp *BasePlugin) Uptime() time.Duration {
    bp.mutex.RLock()
    defer bp.mutex.RUnlock()

    if bp.status == StatusActive {
        return time.Since(bp.startTime)
    }

    if !bp.stopTime.IsZero() {
        return bp.stopTime.Sub(bp.startTime)
    }

    return 0
}

插件管理器 #

插件注册表 #

package plugin

import (
    "context"
    "fmt"
    "sync"
)

// PluginRegistry 插件注册表
type PluginRegistry struct {
    plugins map[string]Plugin
    mutex   sync.RWMutex
}

// NewPluginRegistry 创建插件注册表
func NewPluginRegistry() *PluginRegistry {
    return &PluginRegistry{
        plugins: make(map[string]Plugin),
    }
}

// Register 注册插件
func (pr *PluginRegistry) Register(name string, plugin Plugin) error {
    pr.mutex.Lock()
    defer pr.mutex.Unlock()

    if _, exists := pr.plugins[name]; exists {
        return fmt.Errorf("plugin %s already registered", name)
    }

    pr.plugins[name] = plugin
    return nil
}

// Unregister 注销插件
func (pr *PluginRegistry) Unregister(name string) error {
    pr.mutex.Lock()
    defer pr.mutex.Unlock()

    plugin, exists := pr.plugins[name]
    if !exists {
        return fmt.Errorf("plugin %s not found", name)
    }

    // 停止并清理插件
    if plugin.Status() == StatusActive {
        plugin.Stop(context.Background())
    }
    plugin.Cleanup()

    delete(pr.plugins, name)
    return nil
}

// Get 获取插件
func (pr *PluginRegistry) Get(name string) (Plugin, error) {
    pr.mutex.RLock()
    defer pr.mutex.RUnlock()

    plugin, exists := pr.plugins[name]
    if !exists {
        return nil, fmt.Errorf("plugin %s not found", name)
    }

    return plugin, nil
}

// List 列出所有插件
func (pr *PluginRegistry) List() []string {
    pr.mutex.RLock()
    defer pr.mutex.RUnlock()

    names := make([]string, 0, len(pr.plugins))
    for name := range pr.plugins {
        names = append(names, name)
    }

    return names
}

// ListByStatus 按状态列出插件
func (pr *PluginRegistry) ListByStatus(status PluginStatus) []string {
    pr.mutex.RLock()
    defer pr.mutex.RUnlock()

    var names []string
    for name, plugin := range pr.plugins {
        if plugin.Status() == status {
            names = append(names, name)
        }
    }

    return names
}

// GetPluginInfo 获取插件信息
func (pr *PluginRegistry) GetPluginInfo(name string) (PluginInfo, error) {
    plugin, err := pr.Get(name)
    if err != nil {
        return PluginInfo{}, err
    }

    return plugin.Info(), nil
}

// GetAllPluginInfo 获取所有插件信息
func (pr *PluginRegistry) GetAllPluginInfo() map[string]PluginInfo {
    pr.mutex.RLock()
    defer pr.mutex.RUnlock()

    info := make(map[string]PluginInfo)
    for name, plugin := range pr.plugins {
        info[name] = plugin.Info()
    }

    return info
}

插件管理器 #

package plugin

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

// PluginManager 插件管理器
type PluginManager struct {
    registry    *PluginRegistry
    eventBus    *EventBus
    config      ManagerConfig
    mutex       sync.RWMutex
    running     bool
    stopChan    chan struct{}
    healthTicker *time.Ticker
}

// ManagerConfig 管理器配置
type ManagerConfig struct {
    HealthCheckInterval time.Duration
    StartTimeout        time.Duration
    StopTimeout         time.Duration
    EnableAutoRestart   bool
    MaxRestartAttempts  int
}

// DefaultManagerConfig 默认管理器配置
func DefaultManagerConfig() ManagerConfig {
    return ManagerConfig{
        HealthCheckInterval: 30 * time.Second,
        StartTimeout:        10 * time.Second,
        StopTimeout:         10 * time.Second,
        EnableAutoRestart:   true,
        MaxRestartAttempts:  3,
    }
}

// NewPluginManager 创建插件管理器
func NewPluginManager(config ManagerConfig) *PluginManager {
    return &PluginManager{
        registry: NewPluginRegistry(),
        eventBus: NewEventBus(),
        config:   config,
        stopChan: make(chan struct{}),
    }
}

// Start 启动插件管理器
func (pm *PluginManager) Start(ctx context.Context) error {
    pm.mutex.Lock()
    defer pm.mutex.Unlock()

    if pm.running {
        return fmt.Errorf("plugin manager already running")
    }

    pm.running = true

    // 启动健康检查
    if pm.config.HealthCheckInterval > 0 {
        pm.healthTicker = time.NewTicker(pm.config.HealthCheckInterval)
        go pm.healthCheckLoop()
    }

    return nil
}

// Stop 停止插件管理器
func (pm *PluginManager) Stop(ctx context.Context) error {
    pm.mutex.Lock()
    defer pm.mutex.Unlock()

    if !pm.running {
        return fmt.Errorf("plugin manager not running")
    }

    pm.running = false
    close(pm.stopChan)

    if pm.healthTicker != nil {
        pm.healthTicker.Stop()
    }

    // 停止所有活跃插件
    activePlugins := pm.registry.ListByStatus(StatusActive)
    for _, name := range activePlugins {
        if err := pm.StopPlugin(ctx, name); err != nil {
            log.Printf("Error stopping plugin %s: %v", name, err)
        }
    }

    return nil
}

// LoadPlugin 加载插件
func (pm *PluginManager) LoadPlugin(name string, plugin Plugin, config map[string]interface{}) error {
    pm.mutex.Lock()
    defer pm.mutex.Unlock()

    // 注册插件
    if err := pm.registry.Register(name, plugin); err != nil {
        return err
    }

    // 初始化插件
    ctx, cancel := context.WithTimeout(context.Background(), pm.config.StartTimeout)
    defer cancel()

    if err := plugin.Initialize(ctx, config); err != nil {
        pm.registry.Unregister(name)
        return fmt.Errorf("failed to initialize plugin %s: %v", name, err)
    }

    // 发送插件加载事件
    pm.eventBus.Publish(Event{
        Type:   "plugin.loaded",
        Source: "plugin_manager",
        Data: map[string]interface{}{
            "plugin_name": name,
            "plugin_info": plugin.Info(),
        },
    })

    return nil
}

// UnloadPlugin 卸载插件
func (pm *PluginManager) UnloadPlugin(name string) error {
    pm.mutex.Lock()
    defer pm.mutex.Unlock()

    plugin, err := pm.registry.Get(name)
    if err != nil {
        return err
    }

    // 如果插件正在运行,先停止它
    if plugin.Status() == StatusActive {
        ctx, cancel := context.WithTimeout(context.Background(), pm.config.StopTimeout)
        defer cancel()

        if err := plugin.Stop(ctx); err != nil {
            log.Printf("Error stopping plugin %s: %v", name, err)
        }
    }

    // 注销插件
    if err := pm.registry.Unregister(name); err != nil {
        return err
    }

    // 发送插件卸载事件
    pm.eventBus.Publish(Event{
        Type:   "plugin.unloaded",
        Source: "plugin_manager",
        Data: map[string]interface{}{
            "plugin_name": name,
        },
    })

    return nil
}

// StartPlugin 启动插件
func (pm *PluginManager) StartPlugin(ctx context.Context, name string) error {
    plugin, err := pm.registry.Get(name)
    if err != nil {
        return err
    }

    if plugin.Status() != StatusLoaded {
        return fmt.Errorf("plugin %s is not in loaded state", name)
    }

    startCtx, cancel := context.WithTimeout(ctx, pm.config.StartTimeout)
    defer cancel()

    if err := plugin.Start(startCtx); err != nil {
        return fmt.Errorf("failed to start plugin %s: %v", name, err)
    }

    // 发送插件启动事件
    pm.eventBus.Publish(Event{
        Type:   "plugin.started",
        Source: "plugin_manager",
        Data: map[string]interface{}{
            "plugin_name": name,
        },
    })

    return nil
}

// StopPlugin 停止插件
func (pm *PluginManager) StopPlugin(ctx context.Context, name string) error {
    plugin, err := pm.registry.Get(name)
    if err != nil {
        return err
    }

    if plugin.Status() != StatusActive {
        return fmt.Errorf("plugin %s is not active", name)
    }

    stopCtx, cancel := context.WithTimeout(ctx, pm.config.StopTimeout)
    defer cancel()

    if err := plugin.Stop(stopCtx); err != nil {
        return fmt.Errorf("failed to stop plugin %s: %v", name, err)
    }

    // 发送插件停止事件
    pm.eventBus.Publish(Event{
        Type:   "plugin.stopped",
        Source: "plugin_manager",
        Data: map[string]interface{}{
            "plugin_name": name,
        },
    })

    return nil
}

// RestartPlugin 重启插件
func (pm *PluginManager) RestartPlugin(ctx context.Context, name string) error {
    if err := pm.StopPlugin(ctx, name); err != nil {
        return err
    }

    return pm.StartPlugin(ctx, name)
}

// GetPluginStatus 获取插件状态
func (pm *PluginManager) GetPluginStatus(name string) (PluginStatus, error) {
    plugin, err := pm.registry.Get(name)
    if err != nil {
        return StatusUnloaded, err
    }

    return plugin.Status(), nil
}

// ListPlugins 列出所有插件
func (pm *PluginManager) ListPlugins() []string {
    return pm.registry.List()
}

// GetPluginInfo 获取插件信息
func (pm *PluginManager) GetPluginInfo(name string) (PluginInfo, error) {
    return pm.registry.GetPluginInfo(name)
}

// GetAllPluginInfo 获取所有插件信息
func (pm *PluginManager) GetAllPluginInfo() map[string]PluginInfo {
    return pm.registry.GetAllPluginInfo()
}

// healthCheckLoop 健康检查循环
func (pm *PluginManager) healthCheckLoop() {
    for {
        select {
        case <-pm.stopChan:
            return
        case <-pm.healthTicker.C:
            pm.performHealthCheck()
        }
    }
}

// performHealthCheck 执行健康检查
func (pm *PluginManager) performHealthCheck() {
    activePlugins := pm.registry.ListByStatus(StatusActive)

    for _, name := range activePlugins {
        plugin, err := pm.registry.Get(name)
        if err != nil {
            continue
        }

        if err := plugin.HealthCheck(); err != nil {
            log.Printf("Plugin %s health check failed: %v", name, err)

            // 设置插件状态为错误
            if bp, ok := plugin.(*BasePlugin); ok {
                bp.SetStatus(StatusError)
            }

            // 发送健康检查失败事件
            pm.eventBus.Publish(Event{
                Type:   "plugin.health_check_failed",
                Source: "plugin_manager",
                Data: map[string]interface{}{
                    "plugin_name": name,
                    "error":       err.Error(),
                },
            })

            // 如果启用了自动重启,尝试重启插件
            if pm.config.EnableAutoRestart {
                go pm.attemptRestart(name)
            }
        }
    }
}

// attemptRestart 尝试重启插件
func (pm *PluginManager) attemptRestart(name string) {
    for attempt := 1; attempt <= pm.config.MaxRestartAttempts; attempt++ {
        log.Printf("Attempting to restart plugin %s (attempt %d/%d)",
                   name, attempt, pm.config.MaxRestartAttempts)

        ctx, cancel := context.WithTimeout(context.Background(), pm.config.StartTimeout)
        err := pm.RestartPlugin(ctx, name)
        cancel()

        if err == nil {
            log.Printf("Successfully restarted plugin %s", name)

            // 发送插件重启成功事件
            pm.eventBus.Publish(Event{
                Type:   "plugin.restarted",
                Source: "plugin_manager",
                Data: map[string]interface{}{
                    "plugin_name": name,
                    "attempt":     attempt,
                },
            })
            return
        }

        log.Printf("Failed to restart plugin %s (attempt %d): %v", name, attempt, err)

        if attempt < pm.config.MaxRestartAttempts {
            time.Sleep(time.Duration(attempt) * time.Second)
        }
    }

    log.Printf("Failed to restart plugin %s after %d attempts", name, pm.config.MaxRestartAttempts)

    // 发送插件重启失败事件
    pm.eventBus.Publish(Event{
        Type:   "plugin.restart_failed",
        Source: "plugin_manager",
        Data: map[string]interface{}{
            "plugin_name": name,
            "attempts":    pm.config.MaxRestartAttempts,
        },
    })
}

// SubscribeToEvents 订阅事件
func (pm *PluginManager) SubscribeToEvents(eventType string, handler EventHandler) {
    pm.eventBus.Subscribe(eventType, handler)
}

// PublishEvent 发布事件
func (pm *PluginManager) PublishEvent(event Event) {
    pm.eventBus.Publish(event)
}

事件总线系统 #

事件总线实现 #

package plugin

import (
    "sync"
)

// EventHandler 事件处理器
type EventHandler func(event Event)

// EventBus 事件总线
type EventBus struct {
    handlers map[string][]EventHandler
    mutex    sync.RWMutex
}

// NewEventBus 创建事件总线
func NewEventBus() *EventBus {
    return &EventBus{
        handlers: make(map[string][]EventHandler),
    }
}

// Subscribe 订阅事件
func (eb *EventBus) Subscribe(eventType string, handler EventHandler) {
    eb.mutex.Lock()
    defer eb.mutex.Unlock()

    eb.handlers[eventType] = append(eb.handlers[eventType], handler)
}

// Unsubscribe 取消订阅(简化版,实际实现可能需要更复杂的逻辑)
func (eb *EventBus) Unsubscribe(eventType string) {
    eb.mutex.Lock()
    defer eb.mutex.Unlock()

    delete(eb.handlers, eventType)
}

// Publish 发布事件
func (eb *EventBus) Publish(event Event) {
    eb.mutex.RLock()
    handlers, exists := eb.handlers[event.Type]
    eb.mutex.RUnlock()

    if !exists {
        return
    }

    // 异步处理事件
    for _, handler := range handlers {
        go func(h EventHandler) {
            defer func() {
                if r := recover(); r != nil {
                    // 处理 panic,避免影响其他处理器
                    log.Printf("Event handler panic: %v", r)
                }
            }()
            h(event)
        }(handler)
    }
}

// PublishSync 同步发布事件
func (eb *EventBus) PublishSync(event Event) {
    eb.mutex.RLock()
    handlers, exists := eb.handlers[event.Type]
    eb.mutex.RUnlock()

    if !exists {
        return
    }

    for _, handler := range handlers {
        func(h EventHandler) {
            defer func() {
                if r := recover(); r != nil {
                    log.Printf("Event handler panic: %v", r)
                }
            }()
            h(event)
        }(handler)
    }
}

// GetSubscriberCount 获取订阅者数量
func (eb *EventBus) GetSubscriberCount(eventType string) int {
    eb.mutex.RLock()
    defer eb.mutex.RUnlock()

    return len(eb.handlers[eventType])
}

// GetAllEventTypes 获取所有事件类型
func (eb *EventBus) GetAllEventTypes() []string {
    eb.mutex.RLock()
    defer eb.mutex.RUnlock()

    types := make([]string, 0, len(eb.handlers))
    for eventType := range eb.handlers {
        types = append(types, eventType)
    }

    return types
}

具体插件实现示例 #

日志插件示例 #

package plugins

import (
    "context"
    "fmt"
    "log"
    "os"
    "path/filepath"
    "time"

    "your-project/plugin"
)

// LoggerPlugin 日志插件
type LoggerPlugin struct {
    *plugin.BasePlugin
    logFile   *os.File
    logger    *log.Logger
    logLevel  string
    logFormat string
}

// NewLoggerPlugin 创建日志插件
func NewLoggerPlugin() *LoggerPlugin {
    info := plugin.PluginInfo{
        Name:        "logger",
        Version:     "1.0.0",
        Description: "File logging plugin",
        Author:      "System",
        License:     "MIT",
        Tags:        []string{"logging", "file"},
    }

    return &LoggerPlugin{
        BasePlugin: plugin.NewBasePlugin(info),
    }
}

// Initialize 初始化插件
func (lp *LoggerPlugin) Initialize(ctx context.Context, config map[string]interface{}) error {
    if err := lp.BasePlugin.Initialize(ctx, config); err != nil {
        return err
    }

    // 解析配置
    logPath, ok := config["log_path"].(string)
    if !ok {
        logPath = "logs/app.log"
    }

    lp.logLevel, _ = config["log_level"].(string)
    if lp.logLevel == "" {
        lp.logLevel = "INFO"
    }

    lp.logFormat, _ = config["log_format"].(string)
    if lp.logFormat == "" {
        lp.logFormat = "2006-01-02 15:04:05"
    }

    // 创建日志目录
    if err := os.MkdirAll(filepath.Dir(logPath), 0755); err != nil {
        return fmt.Errorf("failed to create log directory: %v", err)
    }

    // 打开日志文件
    file, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
    if err != nil {
        return fmt.Errorf("failed to open log file: %v", err)
    }

    lp.logFile = file
    lp.logger = log.New(file, "", 0)

    return nil
}

// Start 启动插件
func (lp *LoggerPlugin) Start(ctx context.Context) error {
    if err := lp.BasePlugin.Start(ctx); err != nil {
        return err
    }

    lp.logger.Printf("[%s] Logger plugin started", time.Now().Format(lp.logFormat))
    return nil
}

// Stop 停止插件
func (lp *LoggerPlugin) Stop(ctx context.Context) error {
    if err := lp.BasePlugin.Stop(ctx); err != nil {
        return err
    }

    lp.logger.Printf("[%s] Logger plugin stopped", time.Now().Format(lp.logFormat))
    return nil
}

// Cleanup 清理资源
func (lp *LoggerPlugin) Cleanup() error {
    if lp.logFile != nil {
        lp.logFile.Close()
        lp.logFile = nil
    }

    return lp.BasePlugin.Cleanup()
}

// HandleEvent 处理事件
func (lp *LoggerPlugin) HandleEvent(ctx context.Context, event plugin.Event) error {
    if lp.Status() != plugin.StatusActive {
        return fmt.Errorf("plugin not active")
    }

    timestamp := time.Unix(event.Timestamp, 0).Format(lp.logFormat)
    logEntry := fmt.Sprintf("[%s] Event: %s from %s - %v",
                           timestamp, event.Type, event.Source, event.Data)

    lp.logger.Println(logEntry)
    return nil
}

// SupportedEvents 支持的事件类型
func (lp *LoggerPlugin) SupportedEvents() []string {
    return []string{"*"} // 支持所有事件类型
}

// ConfigSchema 配置模式
func (lp *LoggerPlugin) ConfigSchema() map[string]interface{} {
    return map[string]interface{}{
        "log_path": map[string]interface{}{
            "type":        "string",
            "description": "Log file path",
            "default":     "logs/app.log",
        },
        "log_level": map[string]interface{}{
            "type":        "string",
            "description": "Log level",
            "enum":        []string{"DEBUG", "INFO", "WARN", "ERROR"},
            "default":     "INFO",
        },
        "log_format": map[string]interface{}{
            "type":        "string",
            "description": "Timestamp format",
            "default":     "2006-01-02 15:04:05",
        },
    }
}

HTTP 服务插件示例 #

package plugins

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "strconv"
    "time"

    "your-project/plugin"
)

// HTTPServicePlugin HTTP 服务插件
type HTTPServicePlugin struct {
    *plugin.BasePlugin
    server *http.Server
    port   int
}

// NewHTTPServicePlugin 创建 HTTP 服务插件
func NewHTTPServicePlugin() *HTTPServicePlugin {
    info := plugin.PluginInfo{
        Name:        "http_service",
        Version:     "1.0.0",
        Description: "HTTP service plugin",
        Author:      "System",
        License:     "MIT",
        Tags:        []string{"http", "service", "api"},
    }

    return &HTTPServicePlugin{
        BasePlugin: plugin.NewBasePlugin(info),
    }
}

// Initialize 初始化插件
func (hsp *HTTPServicePlugin) Initialize(ctx context.Context, config map[string]interface{}) error {
    if err := hsp.BasePlugin.Initialize(ctx, config); err != nil {
        return err
    }

    // 解析端口配置
    if portVal, ok := config["port"]; ok {
        switch v := portVal.(type) {
        case int:
            hsp.port = v
        case float64:
            hsp.port = int(v)
        case string:
            if p, err := strconv.Atoi(v); err == nil {
                hsp.port = p
            }
        }
    }

    if hsp.port == 0 {
        hsp.port = 8080
    }

    // 创建 HTTP 服务器
    mux := http.NewServeMux()
    hsp.setupRoutes(mux)

    hsp.server = &http.Server{
        Addr:    fmt.Sprintf(":%d", hsp.port),
        Handler: mux,
    }

    return nil
}

// Start 启动插件
func (hsp *HTTPServicePlugin) Start(ctx context.Context) error {
    if err := hsp.BasePlugin.Start(ctx); err != nil {
        return err
    }

    // 启动 HTTP 服务器
    go func() {
        if err := hsp.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            hsp.SetStatus(plugin.StatusError)
        }
    }()

    return nil
}

// Stop 停止插件
func (hsp *HTTPServicePlugin) Stop(ctx context.Context) error {
    if err := hsp.BasePlugin.Stop(ctx); err != nil {
        return err
    }

    // 停止 HTTP 服务器
    if hsp.server != nil {
        return hsp.server.Shutdown(ctx)
    }

    return nil
}

// setupRoutes 设置路由
func (hsp *HTTPServicePlugin) setupRoutes(mux *http.ServeMux) {
    mux.HandleFunc("/health", hsp.healthHandler)
    mux.HandleFunc("/info", hsp.infoHandler)
    mux.HandleFunc("/config", hsp.configHandler)
}

// healthHandler 健康检查处理器
func (hsp *HTTPServicePlugin) healthHandler(w http.ResponseWriter, r *http.Request) {
    response := map[string]interface{}{
        "status":    "healthy",
        "timestamp": time.Now().Unix(),
        "uptime":    hsp.Uptime().Seconds(),
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}

// infoHandler 信息处理器
func (hsp *HTTPServicePlugin) infoHandler(w http.ResponseWriter, r *http.Request) {
    info := hsp.Info()

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(info)
}

// configHandler 配置处理器
func (hsp *HTTPServicePlugin) configHandler(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case http.MethodGet:
        config := hsp.GetConfig()
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(config)

    case http.MethodPut:
        var newConfig map[string]interface{}
        if err := json.NewDecoder(r.Body).Decode(&newConfig); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        if err := hsp.UpdateConfig(newConfig); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        w.WriteHeader(http.StatusOK)

    default:
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
    }
}

// Serve 提供服务
func (hsp *HTTPServicePlugin) Serve(ctx context.Context, request plugin.ServiceRequest) (plugin.ServiceResponse, error) {
    // 这里可以实现更复杂的服务逻辑
    response := plugin.ServiceResponse{
        StatusCode: 200,
        Headers:    map[string]string{"Content-Type": "application/json"},
        Body:       []byte(`{"message": "Hello from HTTP service plugin"}`),
    }

    return response, nil
}

// ServiceDescription 服务描述
func (hsp *HTTPServicePlugin) ServiceDescription() plugin.ServiceDescriptor {
    return plugin.ServiceDescriptor{
        Name:    "http_service",
        Version: "1.0.0",
        Endpoints: []plugin.EndpointInfo{
            {
                Method:      "GET",
                Path:        "/health",
                Description: "Health check endpoint",
                Responses: []plugin.ResponseInfo{
                    {StatusCode: 200, Description: "Healthy"},
                },
            },
            {
                Method:      "GET",
                Path:        "/info",
                Description: "Plugin information endpoint",
                Responses: []plugin.ResponseInfo{
                    {StatusCode: 200, Description: "Plugin info"},
                },
            },
        },
    }
}

// ConfigSchema 配置模式
func (hsp *HTTPServicePlugin) ConfigSchema() map[string]interface{} {
    return map[string]interface{}{
        "port": map[string]interface{}{
            "type":        "integer",
            "description": "HTTP server port",
            "default":     8080,
            "minimum":     1024,
            "maximum":     65535,
        },
    }
}

使用示例 #

完整的插件系统示例 #

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "your-project/plugin"
    "your-project/plugins"
)

func main() {
    // 创建插件管理器
    config := plugin.DefaultManagerConfig()
    config.HealthCheckInterval = 10 * time.Second

    manager := plugin.NewPluginManager(config)

    // 启动插件管理器
    ctx := context.Background()
    if err := manager.Start(ctx); err != nil {
        log.Fatalf("Failed to start plugin manager: %v", err)
    }
    defer manager.Stop(ctx)

    // 订阅插件事件
    manager.SubscribeToEvents("plugin.loaded", func(event plugin.Event) {
        fmt.Printf("Plugin loaded: %s\n", event.Data["plugin_name"])
    })

    manager.SubscribeToEvents("plugin.started", func(event plugin.Event) {
        fmt.Printf("Plugin started: %s\n", event.Data["plugin_name"])
    })

    manager.SubscribeToEvents("plugin.health_check_failed", func(event plugin.Event) {
        fmt.Printf("Plugin health check failed: %s - %s\n",
                   event.Data["plugin_name"], event.Data["error"])
    })

    // 加载日志插件
    loggerPlugin := plugins.NewLoggerPlugin()
    loggerConfig := map[string]interface{}{
        "log_path":   "logs/system.log",
        "log_level":  "INFO",
        "log_format": "2006-01-02 15:04:05",
    }

    if err := manager.LoadPlugin("logger", loggerPlugin, loggerConfig); err != nil {
        log.Fatalf("Failed to load logger plugin: %v", err)
    }

    // 启动日志插件
    if err := manager.StartPlugin(ctx, "logger"); err != nil {
        log.Fatalf("Failed to start logger plugin: %v", err)
    }

    // 加载 HTTP 服务插件
    httpPlugin := plugins.NewHTTPServicePlugin()
    httpConfig := map[string]interface{}{
        "port": 8080,
    }

    if err := manager.LoadPlugin("http_service", httpPlugin, httpConfig); err != nil {
        log.Fatalf("Failed to load HTTP service plugin: %v", err)
    }

    // 启动 HTTP 服务插件
    if err := manager.StartPlugin(ctx, "http_service"); err != nil {
        log.Fatalf("Failed to start HTTP service plugin: %v", err)
    }

    // 发布一些测试事件
    go func() {
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()

        counter := 0
        for {
            select {
            case <-ticker.C:
                counter++
                manager.PublishEvent(plugin.Event{
                    Type:      "test.event",
                    Source:    "main",
                    Timestamp: time.Now().Unix(),
                    Data: map[string]interface{}{
                        "counter": counter,
                        "message": fmt.Sprintf("Test event #%d", counter),
                    },
                })
            }
        }
    }()

    // 打印插件状态
    go func() {
        ticker := time.NewTicker(30 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                fmt.Println("\n=== Plugin Status ===")
                allInfo := manager.GetAllPluginInfo()
                for name, info := range allInfo {
                    status, _ := manager.GetPluginStatus(name)
                    fmt.Printf("Plugin: %s (v%s) - Status: %s\n",
                               info.Name, info.Version, status)
                }
                fmt.Println("====================\n")
            }
        }
    }()

    // 等待中断信号
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    fmt.Println("Plugin system started. Press Ctrl+C to stop.")
    fmt.Println("HTTP service available at: http://localhost:8080")

    <-sigChan
    fmt.Println("\nShutting down plugin system...")
}

小结 #

本节详细介绍了插件架构的设计和实现,包括:

  1. 插件接口设计:定义了灵活的插件接口体系
  2. 插件管理器:实现了完整的插件生命周期管理
  3. 事件总线系统:提供了插件间的通信机制
  4. 具体插件实现:展示了日志插件和 HTTP 服务插件的实现
  5. 完整示例:演示了如何构建一个完整的插件系统

插件架构为应用程序提供了极大的灵活性和可扩展性,但同时也增加了系统的复杂性。在设计插件系统时,需要仔细考虑接口稳定性、版本兼容性、安全性和性能等因素。在下一节中,我们将学习如何实现插件的动态加载功能。