5.9.4 订单服务开发

5.9.4 订单服务开发 #

订单服务是电商系统的核心业务服务,负责处理订单的创建、支付、发货、退款等完整生命周期管理。本节将详细介绍如何设计和实现一个支持分布式事务的订单服务。

服务设计概述 #

核心功能 #

订单服务需要支持以下核心功能:

  • 订单管理:订单创建、查询、修改、取消
  • 状态管理:订单状态流转和状态机管理
  • 支付集成:支付处理和支付状态同步
  • 库存协调:与库存服务的协调和一致性保证
  • 分布式事务:使用 Saga 模式处理跨服务事务
  • 订单履约:发货、配送、签收流程管理

订单状态机 #

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   PENDING   │───▶│    PAID     │───▶│  SHIPPED    │
│   (待支付)   │    │   (已支付)   │    │   (已发货)   │
└─────────────┘    └─────────────┘    └─────────────┘
       │                   │                   │
       │                   │                   ▼
       │                   │           ┌─────────────┐
       │                   │           │ DELIVERED   │
       │                   │           │  (已送达)    │
       │                   │           └─────────────┘
       │                   │                   │
       ▼                   ▼                   ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ CANCELLED   │    │  REFUNDED   │    │ COMPLETED   │
│  (已取消)    │    │  (已退款)    │    │  (已完成)    │
└─────────────┘    └─────────────┘    └─────────────┘

领域模型设计 #

订单聚合 #

// internal/domain/entity/order.go
package entity

import (
    "errors"
    "time"
)

// 订单聚合根
type Order struct {
    ID              string        `json:"id"`
    OrderNumber     string        `json:"order_number"`
    UserID          string        `json:"user_id"`
    Items           []OrderItem   `json:"items"`
    TotalAmount     Money         `json:"total_amount"`
    DiscountAmount  Money         `json:"discount_amount"`
    ShippingAmount  Money         `json:"shipping_amount"`
    TaxAmount       Money         `json:"tax_amount"`
    FinalAmount     Money         `json:"final_amount"`
    Status          OrderStatus   `json:"status"`
    PaymentInfo     PaymentInfo   `json:"payment_info"`
    ShippingAddress Address       `json:"shipping_address"`
    BillingAddress  Address       `json:"billing_address"`
    ShippingMethod  ShippingMethod `json:"shipping_method"`
    CouponCode      string        `json:"coupon_code,omitempty"`
    Notes           string        `json:"notes"`
    CreatedAt       time.Time     `json:"created_at"`
    UpdatedAt       time.Time     `json:"updated_at"`
    PaidAt          *time.Time    `json:"paid_at,omitempty"`
    ShippedAt       *time.Time    `json:"shipped_at,omitempty"`
    DeliveredAt     *time.Time    `json:"delivered_at,omitempty"`
    CancelledAt     *time.Time    `json:"cancelled_at,omitempty"`

    // 领域事件
    events []DomainEvent
}

// 订单项
type OrderItem struct {
    ID          string `json:"id"`
    ProductID   string `json:"product_id"`
    ProductName string `json:"product_name"`
    ProductSKU  string `json:"product_sku"`
    VariantID   string `json:"variant_id,omitempty"`
    Quantity    int    `json:"quantity"`
    UnitPrice   Money  `json:"unit_price"`
    TotalPrice  Money  `json:"total_price"`
    Image       string `json:"image"`
    Attributes  []ProductAttribute `json:"attributes"`
}

// 商品属性
type ProductAttribute struct {
    Name  string `json:"name"`
    Value string `json:"value"`
}

// 货币
type Money struct {
    Amount   int64  `json:"amount"`   // 以分为单位
    Currency string `json:"currency"` // 货币代码
}

// 支付信息
type PaymentInfo struct {
    PaymentID     string        `json:"payment_id"`
    PaymentMethod PaymentMethod `json:"payment_method"`
    PaymentStatus PaymentStatus `json:"payment_status"`
    TransactionID string        `json:"transaction_id,omitempty"`
    PaidAmount    Money         `json:"paid_amount"`
    PaidAt        *time.Time    `json:"paid_at,omitempty"`
}

// 地址
type Address struct {
    RecipientName string `json:"recipient_name"`
    Phone         string `json:"phone"`
    Street        string `json:"street"`
    City          string `json:"city"`
    Province      string `json:"province"`
    Country       string `json:"country"`
    ZipCode       string `json:"zip_code"`
}

// 配送方式
type ShippingMethod struct {
    ID          string `json:"id"`
    Name        string `json:"name"`
    Description string `json:"description"`
    Price       Money  `json:"price"`
    EstimatedDays int  `json:"estimated_days"`
}

// 订单状态
type OrderStatus int

const (
    OrderStatusPending OrderStatus = iota + 1
    OrderStatusPaid
    OrderStatusShipped
    OrderStatusDelivered
    OrderStatusCompleted
    OrderStatusCancelled
    OrderStatusRefunded
)

// 支付方式
type PaymentMethod int

const (
    PaymentMethodCreditCard PaymentMethod = iota + 1
    PaymentMethodDebitCard
    PaymentMethodPayPal
    PaymentMethodAlipay
    PaymentMethodWechatPay
    PaymentMethodBankTransfer
)

// 支付状态
type PaymentStatus int

