4.7.3 插件架构设计 #
插件架构是一种软件设计模式,它允许应用程序在运行时动态加载和卸载功能模块。这种架构提供了极大的灵活性和可扩展性,使得应用程序可以根据需要添加新功能,而无需重新编译整个程序。本节将详细介绍如何在 Go 中设计和实现插件架构:示例中的插件以编译期链接、运行时注册的方式接入管理器,真正的动态加载将在下一节介绍。
插件架构概述 #
插件架构的优势 #
- 模块化:将功能分解为独立的模块
- 可扩展性:可以在不修改核心代码的情况下添加新功能
- 热插拔:支持运行时加载和卸载插件
- 第三方开发:允许第三方开发者扩展应用功能
- 版本管理:不同版本的插件可以独立管理
插件架构的挑战 #
- 接口设计:需要设计稳定的插件接口
- 版本兼容性:处理插件与主程序的版本兼容问题
- 安全性:确保插件不会危害系统安全
- 性能:动态加载可能带来性能开销
- 调试复杂性:增加了调试和错误处理的复杂性
插件接口设计 #
基础插件接口 #
package plugin
import (
"context"
"fmt"
)
// PluginInfo holds descriptive metadata about a plugin. Every field carries a
// JSON tag, so a value can be marshalled with encoding/json under the
// lower-case key names shown in the tags.
type PluginInfo struct {
Name string `json:"name"`
Version string `json:"version"`
Description string `json:"description"`
Author string `json:"author"`
License string `json:"license"`
// Tags is a list of labels attached to the plugin (e.g. "logging", "file").
Tags []string `json:"tags"`
// Metadata carries additional string key/value pairs.
Metadata map[string]string `json:"metadata"`
}
// PluginStatus enumerates the lifecycle states a plugin can be in.
type PluginStatus int

// The lifecycle states, numbered consecutively from zero.
const (
	StatusUnloaded PluginStatus = iota
	StatusLoaded
	StatusActive
	StatusError
	StatusDisabled
)

// String returns the lower-case name of the status, or "unknown" for any
// value outside the declared constants.
func (s PluginStatus) String() string {
	names := [...]string{
		StatusUnloaded: "unloaded",
		StatusLoaded:   "loaded",
		StatusActive:   "active",
		StatusError:    "error",
		StatusDisabled: "disabled",
	}
	if s < 0 || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}
