5.6.4 事件驱动架构

5.6.4 事件驱动架构 #

事件驱动架构(Event-Driven Architecture, EDA)是一种基于事件的软件架构模式,系统组件通过产生和响应事件来进行通信。这种架构模式能够实现高度解耦、可扩展和响应式的分布式系统。本节将深入探讨事件驱动架构的设计原则、实现模式和最佳实践。

事件驱动架构基础 #

核心概念 #

// 事件接口定义
type Event interface {
    EventID() string
    EventType() string
    AggregateID() string
    Version() int64
    Timestamp() time.Time
    Data() interface{}
    Metadata() map[string]interface{}
}

// 基础事件实现
type BaseEvent struct {
    ID          string                 `json:"id"`
    Type        string                 `json:"type"`
    AggregateId string                 `json:"aggregate_id"`
    Ver         int64                  `json:"version"`
    Time        time.Time              `json:"timestamp"`
    EventData   interface{}            `json:"data"`
    Meta        map[string]interface{} `json:"metadata"`
}

func (e *BaseEvent) EventID() string                    { return e.ID }
func (e *BaseEvent) EventType() string                  { return e.Type }
func (e *BaseEvent) AggregateID() string                { return e.AggregateId }
func (e *BaseEvent) Version() int64                     { return e.Ver }
func (e *BaseEvent) Timestamp() time.Time               { return e.Time }
func (e *BaseEvent) Data() interface{}                  { return e.EventData }
func (e *BaseEvent) Metadata() map[string]interface{}   { return e.Meta }

// 事件构建器
type EventBuilder struct {
    event *BaseEvent
}

func NewEventBuilder() *EventBuilder {
    return &EventBuilder{
        event: &BaseEvent{
            ID:   uuid.New().String(),
            Time: time.Now(),
            Meta: make(map[string]interface{}),
        },
    }
}

func (eb *EventBuilder) WithType(eventType string) *EventBuilder {
    eb.event.Type = eventType
    return eb
}

func (eb *EventBuilder) WithAggregateID(aggregateID string) *EventBuilder {
    eb.event.AggregateId = aggregateID
    return eb
}

func (eb *EventBuilder) WithVersion(version int64) *EventBuilder {
    eb.event.Ver = version
    return eb
}

func (eb *EventBuilder) WithData(data interface{}) *EventBuilder {
    eb.event.EventData = data
    return eb
}

func (eb *EventBuilder) WithMetadata(key string, value interface{}) *EventBuilder {
    eb.event.Meta[key] = value
    return eb
}

func (eb *EventBuilder) Build() Event {
    return eb.event
}

// 事件存储接口
type EventStore interface {
    SaveEvents(aggregateID string, events []Event, expectedVersion int64) error
    GetEvents(aggregateID string, fromVersion int64) ([]Event, error)
    GetAllEvents(fromPosition int64, maxCount int) ([]Event, error)
    Subscribe(eventTypes []string, handler EventHandler) error
    Unsubscribe(handler EventHandler) error
}

// 事件处理器接口
type EventHandler interface {
    Handle(ctx context.Context, event Event) error
    EventTypes() []string
}

// 函数式事件处理器
type EventHandlerFunc func(ctx context.Context, event Event) error

func (f EventHandlerFunc) Handle(ctx context.Context, event Event) error {
    return f(ctx, event)
}

func (f EventHandlerFunc) EventTypes() []string {
    return []string{"*"} // 处理所有事件类型
}

事件总线实现 #

// 事件总线接口
type EventBus interface {
    Publish(ctx context.Context, event Event) error
    Subscribe(eventType string, handler EventHandler) error
    Unsubscribe(eventType string, handler EventHandler) error
    Close() error
}

// 内存事件总线实现
type InMemoryEventBus struct {
    handlers map[string][]EventHandler
    mu       sync.RWMutex
    logger   *log.Logger
}

func NewInMemoryEventBus() *InMemoryEventBus {
    return &InMemoryEventBus{
        handlers: make(map[string][]EventHandler),
        logger:   log.New(os.Stdout, "[EventBus] ", log.LstdFlags),
    }
}