const (
    PaymentStatusPending PaymentStatus = iota + 1
    PaymentStatusProcessing
    PaymentStatusCompleted
    PaymentStatusFailed
    PaymentStatusCancelled
    PaymentStatusRefunded
)

// 创建新订单
func NewOrder(userID string, items []OrderItem, shippingAddress, billingAddress Address, shippingMethod ShippingMethod) (*Order, error) {
    if err := validateOrderInput(userID, items, shippingAddress); err != nil {
        return nil, err
    }

    // 计算订单金额
    totalAmount := calculateTotalAmount(items)
    shippingAmount := shippingMethod.Price
    finalAmount := Money{
        Amount:   totalAmount.Amount + shippingAmount.Amount,
        Currency: totalAmount.Currency,
    }

    order := &Order{
        ID:              generateID(),
        OrderNumber:     generateOrderNumber(),
        UserID:          userID,
        Items:           items,
        TotalAmount:     totalAmount,
        ShippingAmount:  shippingAmount,
        FinalAmount:     finalAmount,
        Status:          OrderStatusPending,
        ShippingAddress: shippingAddress,
        BillingAddress:  billingAddress,
        ShippingMethod:  shippingMethod,
        CreatedAt:       time.Now(),
        UpdatedAt:       time.Now(),
        events:          make([]DomainEvent, 0),
    }

    order.addEvent(NewOrderCreatedEvent(order.ID, order.UserID, order.FinalAmount))
    return order, nil
}

// 支付订单
func (o *Order) Pay(paymentInfo PaymentInfo) error {
    if o.Status != OrderStatusPending {
        return errors.New("order cannot be paid in current status")
    }

    if paymentInfo.PaidAmount.Amount != o.FinalAmount.Amount {
        return errors.New("payment amount does not match order amount")
    }

    now := time.Now()
    o.Status = OrderStatusPaid
    o.PaymentInfo = paymentInfo
    o.PaidAt = &now
    o.UpdatedAt = now

    o.addEvent(NewOrderPaidEvent(o.ID, o.UserID, paymentInfo))
    return nil
}

// 发货
func (o *Order) Ship(trackingNumber string) error {
    if o.Status != OrderStatusPaid {
        return errors.New("order must be paid before shipping")
    }

    now := time.Now()
    o.Status = OrderStatusShipped
    o.ShippedAt = &now
    o.UpdatedAt = now

    o.addEvent(NewOrderShippedEvent(o.ID, o.UserID, trackingNumber))
    return nil
}

// 确认收货
func (o *Order) Deliver() error {
    if o.Status != OrderStatusShipped {
        return errors.New("order must be shipped before delivery")
    }

    now := time.Now()
    o.Status = OrderStatusDelivered
    o.DeliveredAt = &now
    o.UpdatedAt = now

    o.addEvent(NewOrderDeliveredEvent(o.ID, o.UserID))
    return nil
}

// 完成订单
func (o *Order) Complete() error {
    if o.Status != OrderStatusDelivered {
        return errors.New("order must be delivered before completion")
    }

    o.Status = OrderStatusCompleted
    o.UpdatedAt = time.Now()

    o.addEvent(NewOrderCompletedEvent(o.ID, o.UserID))
    return nil
}

// 取消订单
func (o *Order) Cancel(reason string) error {
    if o.Status != OrderStatusPending {
        return errors.New("order cannot be cancelled in current status")
    }

    now := time.Now()
    o.Status = OrderStatusCancelled
    o.CancelledAt = &now
    o.UpdatedAt = now

    o.addEvent(NewOrderCancelledEvent(o.ID, o.UserID, reason))
    return nil
}

// 退款
func (o *Order) Refund(refundAmount Money, reason string) error {
    if o.Status != OrderStatusPaid && o.Status != OrderStatusShipped && o.Status != OrderStatusDelivered {
        return errors.New("order cannot be refunded in current status")
    }

    if refundAmount.Amount > o.PaymentInfo.PaidAmount.Amount {
        return errors.New("refund amount cannot exceed paid amount")
    }

    o.Status = OrderStatusRefunded
    o.UpdatedAt = time.Now()

    o.addEvent(NewOrderRefundedEvent(o.ID, o.UserID, refundAmount, reason))
    return nil
}

// 应用优惠券
func (o *Order) ApplyCoupon(couponCode string, discountAmount Money) error {
    if o.Status != OrderStatusPending {
        return errors.New("coupon can only be applied to pending orders")
    }

    if discountAmount.Amount >= o.TotalAmount.Amount {
        return errors.New("discount amount cannot exceed total amount")
    }

    o.CouponCode = couponCode
    o.DiscountAmount = discountAmount
    o.FinalAmount = Money{
        Amount:   o.TotalAmount.Amount + o.ShippingAmount.Amount - discountAmount.Amount,
        Currency: o.TotalAmount.Currency,
    }
    o.UpdatedAt = time.Now()

    o.addEvent(NewOrderCouponAppliedEvent(o.ID, couponCode, discountAmount))
    return nil
}