// Plugin is the base interface every plugin implements. It covers the
// lifecycle calls (Initialize, Start, Stop, Cleanup) and introspection
// (Info, Status, HealthCheck).
type Plugin interface {
// Info returns the plugin's descriptive metadata.
Info() PluginInfo
// Initialize prepares the plugin with the supplied configuration.
Initialize(ctx context.Context, config map[string]interface{}) error
// Start starts the plugin.
Start(ctx context.Context) error
// Stop stops the plugin.
Stop(ctx context.Context) error
// Cleanup releases the plugin's resources.
Cleanup() error
// Status reports the plugin's current lifecycle status.
Status() PluginStatus
// HealthCheck returns a non-nil error when the plugin is unhealthy.
HealthCheck() error
}
// ConfigurablePlugin extends Plugin with methods for describing, updating and
// reading configuration.
type ConfigurablePlugin interface {
Plugin
// ConfigSchema returns a description of the configuration the plugin accepts.
ConfigSchema() map[string]interface{}
// UpdateConfig applies the supplied configuration values.
UpdateConfig(config map[string]interface{}) error
// GetConfig returns the current configuration.
GetConfig() map[string]interface{}
}
// EventPlugin extends Plugin with event handling.
type EventPlugin interface {
Plugin
// HandleEvent processes a single event.
HandleEvent(ctx context.Context, event Event) error
// SupportedEvents lists the event types the plugin handles.
SupportedEvents() []string
}
// ServicePlugin extends Plugin with a request/response service entry point.
type ServicePlugin interface {
Plugin
// Serve handles one service request and returns its response.
Serve(ctx context.Context, request ServiceRequest) (ServiceResponse, error)
// ServiceDescription returns the descriptor of the offered service.
ServiceDescription() ServiceDescriptor
}
// Event is the message delivered to event handlers. All fields are
// JSON-tagged.
type Event struct {
// Type names the event, e.g. "plugin.loaded".
Type string `json:"type"`
// Source names the publisher, e.g. "plugin_manager".
Source string `json:"source"`
// Timestamp is an integer time value.
// NOTE(review): appears to be Unix seconds (it is filled from time.Now().Unix()
// in the example program) — confirm.
Timestamp int64 `json:"timestamp"`
// Data is the event payload.
Data map[string]interface{} `json:"data"`
Metadata map[string]string `json:"metadata"`
}
// ServiceRequest is the input passed to ServicePlugin.Serve.
type ServiceRequest struct {
Method string `json:"method"`
Path string `json:"path"`
Headers map[string]string `json:"headers"`
Body []byte `json:"body"`
Params map[string]interface{} `json:"params"`
// Context is excluded from JSON encoding by the "-" tag.
Context context.Context `json:"-"`
}
// ServiceResponse is the output returned by ServicePlugin.Serve.
type ServiceResponse struct {
StatusCode int `json:"status_code"`
Headers map[string]string `json:"headers"`
Body []byte `json:"body"`
// Error is omitted from the JSON encoding when empty.
Error string `json:"error,omitempty"`
}
// ServiceDescriptor describes a service offered by a ServicePlugin: its name,
// version and endpoints, plus schema and metadata maps.
type ServiceDescriptor struct {
Name string `json:"name"`
Version string `json:"version"`
// Endpoints lists the endpoints the service exposes.
Endpoints []EndpointInfo `json:"endpoints"`
Schema map[string]interface{} `json:"schema"`
Metadata map[string]string `json:"metadata"`
}
// EndpointInfo describes one service endpoint: its method, path,
// human-readable description, parameters and possible responses.
type EndpointInfo struct {
Method string `json:"method"`
Path string `json:"path"`
Description string `json:"description"`
Parameters []ParameterInfo `json:"parameters"`
Responses []ResponseInfo `json:"responses"`
}
// ParameterInfo describes one endpoint parameter.
type ParameterInfo struct {
Name string `json:"name"`
// Type is the parameter's type name, stored as a string.
Type string `json:"type"`
// Required marks the parameter as mandatory.
Required bool `json:"required"`
Description string `json:"description"`
}
// ResponseInfo describes one possible response of an endpoint.
type ResponseInfo struct {
StatusCode int `json:"status_code"`
Description string `json:"description"`
// Schema is stored as a string.
Schema string `json:"schema"`
}
基础插件实现 #
package plugin
import (
"context"
"fmt"
"sync"
"time"
)
// BasePlugin is a reusable implementation of the plugin lifecycle and
// configuration methods. Its methods acquire mutex before reading or writing
// the other fields.
type BasePlugin struct {
info PluginInfo
status PluginStatus
config map[string]interface{}
// mutex guards the fields of this struct.
mutex sync.RWMutex
// startTime is written by Start and stopTime by Stop; both are read by Uptime.
startTime time.Time
stopTime time.Time
}
// NewBasePlugin returns a BasePlugin carrying info, in StatusUnloaded and
// with an empty, non-nil configuration map.
func NewBasePlugin(info PluginInfo) *BasePlugin {
	bp := new(BasePlugin)
	bp.info = info
	bp.status = StatusUnloaded
	bp.config = map[string]interface{}{}
	return bp
}
// Info returns the plugin's metadata, read under the read lock.
func (bp *BasePlugin) Info() PluginInfo {
	bp.mutex.RLock()
	info := bp.info
	bp.mutex.RUnlock()
	return info
}
// Initialize moves the plugin from StatusUnloaded to StatusLoaded and stores
// a copy of config.
//
// The configuration is copied into a fresh map instead of being stored by
// reference. This keeps later mutations of the caller's map from leaking into
// the plugin, and guarantees the stored map is never nil (a nil config stored
// as-is would make a later write into the configuration panic).
//
// ctx is accepted to satisfy the Plugin interface and is not used here.
// An error is returned when the plugin is not in StatusUnloaded.
func (bp *BasePlugin) Initialize(ctx context.Context, config map[string]interface{}) error {
	bp.mutex.Lock()
	defer bp.mutex.Unlock()

	if bp.status != StatusUnloaded {
		return fmt.Errorf("plugin already initialized")
	}

	cfg := make(map[string]interface{}, len(config))
	for k, v := range config {
		cfg[k] = v
	}
	bp.config = cfg
	bp.status = StatusLoaded
	return nil
}
// Start moves the plugin from StatusLoaded to StatusActive and records the
// start time. Any other current status yields an error. ctx is not used.
func (bp *BasePlugin) Start(ctx context.Context) error {
	bp.mutex.Lock()
	defer bp.mutex.Unlock()

	switch bp.status {
	case StatusLoaded:
		bp.status = StatusActive
		bp.startTime = time.Now()
		return nil
	default:
		return fmt.Errorf("plugin not loaded or already started")
	}
}
// Stop moves the plugin from StatusActive back to StatusLoaded and records
// the stop time. Any other current status yields an error. ctx is not used.
func (bp *BasePlugin) Stop(ctx context.Context) error {
	bp.mutex.Lock()
	defer bp.mutex.Unlock()

	switch bp.status {
	case StatusActive:
		bp.status = StatusLoaded
		bp.stopTime = time.Now()
		return nil
	default:
		return fmt.Errorf("plugin not active")
	}
}
// Cleanup resets the plugin to StatusUnloaded and replaces the configuration
// with a new empty map. It always returns nil.
func (bp *BasePlugin) Cleanup() error {
	bp.mutex.Lock()
	defer bp.mutex.Unlock()

	bp.status, bp.config = StatusUnloaded, map[string]interface{}{}
	return nil
}
// Status returns the current lifecycle status, read under the read lock.
func (bp *BasePlugin) Status() PluginStatus {
	bp.mutex.RLock()
	current := bp.status
	bp.mutex.RUnlock()
	return current
}
// HealthCheck returns an error only when the plugin is in StatusError; every
// other status is reported as healthy.
func (bp *BasePlugin) HealthCheck() error {
	bp.mutex.RLock()
	inError := bp.status == StatusError
	bp.mutex.RUnlock()

	if inError {
		return fmt.Errorf("plugin is in error state")
	}
	return nil
}
// SetStatus overwrites the lifecycle status unconditionally (no transition
// checks are made), under the write lock.
func (bp *BasePlugin) SetStatus(status PluginStatus) {
	bp.mutex.Lock()
	bp.status = status
	bp.mutex.Unlock()
}
// GetConfig returns a shallow copy of the configuration, so callers can
// modify the returned map without affecting the plugin's own map.
func (bp *BasePlugin) GetConfig() map[string]interface{} {
	bp.mutex.RLock()
	defer bp.mutex.RUnlock()

	snapshot := make(map[string]interface{}, len(bp.config))
	for key := range bp.config {
		snapshot[key] = bp.config[key]
	}
	return snapshot
}
// UpdateConfig merges config into the plugin's configuration: keys present in
// config overwrite existing values, all other keys are left untouched.
// It always returns nil.
func (bp *BasePlugin) UpdateConfig(config map[string]interface{}) error {
	bp.mutex.Lock()
	defer bp.mutex.Unlock()

	// The stored map can be nil (zero-value BasePlugin, or a nil map handed to
	// Initialize); writing into a nil map panics, so allocate it on demand.
	if bp.config == nil {
		bp.config = make(map[string]interface{}, len(config))
	}
	for k, v := range config {
		bp.config[k] = v
	}
	return nil
}
// Uptime reports how long the plugin has been running. While the plugin is
// active it is the time elapsed since the recorded start; otherwise, if a stop
// time has been recorded, it is the span between the recorded start and stop;
// otherwise it is zero.
func (bp *BasePlugin) Uptime() time.Duration {
	bp.mutex.RLock()
	defer bp.mutex.RUnlock()

	switch {
	case bp.status == StatusActive:
		return time.Since(bp.startTime)
	case bp.stopTime.IsZero():
		return 0
	default:
		return bp.stopTime.Sub(bp.startTime)
	}
}
插件管理器 #
插件注册表 #
package plugin
import (
"context"
"fmt"
"sync"
)
// PluginRegistry is a name-to-plugin table. Its methods take mutex before
// touching the plugins map.
type PluginRegistry struct {
// plugins maps a registration name to the registered plugin.
plugins map[string]Plugin
// mutex guards plugins.
mutex sync.RWMutex
}
// NewPluginRegistry returns an empty registry with its map allocated.
func NewPluginRegistry() *PluginRegistry {
	pr := new(PluginRegistry)
	pr.plugins = map[string]Plugin{}
	return pr
}
// Register stores plugin under name. It returns an error, and leaves the
// registry unchanged, when name is already taken.
func (pr *PluginRegistry) Register(name string, plugin Plugin) error {
	pr.mutex.Lock()
	defer pr.mutex.Unlock()

	_, taken := pr.plugins[name]
	if taken {
		return fmt.Errorf("plugin %s already registered", name)
	}

	pr.plugins[name] = plugin
	return nil
}
// Unregister removes the plugin stored under name. Before removal the plugin
// is stopped (only if it reports StatusActive) and cleaned up; the errors
// returned by Stop and Cleanup are discarded. An error is returned only when
// no plugin is registered under name.
//
// The registry lock is held for the whole call, including the calls into the
// plugin.
func (pr *PluginRegistry) Unregister(name string) error {
	pr.mutex.Lock()
	defer pr.mutex.Unlock()

	target, found := pr.plugins[name]
	if !found {
		return fmt.Errorf("plugin %s not found", name)
	}

	if target.Status() == StatusActive {
		_ = target.Stop(context.Background())
	}
	_ = target.Cleanup()

	delete(pr.plugins, name)
	return nil
}
// Get returns the plugin registered under name, or an error when there is
// none.
func (pr *PluginRegistry) Get(name string) (Plugin, error) {
	pr.mutex.RLock()
	defer pr.mutex.RUnlock()

	if found, ok := pr.plugins[name]; ok {
		return found, nil
	}
	return nil, fmt.Errorf("plugin %s not found", name)
}
// List returns the names of all registered plugins. The order follows map
// iteration and is therefore unspecified. The result is never nil.
func (pr *PluginRegistry) List() []string {
	pr.mutex.RLock()
	defer pr.mutex.RUnlock()

	names := make([]string, len(pr.plugins))
	i := 0
	for name := range pr.plugins {
		names[i] = name
		i++
	}
	return names
}
// ListByStatus returns the names of the registered plugins whose Status()
// equals status, in unspecified order. The result is nil when none match.
func (pr *PluginRegistry) ListByStatus(status PluginStatus) []string {
	pr.mutex.RLock()
	defer pr.mutex.RUnlock()

	var matching []string
	for name, p := range pr.plugins {
		if p.Status() != status {
			continue
		}
		matching = append(matching, name)
	}
	return matching
}
// GetPluginInfo returns the metadata of the plugin registered under name. On
// lookup failure it returns the zero PluginInfo and the lookup error.
func (pr *PluginRegistry) GetPluginInfo(name string) (PluginInfo, error) {
	p, err := pr.Get(name)
	if err == nil {
		return p.Info(), nil
	}
	return PluginInfo{}, err
}
// GetAllPluginInfo returns a new map from registration name to the metadata
// reported by each registered plugin.
func (pr *PluginRegistry) GetAllPluginInfo() map[string]PluginInfo {
	pr.mutex.RLock()
	defer pr.mutex.RUnlock()

	all := make(map[string]PluginInfo, len(pr.plugins))
	for name := range pr.plugins {
		all[name] = pr.plugins[name].Info()
	}
	return all
}
插件管理器 #
package plugin
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// PluginManager ties together a plugin registry, an event bus and a periodic
// health check.
type PluginManager struct {
registry *PluginRegistry
eventBus *EventBus
config ManagerConfig
mutex sync.RWMutex
// running is set by Start and cleared by Stop.
running bool
// stopChan is closed by Stop; the health check loop returns when it is closed.
stopChan chan struct{}
// healthTicker drives the health check loop; it is only created by Start
// when config.HealthCheckInterval is positive.
healthTicker *time.Ticker
}
// ManagerConfig holds the tunables of a PluginManager.
type ManagerConfig struct {
	// HealthCheckInterval is the period of the health check loop; the loop is
	// only started when it is greater than zero.
	HealthCheckInterval time.Duration
	// StartTimeout bounds plugin initialization and start calls.
	StartTimeout time.Duration
	// StopTimeout bounds plugin stop calls.
	StopTimeout time.Duration
	// EnableAutoRestart makes a failed health check trigger restart attempts.
	EnableAutoRestart bool
	// MaxRestartAttempts caps the number of restart attempts per failure.
	MaxRestartAttempts int
}