func (bus *InMemoryEventBus) Publish(ctx context.Context, event Event) error {
    bus.mu.RLock()
    defer bus.mu.RUnlock()

    eventType := event.EventType()

    // 获取特定事件类型的处理器
    if handlers, exists := bus.handlers[eventType]; exists {
        for _, handler := range handlers {
            go func(h EventHandler) {
                if err := h.Handle(ctx, event); err != nil {
                    bus.logger.Printf("事件处理失败: event_type=%s, error=%v", eventType, err)
                }
            }(handler)
        }
    }

    // 获取通配符处理器
    if handlers, exists := bus.handlers["*"]; exists {
        for _, handler := range handlers {
            go func(h EventHandler) {
                if err := h.Handle(ctx, event); err != nil {
                    bus.logger.Printf("事件处理失败: event_type=%s, error=%v", eventType, err)
                }
            }(handler)
        }
    }

    bus.logger.Printf("事件发布成功: event_id=%s, event_type=%s", event.EventID(), eventType)
    return nil
}

func (bus *InMemoryEventBus) Subscribe(eventType string, handler EventHandler) error {
    bus.mu.Lock()
    defer bus.mu.Unlock()

    if _, exists := bus.handlers[eventType]; !exists {
        bus.handlers[eventType] = make([]EventHandler, 0)
    }

    bus.handlers[eventType] = append(bus.handlers[eventType], handler)
    bus.logger.Printf("事件处理器订阅成功: event_type=%s", eventType)
    return nil
}

func (bus *InMemoryEventBus) Unsubscribe(eventType string, handler EventHandler) error {
    bus.mu.Lock()
    defer bus.mu.Unlock()

    if handlers, exists := bus.handlers[eventType]; exists {
        for i, h := range handlers {
            if h == handler {
                bus.handlers[eventType] = append(handlers[:i], handlers[i+1:]...)
                break
            }
        }

        if len(bus.handlers[eventType]) == 0 {
            delete(bus.handlers, eventType)
        }
    }

    return nil
}

func (bus *InMemoryEventBus) Close() error {
    bus.mu.Lock()
    defer bus.mu.Unlock()

    bus.handlers = make(map[string][]EventHandler)
    return nil
}

事件溯源 (Event Sourcing) #

聚合根实现 #

// 聚合根接口
type AggregateRoot interface {
    AggregateID() string
    Version() int64
    UncommittedEvents() []Event
    MarkEventsAsCommitted()
    LoadFromHistory(events []Event) error
}

// 基础聚合根
type BaseAggregateRoot struct {
    id                string
    version           int64
    uncommittedEvents []Event
}

func NewBaseAggregateRoot(id string) *BaseAggregateRoot {
    return &BaseAggregateRoot{
        id:                id,
        version:           0,
        uncommittedEvents: make([]Event, 0),
    }
}

func (ar *BaseAggregateRoot) AggregateID() string {
    return ar.id
}

func (ar *BaseAggregateRoot) Version() int64 {
    return ar.version
}

func (ar *BaseAggregateRoot) UncommittedEvents() []Event {
    return ar.uncommittedEvents
}

func (ar *BaseAggregateRoot) MarkEventsAsCommitted() {
    ar.uncommittedEvents = make([]Event, 0)
}

func (ar *BaseAggregateRoot) LoadFromHistory(events []Event) error {
    for _, event := range events {
        if err := ar.applyEvent(event); err != nil {
            return err
        }
        ar.version = event.Version()
    }
    return nil
}

func (ar *BaseAggregateRoot) RaiseEvent(eventType string, data interface{}) {
    event := NewEventBuilder().
        WithType(eventType).
        WithAggregateID(ar.id).
        WithVersion(ar.version + 1).
        WithData(data).
        Build()

    ar.uncommittedEvents = append(ar.uncommittedEvents, event)
    ar.applyEvent(event)
    ar.version++
}

func (ar *BaseAggregateRoot) applyEvent(event Event) error {
    // 子类需要重写此方法来处理具体的事件
    return nil
}

// 用户聚合示例
type User struct {
    *BaseAggregateRoot
    Name   string
    Email  string
    Status string
}