// 添加订单项
func (o *Order) AddItem(item OrderItem) error {
    if o.Status != OrderStatusPending {
        return errors.New("cannot add items to non-pending order")
    }

    // 检查是否已存在相同商品
    for i, existingItem := range o.Items {
        if existingItem.ProductID == item.ProductID && existingItem.VariantID == item.VariantID {
            o.Items[i].Quantity += item.Quantity
            o.Items[i].TotalPrice = Money{
                Amount:   o.Items[i].UnitPrice.Amount * int64(o.Items[i].Quantity),
                Currency: o.Items[i].UnitPrice.Currency,
            }
            o.recalculateAmount()
            return nil
        }
    }

    item.ID = generateID()
    item.TotalPrice = Money{
        Amount:   item.UnitPrice.Amount * int64(item.Quantity),
        Currency: item.UnitPrice.Currency,
    }

    o.Items = append(o.Items, item)
    o.recalculateAmount()

    o.addEvent(NewOrderItemAddedEvent(o.ID, item))
    return nil
}

// 移除订单项
func (o *Order) RemoveItem(itemID string) error {
    if o.Status != OrderStatusPending {
        return errors.New("cannot remove items from non-pending order")
    }

    for i, item := range o.Items {
        if item.ID == itemID {
            o.Items = append(o.Items[:i], o.Items[i+1:]...)
            o.recalculateAmount()
            o.addEvent(NewOrderItemRemovedEvent(o.ID, itemID))
            return nil
        }
    }

    return errors.New("item not found")
}

// 更新订单项数量
func (o *Order) UpdateItemQuantity(itemID string, quantity int) error {
    if o.Status != OrderStatusPending {
        return errors.New("cannot update items in non-pending order")
    }

    if quantity <= 0 {
        return o.RemoveItem(itemID)
    }

    for i, item := range o.Items {
        if item.ID == itemID {
            o.Items[i].Quantity = quantity
            o.Items[i].TotalPrice = Money{
                Amount:   item.UnitPrice.Amount * int64(quantity),
                Currency: item.UnitPrice.Currency,
            }
            o.recalculateAmount()
            o.addEvent(NewOrderItemUpdatedEvent(o.ID, itemID, quantity))
            return nil
        }
    }

    return errors.New("item not found")
}

// 重新计算订单金额
func (o *Order) recalculateAmount() {
    totalAmount := int64(0)
    currency := "CNY"

    for _, item := range o.Items {
        totalAmount += item.TotalPrice.Amount
        currency = item.TotalPrice.Currency
    }

    o.TotalAmount = Money{Amount: totalAmount, Currency: currency}
    o.FinalAmount = Money{
        Amount:   totalAmount + o.ShippingAmount.Amount - o.DiscountAmount.Amount,
        Currency: currency,
    }
    o.UpdatedAt = time.Now()
}

// 检查订单是否可以取消
func (o *Order) CanCancel() bool {
    return o.Status == OrderStatusPending
}

// 检查订单是否可以退款
func (o *Order) CanRefund() bool {
    return o.Status == OrderStatusPaid || o.Status == OrderStatusShipped || o.Status == OrderStatusDelivered
}

// 获取订单总数量
func (o *Order) GetTotalQuantity() int {
    total := 0
    for _, item := range o.Items {
        total += item.Quantity
    }
    return total
}

// 获取领域事件
func (o *Order) GetEvents() []DomainEvent {
    return o.events
}

// 清除领域事件
func (o *Order) ClearEvents() {
    o.events = make([]DomainEvent, 0)
}

// 添加领域事件
func (o *Order) addEvent(event DomainEvent) {
    o.events = append(o.events, event)
}

// 验证订单输入
func validateOrderInput(userID string, items []OrderItem, shippingAddress Address) error {
    if userID == "" {
        return errors.New("user ID is required")
    }

    if len(items) == 0 {
        return errors.New("order must have at least one item")
    }

    for _, item := range items {
        if item.ProductID == "" {
            return errors.New("product ID is required for all items")
        }
        if item.Quantity <= 0 {
            return errors.New("item quantity must be positive")
        }
        if item.UnitPrice.Amount <= 0 {
            return errors.New("item price must be positive")
        }
    }

    if shippingAddress.RecipientName == "" {
        return errors.New("recipient name is required")
    }
    if shippingAddress.Street == "" {
        return errors.New("street address is required")
    }

    return nil
}

// 计算订单总金额
func calculateTotalAmount(items []OrderItem) Money {
    totalAmount := int64(0)
    currency := "CNY"

    for _, item := range items {
        totalAmount += item.UnitPrice.Amount * int64(item.Quantity)
        currency = item.UnitPrice.Currency
    }

    return Money{Amount: totalAmount, Currency: currency}
}

// 生成订单号
func generateOrderNumber() string {
    return fmt.Sprintf("ORD%d%06d", time.Now().Unix(), rand.Intn(1000000))
}

订单领域事件 #

// internal/domain/event/order_events.go
package event

import "time"

// 订单创建事件
type OrderCreatedEvent struct {
    BaseEvent
    UserID      string `json:"user_id"`
    FinalAmount Money  `json:"final_amount"`
}

func NewOrderCreatedEvent(orderID, userID string, finalAmount Money) *OrderCreatedEvent {
    return &OrderCreatedEvent{
        BaseEvent: BaseEvent{
            EventID:     generateEventID(),
            EventType:   "order.created",
            AggregateID: orderID,
            OccurredAt:  time.Now(),
        },
        UserID:      userID,
        FinalAmount: finalAmount,
    }
}

// 订单支付事件
type OrderPaidEvent struct {
    BaseEvent
    UserID      string      `json:"user_id"`
    PaymentInfo PaymentInfo `json:"payment_info"`
}

