5.6.4 事件驱动架构 #
事件驱动架构(Event-Driven Architecture, EDA)是一种基于事件的软件架构模式,系统组件通过产生和响应事件来进行通信。这种架构模式能够实现高度解耦、可扩展和响应式的分布式系统。本节将深入探讨事件驱动架构的设计原则、实现模式和最佳实践。
事件驱动架构基础 #
核心概念 #
// 事件接口定义
type Event interface {
EventID() string
EventType() string
AggregateID() string
Version() int64
Timestamp() time.Time
Data() interface{}
Metadata() map[string]interface{}
}
// 基础事件实现
type BaseEvent struct {
ID string `json:"id"`
Type string `json:"type"`
AggregateId string `json:"aggregate_id"`
Ver int64 `json:"version"`
Time time.Time `json:"timestamp"`
EventData interface{} `json:"data"`
Meta map[string]interface{} `json:"metadata"`
}
func (e *BaseEvent) EventID() string { return e.ID }
func (e *BaseEvent) EventType() string { return e.Type }
func (e *BaseEvent) AggregateID() string { return e.AggregateId }
func (e *BaseEvent) Version() int64 { return e.Ver }
func (e *BaseEvent) Timestamp() time.Time { return e.Time }
func (e *BaseEvent) Data() interface{} { return e.EventData }
func (e *BaseEvent) Metadata() map[string]interface{} { return e.Meta }
// 事件构建器
type EventBuilder struct {
event *BaseEvent
}
func NewEventBuilder() *EventBuilder {
return &EventBuilder{
event: &BaseEvent{
ID: uuid.New().String(),
Time: time.Now(),
Meta: make(map[string]interface{}),
},
}
}
func (eb *EventBuilder) WithType(eventType string) *EventBuilder {
eb.event.Type = eventType
return eb
}
func (eb *EventBuilder) WithAggregateID(aggregateID string) *EventBuilder {
eb.event.AggregateId = aggregateID
return eb
}
func (eb *EventBuilder) WithVersion(version int64) *EventBuilder {
eb.event.Ver = version
return eb
}
func (eb *EventBuilder) WithData(data interface{}) *EventBuilder {
eb.event.EventData = data
return eb
}
func (eb *EventBuilder) WithMetadata(key string, value interface{}) *EventBuilder {
eb.event.Meta[key] = value
return eb
}
func (eb *EventBuilder) Build() Event {
return eb.event
}
// 事件存储接口
type EventStore interface {
SaveEvents(aggregateID string, events []Event, expectedVersion int64) error
GetEvents(aggregateID string, fromVersion int64) ([]Event, error)
GetAllEvents(fromPosition int64, maxCount int) ([]Event, error)
Subscribe(eventTypes []string, handler EventHandler) error
Unsubscribe(handler EventHandler) error
}
// 事件处理器接口
type EventHandler interface {
Handle(ctx context.Context, event Event) error
EventTypes() []string
}
// 函数式事件处理器
type EventHandlerFunc func(ctx context.Context, event Event) error
func (f EventHandlerFunc) Handle(ctx context.Context, event Event) error {
return f(ctx, event)
}
func (f EventHandlerFunc) EventTypes() []string {
return []string{"*"} // 处理所有事件类型
}
事件总线实现 #
// 事件总线接口
type EventBus interface {
Publish(ctx context.Context, event Event) error
Subscribe(eventType string, handler EventHandler) error
Unsubscribe(eventType string, handler EventHandler) error
Close() error
}
// 内存事件总线实现
type InMemoryEventBus struct {
handlers map[string][]EventHandler
mu sync.RWMutex
logger *log.Logger
}
func NewInMemoryEventBus() *InMemoryEventBus {
return &InMemoryEventBus{
handlers: make(map[string][]EventHandler),
logger: log.New(os.Stdout, "[EventBus] ", log.LstdFlags),
}
}
func (bus *InMemoryEventBus) Publish(ctx context.Context, event Event) error {
bus.mu.RLock()
defer bus.mu.RUnlock()
eventType := event.EventType()
// 获取特定事件类型的处理器
if handlers, exists := bus.handlers[eventType]; exists {
for _, handler := range handlers {
go func(h EventHandler) {
if err := h.Handle(ctx, event); err != nil {
bus.logger.Printf("事件处理失败: event_type=%s, error=%v", eventType, err)
}
}(handler)
}
}
// 获取通配符处理器
if handlers, exists := bus.handlers["*"]; exists {
for _, handler := range handlers {
go func(h EventHandler) {
if err := h.Handle(ctx, event); err != nil {
bus.logger.Printf("事件处理失败: event_type=%s, error=%v", eventType, err)
}
}(handler)
}
}
bus.logger.Printf("事件发布成功: event_id=%s, event_type=%s", event.EventID(), eventType)
return nil
}
func (bus *InMemoryEventBus) Subscribe(eventType string, handler EventHandler) error {
bus.mu.Lock()
defer bus.mu.Unlock()
if _, exists := bus.handlers[eventType]; !exists {
bus.handlers[eventType] = make([]EventHandler, 0)
}
bus.handlers[eventType] = append(bus.handlers[eventType], handler)
bus.logger.Printf("事件处理器订阅成功: event_type=%s", eventType)
return nil
}
func (bus *InMemoryEventBus) Unsubscribe(eventType string, handler EventHandler) error {
bus.mu.Lock()
defer bus.mu.Unlock()
if handlers, exists := bus.handlers[eventType]; exists {
for i, h := range handlers {
if h == handler {
bus.handlers[eventType] = append(handlers[:i], handlers[i+1:]...)
break
}
}
if len(bus.handlers[eventType]) == 0 {
delete(bus.handlers, eventType)
}
}
return nil
}
func (bus *InMemoryEventBus) Close() error {
bus.mu.Lock()
defer bus.mu.Unlock()
bus.handlers = make(map[string][]EventHandler)
return nil
}
事件溯源 (Event Sourcing) #
聚合根实现 #
// 聚合根接口
type AggregateRoot interface {
AggregateID() string
Version() int64
UncommittedEvents() []Event
MarkEventsAsCommitted()
LoadFromHistory(events []Event) error
}
// 基础聚合根
type BaseAggregateRoot struct {
id string
version int64
uncommittedEvents []Event
}
func NewBaseAggregateRoot(id string) *BaseAggregateRoot {
return &BaseAggregateRoot{
id: id,
version: 0,
uncommittedEvents: make([]Event, 0),
}
}
func (ar *BaseAggregateRoot) AggregateID() string {
return ar.id
}
func (ar *BaseAggregateRoot) Version() int64 {
return ar.version
}
func (ar *BaseAggregateRoot) UncommittedEvents() []Event {
return ar.uncommittedEvents
}
func (ar *BaseAggregateRoot) MarkEventsAsCommitted() {
ar.uncommittedEvents = make([]Event, 0)
}
func (ar *BaseAggregateRoot) LoadFromHistory(events []Event) error {
for _, event := range events {
if err := ar.applyEvent(event); err != nil {
return err
}
ar.version = event.Version()
}
return nil
}
func (ar *BaseAggregateRoot) RaiseEvent(eventType string, data interface{}) {
event := NewEventBuilder().
WithType(eventType).
WithAggregateID(ar.id).
WithVersion(ar.version + 1).
WithData(data).
Build()
ar.uncommittedEvents = append(ar.uncommittedEvents, event)
ar.applyEvent(event)
ar.version++
}
func (ar *BaseAggregateRoot) applyEvent(event Event) error {
// 子类需要重写此方法来处理具体的事件
return nil
}
// 用户聚合示例
type User struct {
*BaseAggregateRoot
Name string
Email string
Status string
}
func NewUser(id, name, email string) *User {
user := &User{
BaseAggregateRoot: NewBaseAggregateRoot(id),
Name: name,
Email: email,
Status: "active",
}
// 触发用户创建事件
user.RaiseEvent("UserCreated", map[string]interface{}{
"name": name,
"email": email,
})
return user
}
func (u *User) ChangeEmail(newEmail string) error {
if newEmail == u.Email {
return fmt.Errorf("新邮箱与当前邮箱相同")
}
u.RaiseEvent("EmailChanged", map[string]interface{}{
"old_email": u.Email,
"new_email": newEmail,
})
return nil
}
func (u *User) Deactivate() {
if u.Status != "active" {
return
}
u.RaiseEvent("UserDeactivated", map[string]interface{}{
"user_id": u.AggregateID(),
})
}
func (u *User) applyEvent(event Event) error {
switch event.EventType() {
case "UserCreated":
data := event.Data().(map[string]interface{})
u.Name = data["name"].(string)
u.Email = data["email"].(string)
u.Status = "active"
case "EmailChanged":
data := event.Data().(map[string]interface{})
u.Email = data["new_email"].(string)
case "UserDeactivated":
u.Status = "inactive"
default:
return fmt.Errorf("未知事件类型: %s", event.EventType())
}
return nil
}
事件存储实现 #
// 基于文件的事件存储
type FileEventStore struct {
dataDir string
eventFile *os.File
indexFile *os.File
mu sync.RWMutex
handlers map[string][]EventHandler
position int64
}
func NewFileEventStore(dataDir string) (*FileEventStore, error) {
if err := os.MkdirAll(dataDir, 0755); err != nil {
return nil, err
}
eventPath := filepath.Join(dataDir, "events.log")
indexPath := filepath.Join(dataDir, "index.log")
eventFile, err := os.OpenFile(eventPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
indexFile, err := os.OpenFile(indexPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
eventFile.Close()
return nil, err
}
store := &FileEventStore{
dataDir: dataDir,
eventFile: eventFile,
indexFile: indexFile,
handlers: make(map[string][]EventHandler),
position: 0,
}
// 加载当前位置
if err := store.loadPosition(); err != nil {
return nil, err
}
return store, nil
}
func (fs *FileEventStore) SaveEvents(aggregateID string, events []Event, expectedVersion int64) error {
fs.mu.Lock()
defer fs.mu.Unlock()
// 验证版本
currentVersion, err := fs.getCurrentVersion(aggregateID)
if err != nil {
return err
}
if currentVersion != expectedVersion {
return fmt.Errorf("版本冲突: expected=%d, current=%d", expectedVersion, currentVersion)
}
// 保存事件
for _, event := range events {
eventData, err := json.Marshal(event)
if err != nil {
return err
}
// 写入事件数据
eventLine := fmt.Sprintf("%s\n", string(eventData))
if _, err := fs.eventFile.WriteString(eventLine); err != nil {
return err
}
// 写入索引
indexLine := fmt.Sprintf("%s:%d:%s\n", aggregateID, event.Version(), event.EventID())
if _, err := fs.indexFile.WriteString(indexLine); err != nil {
return err
}
fs.position++
}
// 强制刷新到磁盘
fs.eventFile.Sync()
fs.indexFile.Sync()
// 通知订阅者
for _, event := range events {
fs.notifyHandlers(event)
}
return nil
}
func (fs *FileEventStore) GetEvents(aggregateID string, fromVersion int64) ([]Event, error) {
fs.mu.RLock()
defer fs.mu.RUnlock()
var events []Event
// 读取事件文件
fs.eventFile.Seek(0, io.SeekStart)
scanner := bufio.NewScanner(fs.eventFile)
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}
var event BaseEvent
if err := json.Unmarshal([]byte(line), &event); err != nil {
continue
}
if event.AggregateId == aggregateID && event.Ver >= fromVersion {
events = append(events, &event)
}
}
return events, scanner.Err()
}
func (fs *FileEventStore) Subscribe(eventTypes []string, handler EventHandler) error {
fs.mu.Lock()
defer fs.mu.Unlock()
for _, eventType := range eventTypes {
if _, exists := fs.handlers[eventType]; !exists {
fs.handlers[eventType] = make([]EventHandler, 0)
}
fs.handlers[eventType] = append(fs.handlers[eventType], handler)
}
return nil
}
func (fs *FileEventStore) notifyHandlers(event Event) {
eventType := event.EventType()
if handlers, exists := fs.handlers[eventType]; exists {
for _, handler := range handlers {
go func(h EventHandler) {
ctx := context.Background()
if err := h.Handle(ctx, event); err != nil {
log.Printf("事件处理失败: %v", err)
}
}(handler)
}
}
// 通知通配符处理器
if handlers, exists := fs.handlers["*"]; exists {
for _, handler := range handlers {
go func(h EventHandler) {
ctx := context.Background()
if err := h.Handle(ctx, event); err != nil {
log.Printf("事件处理失败: %v", err)
}
}(handler)
}
}
}
func (fs *FileEventStore) getCurrentVersion(aggregateID string) (int64, error) {
// 从索引文件中查找最新版本
fs.indexFile.Seek(0, io.SeekStart)
scanner := bufio.NewScanner(fs.indexFile)
var maxVersion int64 = -1
for scanner.Scan() {
line := scanner.Text()
parts := strings.Split(line, ":")
if len(parts) >= 2 && parts[0] == aggregateID {
if version, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
if version > maxVersion {
maxVersion = version
}
}
}
}
return maxVersion, scanner.Err()
}
func (fs *FileEventStore) loadPosition() error {
// 计算当前事件位置
fs.eventFile.Seek(0, io.SeekStart)
scanner := bufio.NewScanner(fs.eventFile)
for scanner.Scan() {
fs.position++
}
return scanner.Err()
}
CQRS 架构模式 #
命令查询分离 #
// 命令接口
type Command interface {
CommandID() string
CommandType() string
AggregateID() string
Data() interface{}
}
// 基础命令实现
type BaseCommand struct {
ID string `json:"id"`
Type string `json:"type"`
AggregateId string `json:"aggregate_id"`
CommandData interface{} `json:"data"`
}
func (c *BaseCommand) CommandID() string { return c.ID }
func (c *BaseCommand) CommandType() string { return c.Type }
func (c *BaseCommand) AggregateID() string { return c.AggregateId }
func (c *BaseCommand) Data() interface{} { return c.CommandData }
// 命令处理器接口
type CommandHandler interface {
Handle(ctx context.Context, command Command) error
CommandType() string
}
// 命令总线
type CommandBus struct {
handlers map[string]CommandHandler
mu sync.RWMutex
logger *log.Logger
}
func NewCommandBus() *CommandBus {
return &CommandBus{
handlers: make(map[string]CommandHandler),
logger: log.New(os.Stdout, "[CommandBus] ", log.LstdFlags),
}
}
func (cb *CommandBus) RegisterHandler(handler CommandHandler) {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.handlers[handler.CommandType()] = handler
cb.logger.Printf("命令处理器注册成功: %s", handler.CommandType())
}
func (cb *CommandBus) Execute(ctx context.Context, command Command) error {
cb.mu.RLock()
handler, exists := cb.handlers[command.CommandType()]
cb.mu.RUnlock()
if !exists {
return fmt.Errorf("未找到命令处理器: %s", command.CommandType())
}
return handler.Handle(ctx, command)
}
// 查询接口
type Query interface {
QueryID() string
QueryType() string
Parameters() map[string]interface{}
}
// 查询处理器接口
type QueryHandler interface {
Handle(ctx context.Context, query Query) (interface{}, error)
QueryType() string
}
// 查询总线
type QueryBus struct {
handlers map[string]QueryHandler
mu sync.RWMutex
logger *log.Logger
}
func NewQueryBus() *QueryBus {
return &QueryBus{
handlers: make(map[string]QueryHandler),
logger: log.New(os.Stdout, "[QueryBus] ", log.LstdFlags),
}
}
func (qb *QueryBus) RegisterHandler(handler QueryHandler) {
qb.mu.Lock()
defer qb.mu.Unlock()
qb.handlers[handler.QueryType()] = handler
qb.logger.Printf("查询处理器注册成功: %s", handler.QueryType())
}
func (qb *QueryBus) Execute(ctx context.Context, query Query) (interface{}, error) {
qb.mu.RLock()
handler, exists := qb.handlers[query.QueryType()]
qb.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("未找到查询处理器: %s", query.QueryType())
}
return handler.Handle(ctx, query)
}
读写模型分离 #
// 写模型 - 用户聚合
type UserWriteModel struct {
*User
eventStore EventStore
}
func NewUserWriteModel(eventStore EventStore) *UserWriteModel {
return &UserWriteModel{
eventStore: eventStore,
}
}
func (uwm *UserWriteModel) LoadUser(userID string) error {
events, err := uwm.eventStore.GetEvents(userID, 0)
if err != nil {
return err
}
if len(events) == 0 {
return fmt.Errorf("用户不存在: %s", userID)
}
uwm.User = &User{BaseAggregateRoot: NewBaseAggregateRoot(userID)}
return uwm.User.LoadFromHistory(events)
}
func (uwm *UserWriteModel) SaveUser() error {
uncommittedEvents := uwm.User.UncommittedEvents()
if len(uncommittedEvents) == 0 {
return nil
}
expectedVersion := uwm.User.Version() - int64(len(uncommittedEvents))
err := uwm.eventStore.SaveEvents(uwm.User.AggregateID(), uncommittedEvents, expectedVersion)
if err != nil {
return err
}
uwm.User.MarkEventsAsCommitted()
return nil
}
// 读模型 - 用户视图
type UserReadModel struct {
ID string `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// 读模型存储
type UserReadModelStore struct {
users map[string]*UserReadModel
mu sync.RWMutex
logger *log.Logger
}
func NewUserReadModelStore() *UserReadModelStore {
return &UserReadModelStore{
users: make(map[string]*UserReadModel),
logger: log.New(os.Stdout, "[UserReadModelStore] ", log.LstdFlags),
}
}
func (urms *UserReadModelStore) Handle(ctx context.Context, event Event) error {
urms.mu.Lock()
defer urms.mu.Unlock()
switch event.EventType() {
case "UserCreated":
data := event.Data().(map[string]interface{})
user := &UserReadModel{
ID: event.AggregateID(),
Name: data["name"].(string),
Email: data["email"].(string),
Status: "active",
CreatedAt: event.Timestamp(),
UpdatedAt: event.Timestamp(),
}
urms.users[user.ID] = user
urms.logger.Printf("用户读模型创建: %s", user.ID)
case "EmailChanged":
if user, exists := urms.users[event.AggregateID()]; exists {
data := event.Data().(map[string]interface{})
user.Email = data["new_email"].(string)
user.UpdatedAt = event.Timestamp()
urms.logger.Printf("用户邮箱更新: %s", user.ID)
}
case "UserDeactivated":
if user, exists := urms.users[event.AggregateID()]; exists {
user.Status = "inactive"
user.UpdatedAt = event.Timestamp()
urms.logger.Printf("用户停用: %s", user.ID)
}
}
return nil
}
func (urms *UserReadModelStore) EventTypes() []string {
return []string{"UserCreated", "EmailChanged", "UserDeactivated"}
}
func (urms *UserReadModelStore) GetUser(userID string) (*UserReadModel, error) {
urms.mu.RLock()
defer urms.mu.RUnlock()
if user, exists := urms.users[userID]; exists {
return user, nil
}
return nil, fmt.Errorf("用户不存在: %s", userID)
}
func (urms *UserReadModelStore) GetAllUsers() []*UserReadModel {
urms.mu.RLock()
defer urms.mu.RUnlock()
users := make([]*UserReadModel, 0, len(urms.users))
for _, user := range urms.users {
users = append(users, user)
}
return users
}
事件驱动微服务 #
微服务事件协调 #
// 分布式事件总线
type DistributedEventBus struct {
localBus EventBus
messageBus MessageQueue
serializer EventSerializer
logger *log.Logger
}
type EventSerializer interface {
Serialize(event Event) ([]byte, error)
Deserialize(data []byte) (Event, error)
}
func NewDistributedEventBus(localBus EventBus, messageBus MessageQueue, serializer EventSerializer) *DistributedEventBus {
return &DistributedEventBus{
localBus: localBus,
messageBus: messageBus,
serializer: serializer,
logger: log.New(os.Stdout, "[DistributedEventBus] ", log.LstdFlags),
}
}
func (deb *DistributedEventBus) Publish(ctx context.Context, event Event) error {
// 本地发布
if err := deb.localBus.Publish(ctx, event); err != nil {
return err
}
// 序列化事件
data, err := deb.serializer.Serialize(event)
if err != nil {
return err
}
// 发布到消息队列
message := &Message{
ID: uuid.New().String(),
Topic: "events",
Key: event.AggregateID(),
Value: data,
Headers: map[string]string{
"event_type": event.EventType(),
"event_id": event.EventID(),
},
Timestamp: time.Now(),
}
return deb.messageBus.Send(ctx, message)
}
// 事件协调器
type EventOrchestrator struct {
eventBus EventBus
sagaManager *SagaManager
logger *log.Logger
}
func NewEventOrchestrator(eventBus EventBus, sagaManager *SagaManager) *EventOrchestrator {
return &EventOrchestrator{
eventBus: eventBus,
sagaManager: sagaManager,
logger: log.New(os.Stdout, "[EventOrchestrator] ", log.LstdFlags),
}
}
func (eo *EventOrchestrator) Handle(ctx context.Context, event Event) error {
// 处理Saga事务
if err := eo.sagaManager.ProcessEvent(ctx, event); err != nil {
eo.logger.Printf("Saga处理失败: %v", err)
}
// 根据事件类型执行相应的协调逻辑
switch event.EventType() {
case "OrderCreated":
return eo.handleOrderCreated(ctx, event)
case "PaymentCompleted":
return eo.handlePaymentCompleted(ctx, event)
case "InventoryReserved":
return eo.handleInventoryReserved(ctx, event)
}
return nil
}
func (eo *EventOrchestrator) handleOrderCreated(ctx context.Context, event Event) error {
// 触发库存预留
inventoryEvent := NewEventBuilder().
WithType("ReserveInventory").
WithAggregateID(event.AggregateID()).
WithData(event.Data()).
Build()
return eo.eventBus.Publish(ctx, inventoryEvent)
}
事件驱动架构为构建松耦合、可扩展的分布式系统提供了强大的基础。通过事件溯源、CQRS 模式和分布式事件协调,我们可以构建出高度响应式和可维护的微服务系统。掌握这些模式和技术将帮助我们设计出更加健壮和灵活的现代应用架构。