func NewUser(id, name, email string) *User {
    user := &User{
        BaseAggregateRoot: NewBaseAggregateRoot(id),
        Name:              name,
        Email:             email,
        Status:            "active",
    }

    // 触发用户创建事件
    user.RaiseEvent("UserCreated", map[string]interface{}{
        "name":  name,
        "email": email,
    })

    return user
}

func (u *User) ChangeEmail(newEmail string) error {
    if newEmail == u.Email {
        return fmt.Errorf("新邮箱与当前邮箱相同")
    }

    u.RaiseEvent("EmailChanged", map[string]interface{}{
        "old_email": u.Email,
        "new_email": newEmail,
    })

    return nil
}

func (u *User) Deactivate() {
    if u.Status != "active" {
        return
    }

    u.RaiseEvent("UserDeactivated", map[string]interface{}{
        "user_id": u.AggregateID(),
    })
}

func (u *User) applyEvent(event Event) error {
    switch event.EventType() {
    case "UserCreated":
        data := event.Data().(map[string]interface{})
        u.Name = data["name"].(string)
        u.Email = data["email"].(string)
        u.Status = "active"

    case "EmailChanged":
        data := event.Data().(map[string]interface{})
        u.Email = data["new_email"].(string)

    case "UserDeactivated":
        u.Status = "inactive"

    default:
        return fmt.Errorf("未知事件类型: %s", event.EventType())
    }

    return nil
}

事件存储实现 #

// 基于文件的事件存储
type FileEventStore struct {
    dataDir    string
    eventFile  *os.File
    indexFile  *os.File
    mu         sync.RWMutex
    handlers   map[string][]EventHandler
    position   int64
}