func NewOrderPaidEvent(orderID, userID string, paymentInfo PaymentInfo) *OrderPaidEvent {
    return &OrderPaidEvent{
        BaseEvent: BaseEvent{
            EventID:     generateEventID(),
            EventType:   "order.paid",
            AggregateID: orderID,
            OccurredAt:  time.Now(),
        },
        UserID:      userID,
        PaymentInfo: paymentInfo,
    }
}

// 订单发货事件
type OrderShippedEvent struct {
    BaseEvent
    UserID         string `json:"user_id"`
    TrackingNumber string `json:"tracking_number"`
}

func NewOrderShippedEvent(orderID, userID, trackingNumber string) *OrderShippedEvent {
    return &OrderShippedEvent{
        BaseEvent: BaseEvent{
            EventID:     generateEventID(),
            EventType:   "order.shipped",
            AggregateID: orderID,
            OccurredAt:  time.Now(),
        },
        UserID:         userID,
        TrackingNumber: trackingNumber,
    }
}

// 订单送达事件
type OrderDeliveredEvent struct {
    BaseEvent
    UserID string `json:"user_id"`
}

func NewOrderDeliveredEvent(orderID, userID string) *OrderDeliveredEvent {
    return &OrderDeliveredEvent{
        BaseEvent: BaseEvent{
            EventID:     generateEventID(),
            EventType:   "order.delivered",
            AggregateID: orderID,
            OccurredAt:  time.Now(),
        },
        UserID: userID,
    }
}

// 订单取消事件
type OrderCancelledEvent struct {
    BaseEvent
    UserID string `json:"user_id"`
    Reason string `json:"reason"`
}

func NewOrderCancelledEvent(orderID, userID, reason string) *OrderCancelledEvent {
    return &OrderCancelledEvent{
        BaseEvent: BaseEvent{
            EventID:     generateEventID(),
            EventType:   "order.cancelled",
            AggregateID: orderID,
            OccurredAt:  time.Now(),
        },
        UserID: userID,
        Reason: reason,
    }
}

// 订单退款事件
type OrderRefundedEvent struct {
    BaseEvent
    UserID       string `json:"user_id"`
    RefundAmount Money  `json:"refund_amount"`
    Reason       string `json:"reason"`
}

func NewOrderRefundedEvent(orderID, userID string, refundAmount Money, reason string) *OrderRefundedEvent {
    return &OrderRefundedEvent{
        BaseEvent: BaseEvent{
            EventID:     generateEventID(),
            EventType:   "order.refunded",
            AggregateID: orderID,
            OccurredAt:  time.Now(),
        },
        UserID:       userID,
        RefundAmount: refundAmount,
        Reason:       reason,
    }
}

应用服务层 #

Saga 事务管理 #

// internal/application/saga/order_saga.go
package saga

import (
    "context"
    "encoding/json"
    "errors"
    "time"
)

// Saga 事务管理器
type OrderSagaManager struct {
    sagaRepo        SagaRepository
    userService     UserService
    productService  ProductService
    inventoryService InventoryService
    paymentService  PaymentService
    eventBus        EventBus
}

func NewOrderSagaManager(
    sagaRepo SagaRepository,
    userService UserService,
    productService ProductService,
    inventoryService InventoryService,
    paymentService PaymentService,
    eventBus EventBus,
) *OrderSagaManager {
    return &OrderSagaManager{
        sagaRepo:        sagaRepo,
        userService:     userService,
        productService:  productService,
        inventoryService: inventoryService,
        paymentService:  paymentService,
        eventBus:        eventBus,
    }
}

// 创建订单 Saga
func (s *OrderSagaManager) CreateOrderSaga(ctx context.Context, orderData *CreateOrderSagaData) error {
    saga := &Saga{
        ID:        generateSagaID(),
        Type:      "create_order",
        Status:    SagaStatusStarted,
        Data:      orderData,
        Steps:     s.buildCreateOrderSteps(),
        CreatedAt: time.Now(),
        UpdatedAt: time.Now(),
    }

    // 保存 Saga
    if err := s.sagaRepo.Save(ctx, saga); err != nil {
        return err
    }

    // 执行 Saga
    return s.executeSaga(ctx, saga)
}

// 构建创建订单的步骤
func (s *OrderSagaManager) buildCreateOrderSteps() []SagaStep {
    return []SagaStep{
        {
            Name:        "validate_user",
            Handler:     s.validateUser,
            Compensator: nil, // 验证步骤不需要补偿
        },
        {
            Name:        "validate_products",
            Handler:     s.validateProducts,
            Compensator: nil,
        },
        {
            Name:        "reserve_inventory",
            Handler:     s.reserveInventory,
            Compensator: s.releaseInventory,
        },
        {
            Name:        "create_order",
            Handler:     s.createOrder,
            Compensator: s.cancelOrder,
        },
        {
            Name:        "process_payment",
            Handler:     s.processPayment,
            Compensator: s.refundPayment,
        },
        {
            Name:        "confirm_order",
            Handler:     s.confirmOrder,
            Compensator: nil,
        },
    }
}