// DefaultManagerConfig returns the default settings: a 30-second health check
// interval, 10-second start and stop timeouts, and auto-restart enabled with
// at most 3 attempts.
func DefaultManagerConfig() ManagerConfig {
	const lifecycleTimeout = 10 * time.Second

	var cfg ManagerConfig
	cfg.HealthCheckInterval = 30 * time.Second
	cfg.StartTimeout = lifecycleTimeout
	cfg.StopTimeout = lifecycleTimeout
	cfg.EnableAutoRestart = true
	cfg.MaxRestartAttempts = 3
	return cfg
}
// NewPluginManager returns a manager using config, with a fresh registry,
// a fresh event bus and an open stop channel. The manager is not running
// until Start is called.
func NewPluginManager(config ManagerConfig) *PluginManager {
	pm := &PluginManager{config: config}
	pm.registry = NewPluginRegistry()
	pm.eventBus = NewEventBus()
	pm.stopChan = make(chan struct{})
	return pm
}
// Start marks the manager as running and, when HealthCheckInterval is
// positive, starts the periodic health check goroutine.
//
// It returns an error if the manager is already running. ctx is currently
// unused.
func (pm *PluginManager) Start(ctx context.Context) error {
	pm.mutex.Lock()
	defer pm.mutex.Unlock()

	if pm.running {
		return fmt.Errorf("plugin manager already running")
	}

	// Stop closes stopChan. Reusing that closed channel after a restart would
	// make the new health check loop return immediately and make the next Stop
	// panic on a double close, so every run gets its own channel.
	pm.stopChan = make(chan struct{})
	pm.running = true

	if pm.config.HealthCheckInterval > 0 {
		pm.healthTicker = time.NewTicker(pm.config.HealthCheckInterval)
		go pm.healthCheckLoop()
	}
	return nil
}
// Stop marks the manager as not running, closes the stop channel, stops the
// health ticker if one exists, and then stops every plugin currently reporting
// StatusActive. Failures to stop individual plugins are logged, not returned.
//
// It returns an error only when the manager is not running.
func (pm *PluginManager) Stop(ctx context.Context) error {
	pm.mutex.Lock()
	defer pm.mutex.Unlock()

	if !pm.running {
		return fmt.Errorf("plugin manager not running")
	}

	pm.running = false
	close(pm.stopChan)
	if ticker := pm.healthTicker; ticker != nil {
		ticker.Stop()
	}

	for _, name := range pm.registry.ListByStatus(StatusActive) {
		err := pm.StopPlugin(ctx, name)
		if err == nil {
			continue
		}
		log.Printf("Error stopping plugin %s: %v", name, err)
	}
	return nil
}
// LoadPlugin registers plugin under name and initializes it with config.
//
// Initialization runs with a context bounded by config.StartTimeout. If it
// fails, the registration is rolled back and the returned error wraps the
// initialization error (so callers can inspect it with errors.Is/As). On
// success a "plugin.loaded" event is published.
func (pm *PluginManager) LoadPlugin(name string, plugin Plugin, config map[string]interface{}) error {
	pm.mutex.Lock()
	defer pm.mutex.Unlock()

	if err := pm.registry.Register(name, plugin); err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), pm.config.StartTimeout)
	defer cancel()

	if err := plugin.Initialize(ctx, config); err != nil {
		// Roll back the registration; a rollback failure is logged rather than
		// masking the initialization error the caller needs to see.
		if unregErr := pm.registry.Unregister(name); unregErr != nil {
			log.Printf("Error unregistering plugin %s after failed initialization: %v", name, unregErr)
		}
		return fmt.Errorf("failed to initialize plugin %s: %w", name, err)
	}

	pm.eventBus.Publish(Event{
		Type:   "plugin.loaded",
		Source: "plugin_manager",
		Data: map[string]interface{}{
			"plugin_name": name,
			"plugin_info": plugin.Info(),
		},
	})
	return nil
}
// UnloadPlugin removes the plugin registered under name. If the plugin reports
// StatusActive it is first stopped with a context bounded by
// config.StopTimeout; a stop failure is logged and does not abort the unload.
// After a successful unregistration a "plugin.unloaded" event is published.
//
// Errors from the lookup or from unregistration are returned.
func (pm *PluginManager) UnloadPlugin(name string) error {
	pm.mutex.Lock()
	defer pm.mutex.Unlock()

	target, err := pm.registry.Get(name)
	if err != nil {
		return err
	}

	if target.Status() == StatusActive {
		stopCtx, cancel := context.WithTimeout(context.Background(), pm.config.StopTimeout)
		defer cancel()
		if stopErr := target.Stop(stopCtx); stopErr != nil {
			log.Printf("Error stopping plugin %s: %v", name, stopErr)
		}
	}

	if err = pm.registry.Unregister(name); err != nil {
		return err
	}

	payload := map[string]interface{}{"plugin_name": name}
	pm.eventBus.Publish(Event{
		Type:   "plugin.unloaded",
		Source: "plugin_manager",
		Data:   payload,
	})
	return nil
}
// StartPlugin starts the plugin registered under name.
//
// The plugin must report StatusLoaded. Its Start method is called with a
// context derived from ctx and bounded by config.StartTimeout. On success a
// "plugin.started" event is published. A start failure is returned wrapped,
// so callers can inspect the cause with errors.Is/As.
func (pm *PluginManager) StartPlugin(ctx context.Context, name string) error {
	plugin, err := pm.registry.Get(name)
	if err != nil {
		return err
	}

	if plugin.Status() != StatusLoaded {
		return fmt.Errorf("plugin %s is not in loaded state", name)
	}

	startCtx, cancel := context.WithTimeout(ctx, pm.config.StartTimeout)
	defer cancel()

	if err := plugin.Start(startCtx); err != nil {
		return fmt.Errorf("failed to start plugin %s: %w", name, err)
	}

	pm.eventBus.Publish(Event{
		Type:   "plugin.started",
		Source: "plugin_manager",
		Data: map[string]interface{}{
			"plugin_name": name,
		},
	})
	return nil
}
// StopPlugin stops the plugin registered under name.
//
// The plugin must report StatusActive. Its Stop method is called with a
// context derived from ctx and bounded by config.StopTimeout. On success a
// "plugin.stopped" event is published. A stop failure is returned wrapped,
// so callers can inspect the cause with errors.Is/As.
func (pm *PluginManager) StopPlugin(ctx context.Context, name string) error {
	plugin, err := pm.registry.Get(name)
	if err != nil {
		return err
	}

	if plugin.Status() != StatusActive {
		return fmt.Errorf("plugin %s is not active", name)
	}

	stopCtx, cancel := context.WithTimeout(ctx, pm.config.StopTimeout)
	defer cancel()

	if err := plugin.Stop(stopCtx); err != nil {
		return fmt.Errorf("failed to stop plugin %s: %w", name, err)
	}

	pm.eventBus.Publish(Event{
		Type:   "plugin.stopped",
		Source: "plugin_manager",
		Data: map[string]interface{}{
			"plugin_name": name,
		},
	})
	return nil
}
// RestartPlugin stops and then starts the named plugin. If the stop step
// fails, its error is returned and the start step is not attempted.
func (pm *PluginManager) RestartPlugin(ctx context.Context, name string) error {
	err := pm.StopPlugin(ctx, name)
	if err == nil {
		err = pm.StartPlugin(ctx, name)
	}
	return err
}
// GetPluginStatus returns the status reported by the named plugin. When the
// plugin is not registered it returns StatusUnloaded together with the lookup
// error.
func (pm *PluginManager) GetPluginStatus(name string) (PluginStatus, error) {
	p, err := pm.registry.Get(name)
	if err == nil {
		return p.Status(), nil
	}
	return StatusUnloaded, err
}
// ListPlugins returns the names of all registered plugins, as reported by the
// registry.
func (pm *PluginManager) ListPlugins() []string {
	names := pm.registry.List()
	return names
}
// GetPluginInfo returns the metadata of the named plugin by delegating to the
// registry.
func (pm *PluginManager) GetPluginInfo(name string) (PluginInfo, error) {
	info, err := pm.registry.GetPluginInfo(name)
	return info, err
}
// GetAllPluginInfo returns the metadata of every registered plugin, keyed by
// registration name, by delegating to the registry.
func (pm *PluginManager) GetAllPluginInfo() map[string]PluginInfo {
	all := pm.registry.GetAllPluginInfo()
	return all
}
// healthCheckLoop runs a health check on every tick of healthTicker and
// returns once stopChan is closed.
func (pm *PluginManager) healthCheckLoop() {
	for {
		select {
		case <-pm.healthTicker.C:
			pm.performHealthCheck()
		case <-pm.stopChan:
			return
		}
	}
}
// performHealthCheck calls HealthCheck on every plugin that currently reports
// StatusActive. For each failing plugin it logs the failure, marks the plugin
// as StatusError when possible, publishes a "plugin.health_check_failed"
// event and, if auto-restart is enabled, starts a restart attempt in a new
// goroutine.
func (pm *PluginManager) performHealthCheck() {
	for _, name := range pm.registry.ListByStatus(StatusActive) {
		plugin, err := pm.registry.Get(name)
		if err != nil {
			// Unregistered between the listing and the lookup; nothing to check.
			continue
		}

		checkErr := plugin.HealthCheck()
		if checkErr == nil {
			continue
		}
		log.Printf("Plugin %s health check failed: %v", name, checkErr)

		// The assertion only matches values whose dynamic type is exactly
		// *BasePlugin; types that merely embed *BasePlugin are not matched.
		if bp, ok := plugin.(*BasePlugin); ok {
			bp.SetStatus(StatusError)
		}

		pm.eventBus.Publish(Event{
			Type:   "plugin.health_check_failed",
			Source: "plugin_manager",
			Data: map[string]interface{}{
				"plugin_name": name,
				"error":       checkErr.Error(),
			},
		})

		if pm.config.EnableAutoRestart {
			go pm.attemptRestart(name)
		}
	}
}
// attemptRestart tries to restart the named plugin up to
// config.MaxRestartAttempts times. Each attempt runs with a context bounded by
// config.StartTimeout. After a failed attempt (except the last) it sleeps for
// attempt-number seconds before retrying. It publishes "plugin.restarted" on
// the first success and "plugin.restart_failed" when all attempts fail.
func (pm *PluginManager) attemptRestart(name string) {
	for attempt := 1; attempt <= pm.config.MaxRestartAttempts; attempt++ {
		log.Printf("Attempting to restart plugin %s (attempt %d/%d)",
			name, attempt, pm.config.MaxRestartAttempts)

		ctx, cancel := context.WithTimeout(context.Background(), pm.config.StartTimeout)
		err := pm.RestartPlugin(ctx, name)
		cancel()

		if err != nil {
			log.Printf("Failed to restart plugin %s (attempt %d): %v", name, attempt, err)
			if attempt < pm.config.MaxRestartAttempts {
				// Linear back-off: 1s after the first failure, 2s after the second, ...
				time.Sleep(time.Duration(attempt) * time.Second)
			}
			continue
		}

		log.Printf("Successfully restarted plugin %s", name)
		pm.eventBus.Publish(Event{
			Type:   "plugin.restarted",
			Source: "plugin_manager",
			Data: map[string]interface{}{
				"plugin_name": name,
				"attempt":     attempt,
			},
		})
		return
	}

	log.Printf("Failed to restart plugin %s after %d attempts", name, pm.config.MaxRestartAttempts)
	pm.eventBus.Publish(Event{
		Type:   "plugin.restart_failed",
		Source: "plugin_manager",
		Data: map[string]interface{}{
			"plugin_name": name,
			"attempts":    pm.config.MaxRestartAttempts,
		},
	})
}
// SubscribeToEvents registers handler for eventType on the manager's event
// bus.
func (pm *PluginManager) SubscribeToEvents(eventType string, handler EventHandler) {
	bus := pm.eventBus
	bus.Subscribe(eventType, handler)
}
// PublishEvent publishes event on the manager's event bus.
func (pm *PluginManager) PublishEvent(event Event) {
	bus := pm.eventBus
	bus.Publish(event)
}
事件总线系统 #
事件总线实现 #
package plugin
import (
	"log"
	"sync"
)
// EventHandler is a callback that receives a published Event.
type EventHandler func(event Event)
// EventBus dispatches events to handlers subscribed by event type.
type EventBus struct {
// handlers maps an event type to the handlers subscribed to it.
handlers map[string][]EventHandler
// mutex guards handlers.
mutex sync.RWMutex
}
// NewEventBus returns an event bus with no subscriptions.
func NewEventBus() *EventBus {
	eb := new(EventBus)
	eb.handlers = map[string][]EventHandler{}
	return eb
}
// Subscribe appends handler to the list of handlers for eventType.
func (eb *EventBus) Subscribe(eventType string, handler EventHandler) {
	eb.mutex.Lock()
	defer eb.mutex.Unlock()

	current := eb.handlers[eventType]
	eb.handlers[eventType] = append(current, handler)
}
// Unsubscribe removes every handler registered for eventType. This is a
// simplified form: individual handlers cannot be removed.
func (eb *EventBus) Unsubscribe(eventType string) {
	eb.mutex.Lock()
	delete(eb.handlers, eventType)
	eb.mutex.Unlock()
}
// Publish delivers event asynchronously: each handler subscribed to
// event.Type is invoked in its own goroutine. A panic inside a handler is
// recovered and logged so it cannot take down the process or affect other
// handlers. Publish does nothing when the type has no subscription entry.
func (eb *EventBus) Publish(event Event) {
	eb.mutex.RLock()
	subscribers, ok := eb.handlers[event.Type]
	eb.mutex.RUnlock()
	if !ok {
		return
	}

	// dispatch runs one handler with panic protection.
	dispatch := func(h EventHandler) {
		defer func() {
			if r := recover(); r != nil {
				log.Printf("Event handler panic: %v", r)
			}
		}()
		h(event)
	}

	for _, h := range subscribers {
		go dispatch(h)
	}
}
// PublishSync delivers event synchronously: the handlers subscribed to
// event.Type are invoked one after another on the calling goroutine, in
// subscription order. A panic inside a handler is recovered and logged, and
// the remaining handlers still run.
func (eb *EventBus) PublishSync(event Event) {
	eb.mutex.RLock()
	subscribers, ok := eb.handlers[event.Type]
	eb.mutex.RUnlock()
	if !ok {
		return
	}

	// safeCall runs one handler with panic protection.
	safeCall := func(h EventHandler) {
		defer func() {
			if r := recover(); r != nil {
				log.Printf("Event handler panic: %v", r)
			}
		}()
		h(event)
	}

	for i := range subscribers {
		safeCall(subscribers[i])
	}
}
// GetSubscriberCount returns the number of handlers subscribed to eventType
// (zero when there are none).
func (eb *EventBus) GetSubscriberCount(eventType string) int {
	eb.mutex.RLock()
	count := len(eb.handlers[eventType])
	eb.mutex.RUnlock()
	return count
}
// GetAllEventTypes returns every event type that has a subscription entry,
// in unspecified (map iteration) order. The result is never nil.
func (eb *EventBus) GetAllEventTypes() []string {
	eb.mutex.RLock()
	defer eb.mutex.RUnlock()

	types := make([]string, len(eb.handlers))
	i := 0
	for eventType := range eb.handlers {
		types[i] = eventType
		i++
	}
	return types
}
具体插件实现示例 #
日志插件示例 #
package plugins
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"time"
"your-project/plugin"
)
// LoggerPlugin is a plugin that writes log lines to a file. It embeds
// *plugin.BasePlugin for the lifecycle and configuration methods.
type LoggerPlugin struct {
*plugin.BasePlugin
// logFile is the open log file; it is set by Initialize and closed by Cleanup.
logFile *os.File
// logger writes to logFile.
logger *log.Logger
// logLevel is read from the "log_level" config key (default "INFO").
logLevel string
// logFormat is the time layout used for timestamps in log lines.
logFormat string
}
// NewLoggerPlugin returns an uninitialized logger plugin carrying its static
// metadata (name "logger", version "1.0.0").
func NewLoggerPlugin() *LoggerPlugin {
	base := plugin.NewBasePlugin(plugin.PluginInfo{
		Name:        "logger",
		Version:     "1.0.0",
		Description: "File logging plugin",
		Author:      "System",
		License:     "MIT",
		Tags:        []string{"logging", "file"},
	})
	return &LoggerPlugin{BasePlugin: base}
}
// Initialize runs the base initialization, reads the logger settings from
// config and opens the log file in append mode, creating its directory when
// needed.
//
// Recognized config keys (all strings, all optional):
//   - "log_path":   log file path, default "logs/app.log"
//   - "log_level":  log level, default "INFO"
//   - "log_format": time layout for timestamps, default "2006-01-02 15:04:05"
//
// If the directory or file cannot be prepared, the base plugin is reset via
// Cleanup before the error is returned. Without that rollback the plugin
// would stay in the loaded state with a nil logger, and a later Start would
// dereference it.
func (lp *LoggerPlugin) Initialize(ctx context.Context, config map[string]interface{}) error {
	if err := lp.BasePlugin.Initialize(ctx, config); err != nil {
		return err
	}

	// fail undoes the base initialization and passes err through.
	fail := func(err error) error {
		_ = lp.BasePlugin.Cleanup() // BasePlugin.Cleanup always returns nil
		return err
	}

	// An empty path is treated like a missing one, matching how the other two
	// settings fall back to their defaults.
	logPath, ok := config["log_path"].(string)
	if !ok || logPath == "" {
		logPath = "logs/app.log"
	}
	lp.logLevel, _ = config["log_level"].(string)
	if lp.logLevel == "" {
		lp.logLevel = "INFO"
	}
	lp.logFormat, _ = config["log_format"].(string)
	if lp.logFormat == "" {
		lp.logFormat = "2006-01-02 15:04:05"
	}

	if err := os.MkdirAll(filepath.Dir(logPath), 0755); err != nil {
		return fail(fmt.Errorf("failed to create log directory: %w", err))
	}

	file, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		return fail(fmt.Errorf("failed to open log file: %w", err))
	}

	lp.logFile = file
	lp.logger = log.New(file, "", 0)
	return nil
}
// Start runs the base Start and, on success, writes a "started" line to the
// log. A base Start error is returned unchanged and nothing is logged.
func (lp *LoggerPlugin) Start(ctx context.Context) error {
	err := lp.BasePlugin.Start(ctx)
	if err == nil {
		lp.logger.Printf("[%s] Logger plugin started", time.Now().Format(lp.logFormat))
	}
	return err
}
// Stop runs the base Stop and, on success, writes a "stopped" line to the
// log. A base Stop error is returned unchanged and nothing is logged.
func (lp *LoggerPlugin) Stop(ctx context.Context) error {
	err := lp.BasePlugin.Stop(ctx)
	if err == nil {
		lp.logger.Printf("[%s] Logger plugin stopped", time.Now().Format(lp.logFormat))
	}
	return err
}
// Cleanup closes the log file (if one is open) and then runs the base
// Cleanup. The base Cleanup always runs, even when closing fails.
//
// A base Cleanup error takes precedence; otherwise a failure to close the log
// file is returned instead of being silently dropped.
func (lp *LoggerPlugin) Cleanup() error {
	var closeErr error
	if lp.logFile != nil {
		closeErr = lp.logFile.Close()
		lp.logFile = nil
	}

	if err := lp.BasePlugin.Cleanup(); err != nil {
		return err
	}
	if closeErr != nil {
		return fmt.Errorf("failed to close log file: %w", closeErr)
	}
	return nil
}
// HandleEvent writes one log line describing event. It returns an error when
// the plugin is not active. ctx is not used.
//
// The line is stamped with the event's Timestamp (interpreted as Unix
// seconds). Events that carry no timestamp (Timestamp == 0) are stamped with
// the current time instead, so they are not logged as 1970-01-01.
func (lp *LoggerPlugin) HandleEvent(ctx context.Context, event plugin.Event) error {
	if lp.Status() != plugin.StatusActive {
		return fmt.Errorf("plugin not active")
	}

	when := time.Now()
	if event.Timestamp != 0 {
		when = time.Unix(event.Timestamp, 0)
	}

	logEntry := fmt.Sprintf("[%s] Event: %s from %s - %v",
		when.Format(lp.logFormat), event.Type, event.Source, event.Data)
	lp.logger.Println(logEntry)
	return nil
}
// SupportedEvents returns the single wildcard entry "*", meaning the plugin
// supports all event types.
func (lp *LoggerPlugin) SupportedEvents() []string {
	wildcard := []string{"*"}
	return wildcard
}
// ConfigSchema describes the three configuration keys of the logger plugin
// ("log_path", "log_level", "log_format"), each with its type, description
// and default value; "log_level" additionally lists its allowed values.
func (lp *LoggerPlugin) ConfigSchema() map[string]interface{} {
	schema := make(map[string]interface{}, 3)

	schema["log_path"] = map[string]interface{}{
		"type":        "string",
		"description": "Log file path",
		"default":     "logs/app.log",
	}
	schema["log_level"] = map[string]interface{}{
		"type":        "string",
		"description": "Log level",
		"enum":        []string{"DEBUG", "INFO", "WARN", "ERROR"},
		"default":     "INFO",
	}
	schema["log_format"] = map[string]interface{}{
		"type":        "string",
		"description": "Timestamp format",
		"default":     "2006-01-02 15:04:05",
	}

	return schema
}
HTTP 服务插件示例 #
package plugins
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
"your-project/plugin"
)
// HTTPServicePlugin is a plugin that runs an HTTP server. It embeds
// *plugin.BasePlugin for the lifecycle and configuration methods.
type HTTPServicePlugin struct {
*plugin.BasePlugin
// server is created by Initialize and run by Start.
server *http.Server
// port is the TCP port the server listens on (default 8080).
port int
}
// NewHTTPServicePlugin returns an uninitialized HTTP service plugin carrying
// its static metadata (name "http_service", version "1.0.0").
func NewHTTPServicePlugin() *HTTPServicePlugin {
	base := plugin.NewBasePlugin(plugin.PluginInfo{
		Name:        "http_service",
		Version:     "1.0.0",
		Description: "HTTP service plugin",
		Author:      "System",
		License:     "MIT",
		Tags:        []string{"http", "service", "api"},
	})
	return &HTTPServicePlugin{BasePlugin: base}
}
// Initialize runs the base initialization, reads the listening port from
// config and builds the HTTP server (without starting it).
//
// The "port" config key may be an int, a float64 (as produced by JSON
// decoding) or a decimal string. A missing, unparsable or zero value falls
// back to port 8080.
func (hsp *HTTPServicePlugin) Initialize(ctx context.Context, config map[string]interface{}) error {
	if err := hsp.BasePlugin.Initialize(ctx, config); err != nil {
		return err
	}

	if portVal, ok := config["port"]; ok {
		switch v := portVal.(type) {
		case int:
			hsp.port = v
		case float64:
			hsp.port = int(v)
		case string:
			if p, err := strconv.Atoi(v); err == nil {
				hsp.port = p
			}
		}
	}
	if hsp.port == 0 {
		hsp.port = 8080
	}

	mux := http.NewServeMux()
	hsp.setupRoutes(mux)
	hsp.server = &http.Server{
		Addr:    fmt.Sprintf(":%d", hsp.port),
		Handler: mux,
		// Bound the time a client may take to send its request headers; a
		// server without any timeout can be tied up indefinitely by slow
		// clients.
		ReadHeaderTimeout: 10 * time.Second,
	}
	return nil
}
// Start runs the base Start and then launches the HTTP server in a background
// goroutine. If the server exits with an error other than
// http.ErrServerClosed, the plugin status is set to StatusError; the error
// itself is not reported to the caller of Start.
func (hsp *HTTPServicePlugin) Start(ctx context.Context) error {
	if err := hsp.BasePlugin.Start(ctx); err != nil {
		return err
	}

	serve := func() {
		err := hsp.server.ListenAndServe()
		if err == nil || err == http.ErrServerClosed {
			return
		}
		hsp.SetStatus(plugin.StatusError)
	}
	go serve()

	return nil
}
// Stop runs the base Stop and then shuts the HTTP server down, passing ctx to
// Shutdown. If the base Stop fails, the server is left untouched and that
// error is returned.
func (hsp *HTTPServicePlugin) Stop(ctx context.Context) error {
	if err := hsp.BasePlugin.Stop(ctx); err != nil {
		return err
	}
	if hsp.server == nil {
		return nil
	}
	return hsp.server.Shutdown(ctx)
}
// setupRoutes registers the plugin's three handlers ("/health", "/info" and
// "/config") on mux.
func (hsp *HTTPServicePlugin) setupRoutes(mux *http.ServeMux) {
	routes := []struct {
		pattern string
		handler func(http.ResponseWriter, *http.Request)
	}{
		{"/health", hsp.healthHandler},
		{"/info", hsp.infoHandler},
		{"/config", hsp.configHandler},
	}
	for _, route := range routes {
		mux.HandleFunc(route.pattern, route.handler)
	}
}
// healthHandler responds with a JSON object containing a fixed "healthy"
// status, the current Unix time and the plugin uptime in seconds.
func (hsp *HTTPServicePlugin) healthHandler(w http.ResponseWriter, r *http.Request) {
	payload := make(map[string]interface{}, 3)
	payload["status"] = "healthy"
	payload["timestamp"] = time.Now().Unix()
	payload["uptime"] = hsp.Uptime().Seconds()

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(payload)
}
// infoHandler responds with the plugin metadata encoded as JSON.
func (hsp *HTTPServicePlugin) infoHandler(w http.ResponseWriter, r *http.Request) {
	meta := hsp.Info()
	w.Header().Set("Content-Type", "application/json")
	enc := json.NewEncoder(w)
	enc.Encode(meta)
}
// configHandler serves the plugin configuration.
//
//   - GET returns the current configuration as JSON.
//   - PUT decodes a JSON object from the request body and merges it into the
//     configuration; a malformed body yields 400, an update failure 500.
//   - Any other method yields 405.
func (hsp *HTTPServicePlugin) configHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodGet {
		current := hsp.GetConfig()
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(current)
		return
	}

	if r.Method != http.MethodPut {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var incoming map[string]interface{}
	if err := json.NewDecoder(r.Body).Decode(&incoming); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	if err := hsp.UpdateConfig(incoming); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
}
// Serve returns a fixed 200 response with a JSON greeting body. Neither ctx
// nor request is inspected; this is a placeholder for real service logic.
func (hsp *HTTPServicePlugin) Serve(ctx context.Context, request plugin.ServiceRequest) (plugin.ServiceResponse, error) {
	var resp plugin.ServiceResponse
	resp.StatusCode = 200
	resp.Headers = map[string]string{"Content-Type": "application/json"}
	resp.Body = []byte(`{"message": "Hello from HTTP service plugin"}`)
	return resp, nil
}
// ServiceDescription returns the static descriptor of the service: name
// "http_service", version "1.0.0", and the GET /health and GET /info
// endpoints, each with a single 200 response.
func (hsp *HTTPServicePlugin) ServiceDescription() plugin.ServiceDescriptor {
	health := plugin.EndpointInfo{
		Method:      "GET",
		Path:        "/health",
		Description: "Health check endpoint",
		Responses: []plugin.ResponseInfo{
			{StatusCode: 200, Description: "Healthy"},
		},
	}
	info := plugin.EndpointInfo{
		Method:      "GET",
		Path:        "/info",
		Description: "Plugin information endpoint",
		Responses: []plugin.ResponseInfo{
			{StatusCode: 200, Description: "Plugin info"},
		},
	}

	return plugin.ServiceDescriptor{
		Name:      "http_service",
		Version:   "1.0.0",
		Endpoints: []plugin.EndpointInfo{health, info},
	}
}
// ConfigSchema describes the single configuration key "port": an integer with
// default 8080 and a documented range of 1024 to 65535.
func (hsp *HTTPServicePlugin) ConfigSchema() map[string]interface{} {
	portSchema := map[string]interface{}{
		"type":        "integer",
		"description": "HTTP server port",
		"default":     8080,
		"minimum":     1024,
		"maximum":     65535,
	}

	schema := make(map[string]interface{}, 1)
	schema["port"] = portSchema
	return schema
}
使用示例 #
完整的插件系统示例 #
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"your-project/plugin"
"your-project/plugins"
)
// main wires up a complete plugin system: it starts a plugin manager,
// subscribes to lifecycle events, loads and starts the logger and HTTP
// service plugins, periodically publishes test events and prints plugin
// status, and then blocks until SIGINT or SIGTERM is received.
func main() {
	// Create the plugin manager with a shorter health check interval.
	config := plugin.DefaultManagerConfig()
	config.HealthCheckInterval = 10 * time.Second
	manager := plugin.NewPluginManager(config)

	// Start the plugin manager.
	ctx := context.Background()
	if err := manager.Start(ctx); err != nil {
		log.Fatalf("Failed to start plugin manager: %v", err)
	}
	defer func() {
		// Report a shutdown failure instead of dropping it.
		if err := manager.Stop(ctx); err != nil {
			log.Printf("Error stopping plugin manager: %v", err)
		}
	}()

	// Subscribe to plugin lifecycle events.
	manager.SubscribeToEvents("plugin.loaded", func(event plugin.Event) {
		fmt.Printf("Plugin loaded: %s\n", event.Data["plugin_name"])
	})
	manager.SubscribeToEvents("plugin.started", func(event plugin.Event) {
		fmt.Printf("Plugin started: %s\n", event.Data["plugin_name"])
	})
	manager.SubscribeToEvents("plugin.health_check_failed", func(event plugin.Event) {
		fmt.Printf("Plugin health check failed: %s - %s\n",
			event.Data["plugin_name"], event.Data["error"])
	})

	// Load and start the logger plugin.
	loggerPlugin := plugins.NewLoggerPlugin()
	loggerConfig := map[string]interface{}{
		"log_path":   "logs/system.log",
		"log_level":  "INFO",
		"log_format": "2006-01-02 15:04:05",
	}
	if err := manager.LoadPlugin("logger", loggerPlugin, loggerConfig); err != nil {
		log.Fatalf("Failed to load logger plugin: %v", err)
	}
	if err := manager.StartPlugin(ctx, "logger"); err != nil {
		log.Fatalf("Failed to start logger plugin: %v", err)
	}

	// Load and start the HTTP service plugin.
	httpPlugin := plugins.NewHTTPServicePlugin()
	httpConfig := map[string]interface{}{
		"port": 8080,
	}
	if err := manager.LoadPlugin("http_service", httpPlugin, httpConfig); err != nil {
		log.Fatalf("Failed to load HTTP service plugin: %v", err)
	}
	if err := manager.StartPlugin(ctx, "http_service"); err != nil {
		log.Fatalf("Failed to start HTTP service plugin: %v", err)
	}

	// Publish a test event every 5 seconds.
	go func() {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()

		counter := 0
		for range ticker.C {
			counter++
			manager.PublishEvent(plugin.Event{
				Type:      "test.event",
				Source:    "main",
				Timestamp: time.Now().Unix(),
				Data: map[string]interface{}{
					"counter": counter,
					"message": fmt.Sprintf("Test event #%d", counter),
				},
			})
		}
	}()

	// Print the status of every plugin every 30 seconds.
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		for range ticker.C {
			fmt.Println("\n=== Plugin Status ===")
			allInfo := manager.GetAllPluginInfo()
			for name, info := range allInfo {
				status, _ := manager.GetPluginStatus(name)
				fmt.Printf("Plugin: %s (v%s) - Status: %s\n",
					info.Name, info.Version, status)
			}
			// Two calls instead of a trailing "\n" inside Println, which the
			// vet printf check rejects; the output is identical.
			fmt.Println("====================")
			fmt.Println()
		}
	}()

	// Wait for an interrupt or termination signal.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	fmt.Println("Plugin system started. Press Ctrl+C to stop.")
	fmt.Println("HTTP service available at: http://localhost:8080")

	<-sigChan
	fmt.Println("\nShutting down plugin system...")
}
小结 #
本节详细介绍了插件架构的设计和实现,包括:
- 插件接口设计:定义了灵活的插件接口体系
- 插件管理器:实现了完整的插件生命周期管理
- 事件总线系统:提供了插件间的通信机制
- 具体插件实现:展示了日志插件和 HTTP 服务插件的实现
- 完整示例:演示了如何构建一个完整的插件系统
插件架构为应用程序提供了极大的灵活性和可扩展性,但同时也增加了系统的复杂性。在设计插件系统时,需要仔细考虑接口稳定性、版本兼容性、安全性和性能等因素。在下一节中,我们将学习如何实现插件的动态加载功能。