func NewFileEventStore(dataDir string) (*FileEventStore, error) {
    if err := os.MkdirAll(dataDir, 0755); err != nil {
        return nil, err
    }

    eventPath := filepath.Join(dataDir, "events.log")
    indexPath := filepath.Join(dataDir, "index.log")

    eventFile, err := os.OpenFile(eventPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
    if err != nil {
        return nil, err
    }

    indexFile, err := os.OpenFile(indexPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
    if err != nil {
        eventFile.Close()
        return nil, err
    }

    store := &FileEventStore{
        dataDir:   dataDir,
        eventFile: eventFile,
        indexFile: indexFile,
        handlers:  make(map[string][]EventHandler),
        position:  0,
    }

    // 加载当前位置
    if err := store.loadPosition(); err != nil {
        return nil, err
    }

    return store, nil
}

func (fs *FileEventStore) SaveEvents(aggregateID string, events []Event, expectedVersion int64) error {
    fs.mu.Lock()
    defer fs.mu.Unlock()

    // 验证版本
    currentVersion, err := fs.getCurrentVersion(aggregateID)
    if err != nil {
        return err
    }

    if currentVersion != expectedVersion {
        return fmt.Errorf("版本冲突: expected=%d, current=%d", expectedVersion, currentVersion)
    }

    // 保存事件
    for _, event := range events {
        eventData, err := json.Marshal(event)
        if err != nil {
            return err
        }

        // 写入事件数据
        eventLine := fmt.Sprintf("%s\n", string(eventData))
        if _, err := fs.eventFile.WriteString(eventLine); err != nil {
            return err
        }

        // 写入索引
        indexLine := fmt.Sprintf("%s:%d:%s\n", aggregateID, event.Version(), event.EventID())
        if _, err := fs.indexFile.WriteString(indexLine); err != nil {
            return err
        }

        fs.position++
    }

    // 强制刷新到磁盘
    fs.eventFile.Sync()
    fs.indexFile.Sync()

    // 通知订阅者
    for _, event := range events {
        fs.notifyHandlers(event)
    }

    return nil
}

func (fs *FileEventStore) GetEvents(aggregateID string, fromVersion int64) ([]Event, error) {
    fs.mu.RLock()
    defer fs.mu.RUnlock()

    var events []Event

    // 读取事件文件
    fs.eventFile.Seek(0, io.SeekStart)
    scanner := bufio.NewScanner(fs.eventFile)

    for scanner.Scan() {
        line := scanner.Text()
        if line == "" {
            continue
        }

        var event BaseEvent
        if err := json.Unmarshal([]byte(line), &event); err != nil {
            continue
        }

        if event.AggregateId == aggregateID && event.Ver >= fromVersion {
            events = append(events, &event)
        }
    }

    return events, scanner.Err()
}

func (fs *FileEventStore) Subscribe(eventTypes []string, handler EventHandler) error {
    fs.mu.Lock()
    defer fs.mu.Unlock()

    for _, eventType := range eventTypes {
        if _, exists := fs.handlers[eventType]; !exists {
            fs.handlers[eventType] = make([]EventHandler, 0)
        }
        fs.handlers[eventType] = append(fs.handlers[eventType], handler)
    }

    return nil
}

func (fs *FileEventStore) notifyHandlers(event Event) {
    eventType := event.EventType()

    if handlers, exists := fs.handlers[eventType]; exists {
        for _, handler := range handlers {
            go func(h EventHandler) {
                ctx := context.Background()
                if err := h.Handle(ctx, event); err != nil {
                    log.Printf("事件处理失败: %v", err)
                }
            }(handler)
        }
    }

    // 通知通配符处理器
    if handlers, exists := fs.handlers["*"]; exists {
        for _, handler := range handlers {
            go func(h EventHandler) {
                ctx := context.Background()
                if err := h.Handle(ctx, event); err != nil {
                    log.Printf("事件处理失败: %v", err)
                }
            }(handler)
        }
    }
}

func (fs *FileEventStore) getCurrentVersion(aggregateID string) (int64, error) {
    // 从索引文件中查找最新版本
    fs.indexFile.Seek(0, io.SeekStart)
    scanner := bufio.NewScanner(fs.indexFile)

    var maxVersion int64 = -1

    for scanner.Scan() {
        line := scanner.Text()
        parts := strings.Split(line, ":")
        if len(parts) >= 2 && parts[0] == aggregateID {
            if version, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
                if version > maxVersion {
                    maxVersion = version
                }
            }
        }
    }

    return maxVersion, scanner.Err()
}

func (fs *FileEventStore) loadPosition() error {
    // 计算当前事件位置
    fs.eventFile.Seek(0, io.SeekStart)
    scanner := bufio.NewScanner(fs.eventFile)

    for scanner.Scan() {
        fs.position++
    }

    return scanner.Err()
}

CQRS 架构模式 #

命令查询分离 #

// 命令接口
type Command interface {
    CommandID() string
    CommandType() string
    AggregateID() string
    Data() interface{}
}

// 基础命令实现
type BaseCommand struct {
    ID          string      `json:"id"`
    Type        string      `json:"type"`
    AggregateId string      `json:"aggregate_id"`
    CommandData interface{} `json:"data"`
}

func (c *BaseCommand) CommandID() string   { return c.ID }
func (c *BaseCommand) CommandType() string { return c.Type }
func (c *BaseCommand) AggregateID() string { return c.AggregateId }
func (c *BaseCommand) Data() interface{}   { return c.CommandData }

// 命令处理器接口
type CommandHandler interface {
    Handle(ctx context.Context, command Command) error
    CommandType() string
}

// 命令总线
type CommandBus struct {
    handlers map[string]CommandHandler
    mu       sync.RWMutex
    logger   *log.Logger
}

func NewCommandBus() *CommandBus {
    return &CommandBus{
        handlers: make(map[string]CommandHandler),
        logger:   log.New(os.Stdout, "[CommandBus] ", log.LstdFlags),
    }
}

func (cb *CommandBus) RegisterHandler(handler CommandHandler) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    cb.handlers[handler.CommandType()] = handler
    cb.logger.Printf("命令处理器注册成功: %s", handler.CommandType())
}

func (cb *CommandBus) Execute(ctx context.Context, command Command) error {
    cb.mu.RLock()
    handler, exists := cb.handlers[command.CommandType()]
    cb.mu.RUnlock()

    if !exists {
        return fmt.Errorf("未找到命令处理器: %s", command.CommandType())
    }

    return handler.Handle(ctx, command)
}