// 执行 Saga
func (s *OrderSagaManager) executeSaga(ctx context.Context, saga *Saga) error {
    for i, step := range saga.Steps {
        saga.CurrentStep = i
        saga.UpdatedAt = time.Now()

        // 更新 Saga 状态
        if err := s.sagaRepo.Update(ctx, saga); err != nil {
            return err
        }

        // 执行步骤
        if err := step.Handler(ctx, saga.Data); err != nil {
            // 执行失败,开始补偿
            saga.Status = SagaStatusCompensating
            saga.FailureReason = err.Error()
            s.sagaRepo.Update(ctx, saga)

            return s.compensateSaga(ctx, saga, i)
        }

        // 记录步骤完成
        saga.CompletedSteps = append(saga.CompletedSteps, step.Name)
    }

    // 所有步骤完成
    saga.Status = SagaStatusCompleted
    saga.CompletedAt = time.Now()
    saga.UpdatedAt = time.Now()

    return s.sagaRepo.Update(ctx, saga)
}

// 补偿 Saga
func (s *OrderSagaManager) compensateSaga(ctx context.Context, saga *Saga, failedStepIndex int) error {
    // 从失败步骤开始,逆序执行补偿操作
    for i := failedStepIndex - 1; i >= 0; i-- {
        step := saga.Steps[i]
        if step.Compensator != nil {
            if err := step.Compensator(ctx, saga.Data); err != nil {
                // 补偿失败,记录错误但继续执行其他补偿
                saga.CompensationErrors = append(saga.CompensationErrors,
                    CompensationError{
                        Step:  step.Name,
                        Error: err.Error(),
                    })
            }
        }
    }

    saga.Status = SagaStatusFailed
    saga.UpdatedAt = time.Now()

    return s.sagaRepo.Update(ctx, saga)
}

// Saga 步骤实现

// 验证用户
func (s *OrderSagaManager) validateUser(ctx context.Context, data interface{}) error {
    orderData := data.(*CreateOrderSagaData)

    user, err := s.userService.GetUser(ctx, orderData.UserID)
    if err != nil {
        return errors.New("user not found")
    }

    if user.Status != UserStatusActive {
        return errors.New("user is not active")
    }

    return nil
}

// 验证商品
func (s *OrderSagaManager) validateProducts(ctx context.Context, data interface{}) error {
    orderData := data.(*CreateOrderSagaData)

    for _, item := range orderData.Items {
        product, err := s.productService.GetProduct(ctx, item.ProductID)
        if err != nil {
            return errors.New("product not found: " + item.ProductID)
        }

        if product.Status != ProductStatusActive {
            return errors.New("product is not active: " + item.ProductID)
        }

        if !product.IsInStock() {
            return errors.New("product is out of stock: " + item.ProductID)
        }
    }

    return nil
}

// 预留库存
func (s *OrderSagaManager) reserveInventory(ctx context.Context, data interface{}) error {
    orderData := data.(*CreateOrderSagaData)

    reservations := make([]InventoryReservation, 0)

    for _, item := range orderData.Items {
        reservation := InventoryReservation{
            ProductID: item.ProductID,
            Quantity:  item.Quantity,
        }

        if err := s.inventoryService.ReserveInventory(ctx, reservation); err != nil {
            // 如果预留失败,释放已预留的库存
            for _, prevReservation := range reservations {
                s.inventoryService.ReleaseInventory(ctx, prevReservation)
            }
            return err
        }

        reservations = append(reservations, reservation)
    }

    // 保存预留信息到 Saga 数据中
    orderData.InventoryReservations = reservations

    return nil
}

// 释放库存(补偿操作)
func (s *OrderSagaManager) releaseInventory(ctx context.Context, data interface{}) error {
    orderData := data.(*CreateOrderSagaData)

    for _, reservation := range orderData.InventoryReservations {
        if err := s.inventoryService.ReleaseInventory(ctx, reservation); err != nil {
            // 记录错误但继续执行
            log.Error("failed to release inventory", err)
        }
    }

    return nil
}

// 创建订单
func (s *OrderSagaManager) createOrder(ctx context.Context, data interface{}) error {
    orderData := data.(*CreateOrderSagaData)

    // 构建订单项
    orderItems := make([]OrderItem, len(orderData.Items))
    for i, item := range orderData.Items {
        product, _ := s.productService.GetProduct(ctx, item.ProductID)

        orderItems[i] = OrderItem{
            ProductID:   item.ProductID,
            ProductName: product.Name,
            ProductSKU:  product.SKU,
            Quantity:    item.Quantity,
            UnitPrice:   product.GetEffectivePrice(),
        }
    }

    // 创建订单
    order, err := NewOrder(
        orderData.UserID,
        orderItems,
        orderData.ShippingAddress,
        orderData.BillingAddress,
        orderData.ShippingMethod,
    )
    if err != nil {
        return err
    }

    // 保存订单
    if err := s.orderRepo.Save(ctx, order); err != nil {
        return err
    }

    // 保存订单 ID 到 Saga 数据
    orderData.OrderID = order.ID

    return nil
}

// 取消订单(补偿操作)
func (s *OrderSagaManager) cancelOrder(ctx context.Context, data interface{}) error {
    orderData := data.(*CreateOrderSagaData)

    if orderData.OrderID != "" {
        order, err := s.orderRepo.GetByID(ctx, orderData.OrderID)
        if err != nil {
            return err
        }

        if err := order.Cancel("saga compensation"); err != nil {
            return err
        }

        return s.orderRepo.Update(ctx, order)
    }

    return nil
}

// 处理支付
func (s *OrderSagaManager) processPayment(ctx context.Context, data interface{}) error {
    orderData := data.(*CreateOrderSagaData)

    paymentRequest := PaymentRequest{
        OrderID:       orderData.OrderID,
        Amount:        orderData.PaymentAmount,
        PaymentMethod: orderData.PaymentMethod,
        UserID:        orderData.UserID,
    }

    paymentResult, err := s.paymentService.ProcessPayment(ctx, paymentRequest)
    if err != nil {
        return err
    }

    // 保存支付信息到 Saga 数据
    orderData.PaymentID = paymentResult.PaymentID
    orderData.TransactionID = paymentResult.TransactionID

    return nil
}

// 退款(补偿操作)
func (s *OrderSagaManager) refundPayment(ctx context.Context, data interface{}) error {
    orderData := data.(*CreateOrderSagaData)

    if orderData.PaymentID != "" {
        refundRequest := RefundRequest{
            PaymentID: orderData.PaymentID,
            Amount:    orderData.PaymentAmount,
            Reason:    "saga compensation",
        }

        return s.paymentService.RefundPayment(ctx, refundRequest)
    }

    return nil
}

// 确认订单
func (s *OrderSagaManager) confirmOrder(ctx context.Context, data interface{}) error {
    orderData := data.(*CreateOrderSagaData)

    order, err := s.orderRepo.GetByID(ctx, orderData.OrderID)
    if err != nil {
        return err
    }

    paymentInfo := PaymentInfo{
        PaymentID:     orderData.PaymentID,
        PaymentMethod: orderData.PaymentMethod,
        PaymentStatus: PaymentStatusCompleted,
        TransactionID: orderData.TransactionID,
        PaidAmount:    orderData.PaymentAmount,
    }

    if err := order.Pay(paymentInfo); err != nil {
        return err
    }

    return s.orderRepo.Update(ctx, order)
}

// Saga 数据结构
type CreateOrderSagaData struct {
    UserID                string                  `json:"user_id"`
    Items                 []OrderItemData         `json:"items"`
    ShippingAddress       Address                 `json:"shipping_address"`
    BillingAddress        Address                 `json:"billing_address"`
    ShippingMethod        ShippingMethod          `json:"shipping_method"`
    PaymentMethod         PaymentMethod           `json:"payment_method"`
    PaymentAmount         Money                   `json:"payment_amount"`

    // 执行过程中生成的数据
    OrderID               string                  `json:"order_id,omitempty"`
    PaymentID             string                  `json:"payment_id,omitempty"`
    TransactionID         string                  `json:"transaction_id,omitempty"`
    InventoryReservations []InventoryReservation  `json:"inventory_reservations,omitempty"`
}

type OrderItemData struct {
    ProductID string `json:"product_id"`
    Quantity  int    `json:"quantity"`
}

type InventoryReservation struct {
    ProductID string `json:"product_id"`
    Quantity  int    `json:"quantity"`
}

// Saga 实体
type Saga struct {
    ID                  string              `json:"id"`
    Type                string              `json:"type"`
    Status              SagaStatus          `json:"status"`
    Data                interface{}         `json:"data"`
    Steps               []SagaStep          `json:"steps"`
    CurrentStep         int                 `json:"current_step"`
    CompletedSteps      []string            `json:"completed_steps"`
    FailureReason       string              `json:"failure_reason,omitempty"`
    CompensationErrors  []CompensationError `json:"compensation_errors,omitempty"`
    CreatedAt           time.Time           `json:"created_at"`
    UpdatedAt           time.Time           `json:"updated_at"`
    CompletedAt         *time.Time          `json:"completed_at,omitempty"`
}

type SagaStep struct {
    Name        string                                      `json:"name"`
    Handler     func(ctx context.Context, data interface{}) error `json:"-"`
    Compensator func(ctx context.Context, data interface{}) error `json:"-"`
}

type SagaStatus int

const (
    SagaStatusStarted SagaStatus = iota + 1
    SagaStatusCompleted
    SagaStatusCompensating
    SagaStatusFailed
)

type CompensationError struct {
    Step  string `json:"step"`
    Error string `json:"error"`
}

订单命令处理 #

// internal/application/command/order_command_handler.go
package command

import (
    "context"
    "errors"

    "github.com/ecommerce/order-service/internal/application/saga"
    "github.com/ecommerce/order-service/internal/domain/entity"
    "github.com/ecommerce/order-service/internal/domain/repository"
    "github.com/ecommerce/order-service/pkg/validator"
)

type OrderCommandHandler struct {
    orderRepo   repository.OrderRepository
    sagaManager *saga.OrderSagaManager
    eventBus    EventBus
    validator   *validator.Validator
}

func NewOrderCommandHandler(
    orderRepo repository.OrderRepository,
    sagaManager *saga.OrderSagaManager,
    eventBus EventBus,
    validator *validator.Validator,
) *OrderCommandHandler {
    return &OrderCommandHandler{
        orderRepo:   orderRepo,
        sagaManager: sagaManager,
        eventBus:    eventBus,
        validator:   validator,
    }
}