// 查询接口
type Query interface {
    QueryID() string
    QueryType() string
    Parameters() map[string]interface{}
}

// 查询处理器接口
type QueryHandler interface {
    Handle(ctx context.Context, query Query) (interface{}, error)
    QueryType() string
}

// 查询总线
type QueryBus struct {
    handlers map[string]QueryHandler
    mu       sync.RWMutex
    logger   *log.Logger
}

func NewQueryBus() *QueryBus {
    return &QueryBus{
        handlers: make(map[string]QueryHandler),
        logger:   log.New(os.Stdout, "[QueryBus] ", log.LstdFlags),
    }
}

func (qb *QueryBus) RegisterHandler(handler QueryHandler) {
    qb.mu.Lock()
    defer qb.mu.Unlock()

    qb.handlers[handler.QueryType()] = handler
    qb.logger.Printf("查询处理器注册成功: %s", handler.QueryType())
}

func (qb *QueryBus) Execute(ctx context.Context, query Query) (interface{}, error) {
    qb.mu.RLock()
    handler, exists := qb.handlers[query.QueryType()]
    qb.mu.RUnlock()

    if !exists {
        return nil, fmt.Errorf("未找到查询处理器: %s", query.QueryType())
    }

    return handler.Handle(ctx, query)
}

读写模型分离 #

// 写模型 - 用户聚合
type UserWriteModel struct {
    *User
    eventStore EventStore
}

func NewUserWriteModel(eventStore EventStore) *UserWriteModel {
    return &UserWriteModel{
        eventStore: eventStore,
    }
}

func (uwm *UserWriteModel) LoadUser(userID string) error {
    events, err := uwm.eventStore.GetEvents(userID, 0)
    if err != nil {
        return err
    }

    if len(events) == 0 {
        return fmt.Errorf("用户不存在: %s", userID)
    }

    uwm.User = &User{BaseAggregateRoot: NewBaseAggregateRoot(userID)}
    return uwm.User.LoadFromHistory(events)
}

func (uwm *UserWriteModel) SaveUser() error {
    uncommittedEvents := uwm.User.UncommittedEvents()
    if len(uncommittedEvents) == 0 {
        return nil
    }

    expectedVersion := uwm.User.Version() - int64(len(uncommittedEvents))
    err := uwm.eventStore.SaveEvents(uwm.User.AggregateID(), uncommittedEvents, expectedVersion)
    if err != nil {
        return err
    }

    uwm.User.MarkEventsAsCommitted()
    return nil
}

// 读模型 - 用户视图
type UserReadModel struct {
    ID     string `json:"id"`
    Name   string `json:"name"`
    Email  string `json:"email"`
    Status string `json:"status"`
    CreatedAt time.Time `json:"created_at"`
    UpdatedAt time.Time `json:"updated_at"`
}

// 读模型存储
type UserReadModelStore struct {
    users  map[string]*UserReadModel
    mu     sync.RWMutex
    logger *log.Logger
}

func NewUserReadModelStore() *UserReadModelStore {
    return &UserReadModelStore{
        users:  make(map[string]*UserReadModel),
        logger: log.New(os.Stdout, "[UserReadModelStore] ", log.LstdFlags),
    }
}

func (urms *UserReadModelStore) Handle(ctx context.Context, event Event) error {
    urms.mu.Lock()
    defer urms.mu.Unlock()

    switch event.EventType() {
    case "UserCreated":
        data := event.Data().(map[string]interface{})
        user := &UserReadModel{
            ID:        event.AggregateID(),
            Name:      data["name"].(string),
            Email:     data["email"].(string),
            Status:    "active",
            CreatedAt: event.Timestamp(),
            UpdatedAt: event.Timestamp(),
        }
        urms.users[user.ID] = user
        urms.logger.Printf("用户读模型创建: %s", user.ID)

    case "EmailChanged":
        if user, exists := urms.users[event.AggregateID()]; exists {
            data := event.Data().(map[string]interface{})
            user.Email = data["new_email"].(string)
            user.UpdatedAt = event.Timestamp()
            urms.logger.Printf("用户邮箱更新: %s", user.ID)
        }

    case "UserDeactivated":
        if user, exists := urms.users[event.AggregateID()]; exists {
            user.Status = "inactive"
            user.UpdatedAt = event.Timestamp()
            urms.logger.Printf("用户停用: %s", user.ID)
        }
    }

    return nil
}