// 处理创建订单命令
func (h *OrderCommandHandler) HandleCreateOrder(ctx context.Context, cmd *CreateOrderCommand) (*CreateOrderResult, error) {
    // 验证命令
    if err := h.validator.Validate(cmd); err != nil {
        return nil, err
    }

    // 构建 Saga 数据
    sagaData := &saga.CreateOrderSagaData{
        UserID:          cmd.UserID,
        Items:           h.toOrderItemData(cmd.Items),
        ShippingAddress: cmd.ShippingAddress,
        BillingAddress:  cmd.BillingAddress,
        ShippingMethod:  cmd.ShippingMethod,
        PaymentMethod:   cmd.PaymentMethod,
        PaymentAmount:   cmd.PaymentAmount,
    }

    // 启动 Saga 事务
    if err := h.sagaManager.CreateOrderSaga(ctx, sagaData); err != nil {
        return nil, err
    }

    return &CreateOrderResult{
        SagaID: sagaData.OrderID, // 这里应该返回 Saga ID
        Message: "Order creation started",
    }, nil
}

// 处理取消订单命令
func (h *OrderCommandHandler) HandleCancelOrder(ctx context.Context, cmd *CancelOrderCommand) error {
    // 验证命令
    if err := h.validator.Validate(cmd); err != nil {
        return err
    }

    // 获取订单
    order, err := h.orderRepo.GetByID(ctx, cmd.OrderID)
    if err != nil {
        return err
    }

    // 检查权限
    if order.UserID != cmd.UserID {
        return errors.New("permission denied")
    }

    // 取消订单
    if err := order.Cancel(cmd.Reason); err != nil {
        return err
    }

    // 保存更新
    if err := h.orderRepo.Update(ctx, order); err != nil {
        return err
    }

    // 发布领域事件
    for _, event := range order.GetEvents() {
        if err := h.eventBus.Publish(ctx, event); err != nil {
            log.Error("failed to publish event", err)
        }
    }

    order.ClearEvents()
    return nil
}

// 处理订单发货命令
func (h *OrderCommandHandler) HandleShipOrder(ctx context.Context, cmd *ShipOrderCommand) error {
    // 验证命令
    if err := h.validator.Validate(cmd); err != nil {
        return err
    }

    // 获取订单
    order, err := h.orderRepo.GetByID(ctx, cmd.OrderID)
    if err != nil {
        return err
    }

    // 发货
    if err := order.Ship(cmd.TrackingNumber); err != nil {
        return err
    }

    // 保存更新
    if err := h.orderRepo.Update(ctx, order); err != nil {
        return err
    }

    // 发布领域事件
    for _, event := range order.GetEvents() {
        if err := h.eventBus.Publish(ctx, event); err != nil {
            log.Error("failed to publish event", err)
        }
    }

    order.ClearEvents()
    return nil
}

// 处理确认收货命令
func (h *OrderCommandHandler) HandleConfirmDelivery(ctx context.Context, cmd *ConfirmDeliveryCommand) error {
    // 验证命令
    if err := h.validator.Validate(cmd); err != nil {
        return err
    }

    // 获取订单
    order, err := h.orderRepo.GetByID(ctx, cmd.OrderID)
    if err != nil {
        return err
    }

    // 检查权限
    if order.UserID != cmd.UserID {
        return errors.New("permission denied")
    }

    // 确认收货
    if err := order.Deliver(); err != nil {
        return err
    }

    // 保存更新
    if err := h.orderRepo.Update(ctx, order); err != nil {
        return err
    }

    // 发布领域事件
    for _, event := range order.GetEvents() {
        if err := h.eventBus.Publish(ctx, event); err != nil {
            log.Error("failed to publish event", err)
        }
    }

    order.ClearEvents()
    return nil
}

// 辅助方法
func (h *OrderCommandHandler) toOrderItemData(items []OrderItemCommand) []saga.OrderItemData {
    result := make([]saga.OrderItemData, len(items))
    for i, item := range items {
        result[i] = saga.OrderItemData{
            ProductID: item.ProductID,
            Quantity:  item.Quantity,
        }
    }
    return result
}

// 命令定义
type CreateOrderCommand struct {
    UserID          string             `json:"user_id" validate:"required"`
    Items           []OrderItemCommand `json:"items" validate:"required,min=1"`
    ShippingAddress Address            `json:"shipping_address" validate:"required"`
    BillingAddress  Address            `json:"billing_address" validate:"required"`
    ShippingMethod  ShippingMethod     `json:"shipping_method" validate:"required"`
    PaymentMethod   PaymentMethod      `json:"payment_method" validate:"required"`
    PaymentAmount   Money              `json:"payment_amount" validate:"required"`
    CouponCode      string             `json:"coupon_code,omitempty"`
    Notes           string             `json:"notes,omitempty"`
}

type OrderItemCommand struct {
    ProductID string `json:"product_id" validate:"required"`
    Quantity  int    `json:"quantity" validate:"min=1"`
}

type CancelOrderCommand struct {
    OrderID string `json:"order_id" validate:"required"`
    UserID  string `json:"user_id" validate:"required"`
    Reason  string `json:"reason" validate:"required"`
}

type ShipOrderCommand struct {
    OrderID        string `json:"order_id" validate:"required"`
    TrackingNumber string `json:"tracking_number" validate:"required"`
}

type ConfirmDeliveryCommand struct {
    OrderID string `json:"order_id" validate:"required"`
    UserID  string `json:"user_id" validate:"required"`
}

// 结果类型
type CreateOrderResult struct {
    SagaID  string `json:"saga_id"`
    Message string `json:"message"`
}

事件处理器 #