func (urms *UserReadModelStore) EventTypes() []string {
    return []string{"UserCreated", "EmailChanged", "UserDeactivated"}
}

func (urms *UserReadModelStore) GetUser(userID string) (*UserReadModel, error) {
    urms.mu.RLock()
    defer urms.mu.RUnlock()

    if user, exists := urms.users[userID]; exists {
        return user, nil
    }

    return nil, fmt.Errorf("用户不存在: %s", userID)
}

func (urms *UserReadModelStore) GetAllUsers() []*UserReadModel {
    urms.mu.RLock()
    defer urms.mu.RUnlock()

    users := make([]*UserReadModel, 0, len(urms.users))
    for _, user := range urms.users {
        users = append(users, user)
    }

    return users
}

事件驱动微服务 #

微服务事件协调 #

// 分布式事件总线
type DistributedEventBus struct {
    localBus    EventBus
    messageBus  MessageQueue
    serializer  EventSerializer
    logger      *log.Logger
}

type EventSerializer interface {
    Serialize(event Event) ([]byte, error)
    Deserialize(data []byte) (Event, error)
}

func NewDistributedEventBus(localBus EventBus, messageBus MessageQueue, serializer EventSerializer) *DistributedEventBus {
    return &DistributedEventBus{
        localBus:   localBus,
        messageBus: messageBus,
        serializer: serializer,
        logger:     log.New(os.Stdout, "[DistributedEventBus] ", log.LstdFlags),
    }
}

func (deb *DistributedEventBus) Publish(ctx context.Context, event Event) error {
    // 本地发布
    if err := deb.localBus.Publish(ctx, event); err != nil {
        return err
    }

    // 序列化事件
    data, err := deb.serializer.Serialize(event)
    if err != nil {
        return err
    }

    // 发布到消息队列
    message := &Message{
        ID:        uuid.New().String(),
        Topic:     "events",
        Key:       event.AggregateID(),
        Value:     data,
        Headers:   map[string]string{
            "event_type": event.EventType(),
            "event_id":   event.EventID(),
        },
        Timestamp: time.Now(),
    }

    return deb.messageBus.Send(ctx, message)
}

// 事件协调器
type EventOrchestrator struct {
    eventBus    EventBus
    sagaManager *SagaManager
    logger      *log.Logger
}

func NewEventOrchestrator(eventBus EventBus, sagaManager *SagaManager) *EventOrchestrator {
    return &EventOrchestrator{
        eventBus:    eventBus,
        sagaManager: sagaManager,
        logger:      log.New(os.Stdout, "[EventOrchestrator] ", log.LstdFlags),
    }
}

func (eo *EventOrchestrator) Handle(ctx context.Context, event Event) error {
    // 处理Saga事务
    if err := eo.sagaManager.ProcessEvent(ctx, event); err != nil {
        eo.logger.Printf("Saga处理失败: %v", err)
    }

    // 根据事件类型执行相应的协调逻辑
    switch event.EventType() {
    case "OrderCreated":
        return eo.handleOrderCreated(ctx, event)
    case "PaymentCompleted":
        return eo.handlePaymentCompleted(ctx, event)
    case "InventoryReserved":
        return eo.handleInventoryReserved(ctx, event)
    }

    return nil
}

func (eo *EventOrchestrator) handleOrderCreated(ctx context.Context, event Event) error {
    // 触发库存预留
    inventoryEvent := NewEventBuilder().
        WithType("ReserveInventory").
        WithAggregateID(event.AggregateID()).
        WithData(event.Data()).
        Build()

    return eo.eventBus.Publish(ctx, inventoryEvent)
}

事件驱动架构为构建松耦合、可扩展的分布式系统提供了强大的基础。通过事件溯源、CQRS 模式和分布式事件协调,我们可以构建出高度响应式和可维护的微服务系统。掌握这些模式和技术将帮助我们设计出更加健壮和灵活的现代应用架构。