订单事件处理器 #

// internal/application/event/order_event_handler.go
package event

import (
    "context"
    "encoding/json"

    "github.com/ecommerce/order-service/internal/domain/event"
)

type OrderEventHandler struct {
    notificationService NotificationService
    inventoryService    InventoryService
    userService         UserService
    analyticsService    AnalyticsService
}

func NewOrderEventHandler(
    notificationService NotificationService,
    inventoryService InventoryService,
    userService UserService,
    analyticsService AnalyticsService,
) *OrderEventHandler {
    return &OrderEventHandler{
        notificationService: notificationService,
        inventoryService:    inventoryService,
        userService:         userService,
        analyticsService:    analyticsService,
    }
}

// 处理订单创建事件
func (h *OrderEventHandler) HandleOrderCreated(ctx context.Context, eventData []byte) error {
    var event event.OrderCreatedEvent
    if err := json.Unmarshal(eventData, &event); err != nil {
        return err
    }

    // 发送订单创建通知
    notification := Notification{
        UserID:  event.UserID,
        Type:    NotificationTypeOrderCreated,
        Title:   "订单创建成功",
        Content: fmt.Sprintf("您的订单 %s 已创建成功,金额:%s", event.AggregateID, formatMoney(event.FinalAmount)),
    }

    if err := h.notificationService.SendNotification(ctx, notification); err != nil {
        log.Error("failed to send order created notification", err)
    }

    // 记录分析数据
    analyticsData := AnalyticsData{
        EventType: "order_created",
        UserID:    event.UserID,
        OrderID:   event.AggregateID,
        Amount:    event.FinalAmount.Amount,
        Timestamp: event.OccurredAt,
    }

    if err := h.analyticsService.RecordEvent(ctx, analyticsData); err != nil {
        log.Error("failed to record analytics data", err)
    }

    return nil
}

// 处理订单支付事件
func (h *OrderEventHandler) HandleOrderPaid(ctx context.Context, eventData []byte) error {
    var event event.OrderPaidEvent
    if err := json.Unmarshal(eventData, &event); err != nil {
        return err
    }

    // 发送支付成功通知
    notification := Notification{
        UserID:  event.UserID,
        Type:    NotificationTypeOrderPaid,
        Title:   "支付成功",
        Content: fmt.Sprintf("您的订单 %s 支付成功,我们将尽快为您发货", event.AggregateID),
    }

    if err := h.notificationService.SendNotification(ctx, notification); err != nil {
        log.Error("failed to send order paid notification", err)
    }

    // 更新用户积分
    points := calculatePoints(event.PaymentInfo.PaidAmount)
    if err := h.userService.AddPoints(ctx, event.UserID, points); err != nil {
        log.Error("failed to add user points", err)
    }

    return nil
}

// 处理订单发货事件
func (h *OrderEventHandler) HandleOrderShipped(ctx context.Context, eventData []byte) error {
    var event event.OrderShippedEvent
    if err := json.Unmarshal(eventData, &event); err != nil {
        return err
    }

    // 发送发货通知
    notification := Notification{
        UserID:  event.UserID,
        Type:    NotificationTypeOrderShipped,
        Title:   "订单已发货",
        Content: fmt.Sprintf("您的订单 %s 已发货,物流单号:%s", event.AggregateID, event.TrackingNumber),
    }

    if err := h.notificationService.SendNotification(ctx, notification); err != nil {
        log.Error("failed to send order shipped notification", err)
    }

    return nil
}

// 处理订单取消事件
func (h *OrderEventHandler) HandleOrderCancelled(ctx context.Context, eventData []byte) error {
    var event event.OrderCancelledEvent
    if err := json.Unmarshal(eventData, &event); err != nil {
        return err
    }

    // 发送取消通知
    notification := Notification{
        UserID:  event.UserID,
        Type:    NotificationTypeOrderCancelled,
        Title:   "订单已取消",
        Content: fmt.Sprintf("您的订单 %s 已取消,原因:%s", event.AggregateID, event.Reason),
    }

    if err := h.notificationService.SendNotification(ctx, notification); err != nil {
        log.Error("failed to send order cancelled notification", err)
    }

    return nil
}

// 辅助方法
func formatMoney(money Money) string {
    return fmt.Sprintf("%.2f %s", float64(money.Amount)/100, money.Currency)
}

func calculatePoints(amount Money) int {
    // 每消费 1 元获得 1 积分
    return int(amount.Amount / 100)
}

// 通知类型
type NotificationType int

const (
    NotificationTypeOrderCreated NotificationType = iota + 1
    NotificationTypeOrderPaid
    NotificationTypeOrderShipped
    NotificationTypeOrderDelivered
    NotificationTypeOrderCancelled
    NotificationTypeOrderRefunded
)

type Notification struct {
    UserID  string           `json:"user_id"`
    Type    NotificationType `json:"type"`
    Title   string           `json:"title"`
    Content string           `json:"content"`
}

type AnalyticsData struct {
    EventType string    `json:"event_type"`
    UserID    string    `json:"user_id"`
    OrderID   string    `json:"order_id"`
    Amount    int64     `json:"amount"`
    Timestamp time.Time `json:"timestamp"`
}

通过本节的学习,我们完成了订单服务的核心功能实现,包括订单管理、状态流转、Saga 分布式事务处理和事件处理。下一节将进行系统集成和综合实战。