5.9.4 订单服务开发 #
订单服务是电商系统的核心业务服务,负责处理订单的创建、支付、发货、退款等完整生命周期管理。本节将详细介绍如何设计和实现一个支持分布式事务的订单服务。
服务设计概述 #
核心功能 #
订单服务需要支持以下核心功能:
- 订单管理:订单创建、查询、修改、取消
- 状态管理:订单状态流转和状态机管理
- 支付集成:支付处理和支付状态同步
- 库存协调:与库存服务的协调和一致性保证
- 分布式事务:使用 Saga 模式处理跨服务事务
- 订单履约:发货、配送、签收流程管理
订单状态机 #
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ PENDING │───▶│ PAID │───▶│ SHIPPED │
│ (待支付) │ │ (已支付) │ │ (已发货) │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
│ │ ▼
│ │ ┌─────────────┐
│ │ │ DELIVERED │
│ │ │ (已送达) │
│ │ └─────────────┘
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ CANCELLED │ │ REFUNDED │ │ COMPLETED │
│ (已取消) │ │ (已退款) │ │ (已完成) │
└─────────────┘ └─────────────┘ └─────────────┘
领域模型设计 #
订单聚合 #
// internal/domain/entity/order.go
package entity
import (
"errors"
"time"
)
// 订单聚合根
type Order struct {
ID string `json:"id"`
OrderNumber string `json:"order_number"`
UserID string `json:"user_id"`
Items []OrderItem `json:"items"`
TotalAmount Money `json:"total_amount"`
DiscountAmount Money `json:"discount_amount"`
ShippingAmount Money `json:"shipping_amount"`
TaxAmount Money `json:"tax_amount"`
FinalAmount Money `json:"final_amount"`
Status OrderStatus `json:"status"`
PaymentInfo PaymentInfo `json:"payment_info"`
ShippingAddress Address `json:"shipping_address"`
BillingAddress Address `json:"billing_address"`
ShippingMethod ShippingMethod `json:"shipping_method"`
CouponCode string `json:"coupon_code,omitempty"`
Notes string `json:"notes"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
PaidAt *time.Time `json:"paid_at,omitempty"`
ShippedAt *time.Time `json:"shipped_at,omitempty"`
DeliveredAt *time.Time `json:"delivered_at,omitempty"`
CancelledAt *time.Time `json:"cancelled_at,omitempty"`
// 领域事件
events []DomainEvent
}
// 订单项
type OrderItem struct {
ID string `json:"id"`
ProductID string `json:"product_id"`
ProductName string `json:"product_name"`
ProductSKU string `json:"product_sku"`
VariantID string `json:"variant_id,omitempty"`
Quantity int `json:"quantity"`
UnitPrice Money `json:"unit_price"`
TotalPrice Money `json:"total_price"`
Image string `json:"image"`
Attributes []ProductAttribute `json:"attributes"`
}
// 商品属性
type ProductAttribute struct {
Name string `json:"name"`
Value string `json:"value"`
}
// 货币
type Money struct {
Amount int64 `json:"amount"` // 以分为单位
Currency string `json:"currency"` // 货币代码
}
// 支付信息
type PaymentInfo struct {
PaymentID string `json:"payment_id"`
PaymentMethod PaymentMethod `json:"payment_method"`
PaymentStatus PaymentStatus `json:"payment_status"`
TransactionID string `json:"transaction_id,omitempty"`
PaidAmount Money `json:"paid_amount"`
PaidAt *time.Time `json:"paid_at,omitempty"`
}
// 地址
type Address struct {
RecipientName string `json:"recipient_name"`
Phone string `json:"phone"`
Street string `json:"street"`
City string `json:"city"`
Province string `json:"province"`
Country string `json:"country"`
ZipCode string `json:"zip_code"`
}
// 配送方式
type ShippingMethod struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Price Money `json:"price"`
EstimatedDays int `json:"estimated_days"`
}
// 订单状态
type OrderStatus int
const (
OrderStatusPending OrderStatus = iota + 1
OrderStatusPaid
OrderStatusShipped
OrderStatusDelivered
OrderStatusCompleted
OrderStatusCancelled
OrderStatusRefunded
)
// 支付方式
type PaymentMethod int
const (
PaymentMethodCreditCard PaymentMethod = iota + 1
PaymentMethodDebitCard
PaymentMethodPayPal
PaymentMethodAlipay
PaymentMethodWechatPay
PaymentMethodBankTransfer
)
// 支付状态
type PaymentStatus int
const (
PaymentStatusPending PaymentStatus = iota + 1
PaymentStatusProcessing
PaymentStatusCompleted
PaymentStatusFailed
PaymentStatusCancelled
PaymentStatusRefunded
)
// 创建新订单
func NewOrder(userID string, items []OrderItem, shippingAddress, billingAddress Address, shippingMethod ShippingMethod) (*Order, error) {
if err := validateOrderInput(userID, items, shippingAddress); err != nil {
return nil, err
}
// 计算订单金额
totalAmount := calculateTotalAmount(items)
shippingAmount := shippingMethod.Price
finalAmount := Money{
Amount: totalAmount.Amount + shippingAmount.Amount,
Currency: totalAmount.Currency,
}
order := &Order{
ID: generateID(),
OrderNumber: generateOrderNumber(),
UserID: userID,
Items: items,
TotalAmount: totalAmount,
ShippingAmount: shippingAmount,
FinalAmount: finalAmount,
Status: OrderStatusPending,
ShippingAddress: shippingAddress,
BillingAddress: billingAddress,
ShippingMethod: shippingMethod,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
events: make([]DomainEvent, 0),
}
order.addEvent(NewOrderCreatedEvent(order.ID, order.UserID, order.FinalAmount))
return order, nil
}
// 支付订单
func (o *Order) Pay(paymentInfo PaymentInfo) error {
if o.Status != OrderStatusPending {
return errors.New("order cannot be paid in current status")
}
if paymentInfo.PaidAmount.Amount != o.FinalAmount.Amount {
return errors.New("payment amount does not match order amount")
}
now := time.Now()
o.Status = OrderStatusPaid
o.PaymentInfo = paymentInfo
o.PaidAt = &now
o.UpdatedAt = now
o.addEvent(NewOrderPaidEvent(o.ID, o.UserID, paymentInfo))
return nil
}
// 发货
func (o *Order) Ship(trackingNumber string) error {
if o.Status != OrderStatusPaid {
return errors.New("order must be paid before shipping")
}
now := time.Now()
o.Status = OrderStatusShipped
o.ShippedAt = &now
o.UpdatedAt = now
o.addEvent(NewOrderShippedEvent(o.ID, o.UserID, trackingNumber))
return nil
}
// 确认收货
func (o *Order) Deliver() error {
if o.Status != OrderStatusShipped {
return errors.New("order must be shipped before delivery")
}
now := time.Now()
o.Status = OrderStatusDelivered
o.DeliveredAt = &now
o.UpdatedAt = now
o.addEvent(NewOrderDeliveredEvent(o.ID, o.UserID))
return nil
}
// 完成订单
func (o *Order) Complete() error {
if o.Status != OrderStatusDelivered {
return errors.New("order must be delivered before completion")
}
o.Status = OrderStatusCompleted
o.UpdatedAt = time.Now()
o.addEvent(NewOrderCompletedEvent(o.ID, o.UserID))
return nil
}
// 取消订单
func (o *Order) Cancel(reason string) error {
if o.Status != OrderStatusPending {
return errors.New("order cannot be cancelled in current status")
}
now := time.Now()
o.Status = OrderStatusCancelled
o.CancelledAt = &now
o.UpdatedAt = now
o.addEvent(NewOrderCancelledEvent(o.ID, o.UserID, reason))
return nil
}
// 退款
func (o *Order) Refund(refundAmount Money, reason string) error {
if o.Status != OrderStatusPaid && o.Status != OrderStatusShipped && o.Status != OrderStatusDelivered {
return errors.New("order cannot be refunded in current status")
}
if refundAmount.Amount > o.PaymentInfo.PaidAmount.Amount {
return errors.New("refund amount cannot exceed paid amount")
}
o.Status = OrderStatusRefunded
o.UpdatedAt = time.Now()
o.addEvent(NewOrderRefundedEvent(o.ID, o.UserID, refundAmount, reason))
return nil
}
// 应用优惠券
func (o *Order) ApplyCoupon(couponCode string, discountAmount Money) error {
if o.Status != OrderStatusPending {
return errors.New("coupon can only be applied to pending orders")
}
if discountAmount.Amount >= o.TotalAmount.Amount {
return errors.New("discount amount cannot exceed total amount")
}
o.CouponCode = couponCode
o.DiscountAmount = discountAmount
o.FinalAmount = Money{
Amount: o.TotalAmount.Amount + o.ShippingAmount.Amount - discountAmount.Amount,
Currency: o.TotalAmount.Currency,
}
o.UpdatedAt = time.Now()
o.addEvent(NewOrderCouponAppliedEvent(o.ID, couponCode, discountAmount))
return nil
}
// 添加订单项
func (o *Order) AddItem(item OrderItem) error {
if o.Status != OrderStatusPending {
return errors.New("cannot add items to non-pending order")
}
// 检查是否已存在相同商品
for i, existingItem := range o.Items {
if existingItem.ProductID == item.ProductID && existingItem.VariantID == item.VariantID {
o.Items[i].Quantity += item.Quantity
o.Items[i].TotalPrice = Money{
Amount: o.Items[i].UnitPrice.Amount * int64(o.Items[i].Quantity),
Currency: o.Items[i].UnitPrice.Currency,
}
o.recalculateAmount()
return nil
}
}
item.ID = generateID()
item.TotalPrice = Money{
Amount: item.UnitPrice.Amount * int64(item.Quantity),
Currency: item.UnitPrice.Currency,
}
o.Items = append(o.Items, item)
o.recalculateAmount()
o.addEvent(NewOrderItemAddedEvent(o.ID, item))
return nil
}
// 移除订单项
func (o *Order) RemoveItem(itemID string) error {
if o.Status != OrderStatusPending {
return errors.New("cannot remove items from non-pending order")
}
for i, item := range o.Items {
if item.ID == itemID {
o.Items = append(o.Items[:i], o.Items[i+1:]...)
o.recalculateAmount()
o.addEvent(NewOrderItemRemovedEvent(o.ID, itemID))
return nil
}
}
return errors.New("item not found")
}
// 更新订单项数量
func (o *Order) UpdateItemQuantity(itemID string, quantity int) error {
if o.Status != OrderStatusPending {
return errors.New("cannot update items in non-pending order")
}
if quantity <= 0 {
return o.RemoveItem(itemID)
}
for i, item := range o.Items {
if item.ID == itemID {
o.Items[i].Quantity = quantity
o.Items[i].TotalPrice = Money{
Amount: item.UnitPrice.Amount * int64(quantity),
Currency: item.UnitPrice.Currency,
}
o.recalculateAmount()
o.addEvent(NewOrderItemUpdatedEvent(o.ID, itemID, quantity))
return nil
}
}
return errors.New("item not found")
}
// 重新计算订单金额
func (o *Order) recalculateAmount() {
totalAmount := int64(0)
currency := "CNY"
for _, item := range o.Items {
totalAmount += item.TotalPrice.Amount
currency = item.TotalPrice.Currency
}
o.TotalAmount = Money{Amount: totalAmount, Currency: currency}
o.FinalAmount = Money{
Amount: totalAmount + o.ShippingAmount.Amount - o.DiscountAmount.Amount,
Currency: currency,
}
o.UpdatedAt = time.Now()
}
// 检查订单是否可以取消
func (o *Order) CanCancel() bool {
return o.Status == OrderStatusPending
}
// 检查订单是否可以退款
func (o *Order) CanRefund() bool {
return o.Status == OrderStatusPaid || o.Status == OrderStatusShipped || o.Status == OrderStatusDelivered
}
// 获取订单总数量
func (o *Order) GetTotalQuantity() int {
total := 0
for _, item := range o.Items {
total += item.Quantity
}
return total
}
// 获取领域事件
func (o *Order) GetEvents() []DomainEvent {
return o.events
}
// 清除领域事件
func (o *Order) ClearEvents() {
o.events = make([]DomainEvent, 0)
}
// 添加领域事件
func (o *Order) addEvent(event DomainEvent) {
o.events = append(o.events, event)
}
// 验证订单输入
func validateOrderInput(userID string, items []OrderItem, shippingAddress Address) error {
if userID == "" {
return errors.New("user ID is required")
}
if len(items) == 0 {
return errors.New("order must have at least one item")
}
for _, item := range items {
if item.ProductID == "" {
return errors.New("product ID is required for all items")
}
if item.Quantity <= 0 {
return errors.New("item quantity must be positive")
}
if item.UnitPrice.Amount <= 0 {
return errors.New("item price must be positive")
}
}
if shippingAddress.RecipientName == "" {
return errors.New("recipient name is required")
}
if shippingAddress.Street == "" {
return errors.New("street address is required")
}
return nil
}
// 计算订单总金额
func calculateTotalAmount(items []OrderItem) Money {
totalAmount := int64(0)
currency := "CNY"
for _, item := range items {
totalAmount += item.UnitPrice.Amount * int64(item.Quantity)
currency = item.UnitPrice.Currency
}
return Money{Amount: totalAmount, Currency: currency}
}
// 生成订单号
func generateOrderNumber() string {
return fmt.Sprintf("ORD%d%06d", time.Now().Unix(), rand.Intn(1000000))
}
订单领域事件 #
// internal/domain/event/order_events.go
package event
import "time"
// 订单创建事件
type OrderCreatedEvent struct {
BaseEvent
UserID string `json:"user_id"`
FinalAmount Money `json:"final_amount"`
}
func NewOrderCreatedEvent(orderID, userID string, finalAmount Money) *OrderCreatedEvent {
return &OrderCreatedEvent{
BaseEvent: BaseEvent{
EventID: generateEventID(),
EventType: "order.created",
AggregateID: orderID,
OccurredAt: time.Now(),
},
UserID: userID,
FinalAmount: finalAmount,
}
}
// 订单支付事件
type OrderPaidEvent struct {
BaseEvent
UserID string `json:"user_id"`
PaymentInfo PaymentInfo `json:"payment_info"`
}
func NewOrderPaidEvent(orderID, userID string, paymentInfo PaymentInfo) *OrderPaidEvent {
return &OrderPaidEvent{
BaseEvent: BaseEvent{
EventID: generateEventID(),
EventType: "order.paid",
AggregateID: orderID,
OccurredAt: time.Now(),
},
UserID: userID,
PaymentInfo: paymentInfo,
}
}
// 订单发货事件
type OrderShippedEvent struct {
BaseEvent
UserID string `json:"user_id"`
TrackingNumber string `json:"tracking_number"`
}
func NewOrderShippedEvent(orderID, userID, trackingNumber string) *OrderShippedEvent {
return &OrderShippedEvent{
BaseEvent: BaseEvent{
EventID: generateEventID(),
EventType: "order.shipped",
AggregateID: orderID,
OccurredAt: time.Now(),
},
UserID: userID,
TrackingNumber: trackingNumber,
}
}
// 订单送达事件
type OrderDeliveredEvent struct {
BaseEvent
UserID string `json:"user_id"`
}
func NewOrderDeliveredEvent(orderID, userID string) *OrderDeliveredEvent {
return &OrderDeliveredEvent{
BaseEvent: BaseEvent{
EventID: generateEventID(),
EventType: "order.delivered",
AggregateID: orderID,
OccurredAt: time.Now(),
},
UserID: userID,
}
}
// 订单取消事件
type OrderCancelledEvent struct {
BaseEvent
UserID string `json:"user_id"`
Reason string `json:"reason"`
}
func NewOrderCancelledEvent(orderID, userID, reason string) *OrderCancelledEvent {
return &OrderCancelledEvent{
BaseEvent: BaseEvent{
EventID: generateEventID(),
EventType: "order.cancelled",
AggregateID: orderID,
OccurredAt: time.Now(),
},
UserID: userID,
Reason: reason,
}
}
// 订单退款事件
type OrderRefundedEvent struct {
BaseEvent
UserID string `json:"user_id"`
RefundAmount Money `json:"refund_amount"`
Reason string `json:"reason"`
}
func NewOrderRefundedEvent(orderID, userID string, refundAmount Money, reason string) *OrderRefundedEvent {
return &OrderRefundedEvent{
BaseEvent: BaseEvent{
EventID: generateEventID(),
EventType: "order.refunded",
AggregateID: orderID,
OccurredAt: time.Now(),
},
UserID: userID,
RefundAmount: refundAmount,
Reason: reason,
}
}
应用服务层 #
Saga 事务管理 #
// internal/application/saga/order_saga.go
package saga
import (
"context"
"encoding/json"
"errors"
"time"
)
// Saga 事务管理器
type OrderSagaManager struct {
sagaRepo SagaRepository
userService UserService
productService ProductService
inventoryService InventoryService
paymentService PaymentService
eventBus EventBus
}
func NewOrderSagaManager(
sagaRepo SagaRepository,
userService UserService,
productService ProductService,
inventoryService InventoryService,
paymentService PaymentService,
eventBus EventBus,
) *OrderSagaManager {
return &OrderSagaManager{
sagaRepo: sagaRepo,
userService: userService,
productService: productService,
inventoryService: inventoryService,
paymentService: paymentService,
eventBus: eventBus,
}
}
// 创建订单 Saga
func (s *OrderSagaManager) CreateOrderSaga(ctx context.Context, orderData *CreateOrderSagaData) error {
saga := &Saga{
ID: generateSagaID(),
Type: "create_order",
Status: SagaStatusStarted,
Data: orderData,
Steps: s.buildCreateOrderSteps(),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
// 保存 Saga
if err := s.sagaRepo.Save(ctx, saga); err != nil {
return err
}
// 执行 Saga
return s.executeSaga(ctx, saga)
}
// 构建创建订单的步骤
func (s *OrderSagaManager) buildCreateOrderSteps() []SagaStep {
return []SagaStep{
{
Name: "validate_user",
Handler: s.validateUser,
Compensator: nil, // 验证步骤不需要补偿
},
{
Name: "validate_products",
Handler: s.validateProducts,
Compensator: nil,
},
{
Name: "reserve_inventory",
Handler: s.reserveInventory,
Compensator: s.releaseInventory,
},
{
Name: "create_order",
Handler: s.createOrder,
Compensator: s.cancelOrder,
},
{
Name: "process_payment",
Handler: s.processPayment,
Compensator: s.refundPayment,
},
{
Name: "confirm_order",
Handler: s.confirmOrder,
Compensator: nil,
},
}
}
// 执行 Saga
func (s *OrderSagaManager) executeSaga(ctx context.Context, saga *Saga) error {
for i, step := range saga.Steps {
saga.CurrentStep = i
saga.UpdatedAt = time.Now()
// 更新 Saga 状态
if err := s.sagaRepo.Update(ctx, saga); err != nil {
return err
}
// 执行步骤
if err := step.Handler(ctx, saga.Data); err != nil {
// 执行失败,开始补偿
saga.Status = SagaStatusCompensating
saga.FailureReason = err.Error()
s.sagaRepo.Update(ctx, saga)
return s.compensateSaga(ctx, saga, i)
}
// 记录步骤完成
saga.CompletedSteps = append(saga.CompletedSteps, step.Name)
}
// 所有步骤完成
saga.Status = SagaStatusCompleted
saga.CompletedAt = time.Now()
saga.UpdatedAt = time.Now()
return s.sagaRepo.Update(ctx, saga)
}
// 补偿 Saga
func (s *OrderSagaManager) compensateSaga(ctx context.Context, saga *Saga, failedStepIndex int) error {
// 从失败步骤开始,逆序执行补偿操作
for i := failedStepIndex - 1; i >= 0; i-- {
step := saga.Steps[i]
if step.Compensator != nil {
if err := step.Compensator(ctx, saga.Data); err != nil {
// 补偿失败,记录错误但继续执行其他补偿
saga.CompensationErrors = append(saga.CompensationErrors,
CompensationError{
Step: step.Name,
Error: err.Error(),
})
}
}
}
saga.Status = SagaStatusFailed
saga.UpdatedAt = time.Now()
return s.sagaRepo.Update(ctx, saga)
}
// Saga 步骤实现
// 验证用户
func (s *OrderSagaManager) validateUser(ctx context.Context, data interface{}) error {
orderData := data.(*CreateOrderSagaData)
user, err := s.userService.GetUser(ctx, orderData.UserID)
if err != nil {
return errors.New("user not found")
}
if user.Status != UserStatusActive {
return errors.New("user is not active")
}
return nil
}
// 验证商品
func (s *OrderSagaManager) validateProducts(ctx context.Context, data interface{}) error {
orderData := data.(*CreateOrderSagaData)
for _, item := range orderData.Items {
product, err := s.productService.GetProduct(ctx, item.ProductID)
if err != nil {
return errors.New("product not found: " + item.ProductID)
}
if product.Status != ProductStatusActive {
return errors.New("product is not active: " + item.ProductID)
}
if !product.IsInStock() {
return errors.New("product is out of stock: " + item.ProductID)
}
}
return nil
}
// 预留库存
func (s *OrderSagaManager) reserveInventory(ctx context.Context, data interface{}) error {
orderData := data.(*CreateOrderSagaData)
reservations := make([]InventoryReservation, 0)
for _, item := range orderData.Items {
reservation := InventoryReservation{
ProductID: item.ProductID,
Quantity: item.Quantity,
}
if err := s.inventoryService.ReserveInventory(ctx, reservation); err != nil {
// 如果预留失败,释放已预留的库存
for _, prevReservation := range reservations {
s.inventoryService.ReleaseInventory(ctx, prevReservation)
}
return err
}
reservations = append(reservations, reservation)
}
// 保存预留信息到 Saga 数据中
orderData.InventoryReservations = reservations
return nil
}
// 释放库存(补偿操作)
func (s *OrderSagaManager) releaseInventory(ctx context.Context, data interface{}) error {
orderData := data.(*CreateOrderSagaData)
for _, reservation := range orderData.InventoryReservations {
if err := s.inventoryService.ReleaseInventory(ctx, reservation); err != nil {
// 记录错误但继续执行
log.Error("failed to release inventory", err)
}
}
return nil
}
// 创建订单
func (s *OrderSagaManager) createOrder(ctx context.Context, data interface{}) error {
orderData := data.(*CreateOrderSagaData)
// 构建订单项
orderItems := make([]OrderItem, len(orderData.Items))
for i, item := range orderData.Items {
product, _ := s.productService.GetProduct(ctx, item.ProductID)
orderItems[i] = OrderItem{
ProductID: item.ProductID,
ProductName: product.Name,
ProductSKU: product.SKU,
Quantity: item.Quantity,
UnitPrice: product.GetEffectivePrice(),
}
}
// 创建订单
order, err := NewOrder(
orderData.UserID,
orderItems,
orderData.ShippingAddress,
orderData.BillingAddress,
orderData.ShippingMethod,
)
if err != nil {
return err
}
// 保存订单
if err := s.orderRepo.Save(ctx, order); err != nil {
return err
}
// 保存订单 ID 到 Saga 数据
orderData.OrderID = order.ID
return nil
}
// 取消订单(补偿操作)
func (s *OrderSagaManager) cancelOrder(ctx context.Context, data interface{}) error {
orderData := data.(*CreateOrderSagaData)
if orderData.OrderID != "" {
order, err := s.orderRepo.GetByID(ctx, orderData.OrderID)
if err != nil {
return err
}
if err := order.Cancel("saga compensation"); err != nil {
return err
}
return s.orderRepo.Update(ctx, order)
}
return nil
}
// 处理支付
func (s *OrderSagaManager) processPayment(ctx context.Context, data interface{}) error {
orderData := data.(*CreateOrderSagaData)
paymentRequest := PaymentRequest{
OrderID: orderData.OrderID,
Amount: orderData.PaymentAmount,
PaymentMethod: orderData.PaymentMethod,
UserID: orderData.UserID,
}
paymentResult, err := s.paymentService.ProcessPayment(ctx, paymentRequest)
if err != nil {
return err
}
// 保存支付信息到 Saga 数据
orderData.PaymentID = paymentResult.PaymentID
orderData.TransactionID = paymentResult.TransactionID
return nil
}
// 退款(补偿操作)
func (s *OrderSagaManager) refundPayment(ctx context.Context, data interface{}) error {
orderData := data.(*CreateOrderSagaData)
if orderData.PaymentID != "" {
refundRequest := RefundRequest{
PaymentID: orderData.PaymentID,
Amount: orderData.PaymentAmount,
Reason: "saga compensation",
}
return s.paymentService.RefundPayment(ctx, refundRequest)
}
return nil
}
// 确认订单
func (s *OrderSagaManager) confirmOrder(ctx context.Context, data interface{}) error {
orderData := data.(*CreateOrderSagaData)
order, err := s.orderRepo.GetByID(ctx, orderData.OrderID)
if err != nil {
return err
}
paymentInfo := PaymentInfo{
PaymentID: orderData.PaymentID,
PaymentMethod: orderData.PaymentMethod,
PaymentStatus: PaymentStatusCompleted,
TransactionID: orderData.TransactionID,
PaidAmount: orderData.PaymentAmount,
}
if err := order.Pay(paymentInfo); err != nil {
return err
}
return s.orderRepo.Update(ctx, order)
}
// Saga 数据结构
type CreateOrderSagaData struct {
UserID string `json:"user_id"`
Items []OrderItemData `json:"items"`
ShippingAddress Address `json:"shipping_address"`
BillingAddress Address `json:"billing_address"`
ShippingMethod ShippingMethod `json:"shipping_method"`
PaymentMethod PaymentMethod `json:"payment_method"`
PaymentAmount Money `json:"payment_amount"`
// 执行过程中生成的数据
OrderID string `json:"order_id,omitempty"`
PaymentID string `json:"payment_id,omitempty"`
TransactionID string `json:"transaction_id,omitempty"`
InventoryReservations []InventoryReservation `json:"inventory_reservations,omitempty"`
}
type OrderItemData struct {
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
}
type InventoryReservation struct {
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
}
// Saga 实体
type Saga struct {
ID string `json:"id"`
Type string `json:"type"`
Status SagaStatus `json:"status"`
Data interface{} `json:"data"`
Steps []SagaStep `json:"steps"`
CurrentStep int `json:"current_step"`
CompletedSteps []string `json:"completed_steps"`
FailureReason string `json:"failure_reason,omitempty"`
CompensationErrors []CompensationError `json:"compensation_errors,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
}
type SagaStep struct {
Name string `json:"name"`
Handler func(ctx context.Context, data interface{}) error `json:"-"`
Compensator func(ctx context.Context, data interface{}) error `json:"-"`
}
type SagaStatus int
const (
SagaStatusStarted SagaStatus = iota + 1
SagaStatusCompleted
SagaStatusCompensating
SagaStatusFailed
)
type CompensationError struct {
Step string `json:"step"`
Error string `json:"error"`
}
订单命令处理 #
// internal/application/command/order_command_handler.go
package command
import (
"context"
"errors"
"github.com/ecommerce/order-service/internal/application/saga"
"github.com/ecommerce/order-service/internal/domain/entity"
"github.com/ecommerce/order-service/internal/domain/repository"
"github.com/ecommerce/order-service/pkg/validator"
)
type OrderCommandHandler struct {
orderRepo repository.OrderRepository
sagaManager *saga.OrderSagaManager
eventBus EventBus
validator *validator.Validator
}
func NewOrderCommandHandler(
orderRepo repository.OrderRepository,
sagaManager *saga.OrderSagaManager,
eventBus EventBus,
validator *validator.Validator,
) *OrderCommandHandler {
return &OrderCommandHandler{
orderRepo: orderRepo,
sagaManager: sagaManager,
eventBus: eventBus,
validator: validator,
}
}
// 处理创建订单命令
func (h *OrderCommandHandler) HandleCreateOrder(ctx context.Context, cmd *CreateOrderCommand) (*CreateOrderResult, error) {
// 验证命令
if err := h.validator.Validate(cmd); err != nil {
return nil, err
}
// 构建 Saga 数据
sagaData := &saga.CreateOrderSagaData{
UserID: cmd.UserID,
Items: h.toOrderItemData(cmd.Items),
ShippingAddress: cmd.ShippingAddress,
BillingAddress: cmd.BillingAddress,
ShippingMethod: cmd.ShippingMethod,
PaymentMethod: cmd.PaymentMethod,
PaymentAmount: cmd.PaymentAmount,
}
// 启动 Saga 事务
if err := h.sagaManager.CreateOrderSaga(ctx, sagaData); err != nil {
return nil, err
}
return &CreateOrderResult{
SagaID: sagaData.OrderID, // 这里应该返回 Saga ID
Message: "Order creation started",
}, nil
}
// 处理取消订单命令
func (h *OrderCommandHandler) HandleCancelOrder(ctx context.Context, cmd *CancelOrderCommand) error {
// 验证命令
if err := h.validator.Validate(cmd); err != nil {
return err
}
// 获取订单
order, err := h.orderRepo.GetByID(ctx, cmd.OrderID)
if err != nil {
return err
}
// 检查权限
if order.UserID != cmd.UserID {
return errors.New("permission denied")
}
// 取消订单
if err := order.Cancel(cmd.Reason); err != nil {
return err
}
// 保存更新
if err := h.orderRepo.Update(ctx, order); err != nil {
return err
}
// 发布领域事件
for _, event := range order.GetEvents() {
if err := h.eventBus.Publish(ctx, event); err != nil {
log.Error("failed to publish event", err)
}
}
order.ClearEvents()
return nil
}
// 处理订单发货命令
func (h *OrderCommandHandler) HandleShipOrder(ctx context.Context, cmd *ShipOrderCommand) error {
// 验证命令
if err := h.validator.Validate(cmd); err != nil {
return err
}
// 获取订单
order, err := h.orderRepo.GetByID(ctx, cmd.OrderID)
if err != nil {
return err
}
// 发货
if err := order.Ship(cmd.TrackingNumber); err != nil {
return err
}
// 保存更新
if err := h.orderRepo.Update(ctx, order); err != nil {
return err
}
// 发布领域事件
for _, event := range order.GetEvents() {
if err := h.eventBus.Publish(ctx, event); err != nil {
log.Error("failed to publish event", err)
}
}
order.ClearEvents()
return nil
}
// 处理确认收货命令
func (h *OrderCommandHandler) HandleConfirmDelivery(ctx context.Context, cmd *ConfirmDeliveryCommand) error {
// 验证命令
if err := h.validator.Validate(cmd); err != nil {
return err
}
// 获取订单
order, err := h.orderRepo.GetByID(ctx, cmd.OrderID)
if err != nil {
return err
}
// 检查权限
if order.UserID != cmd.UserID {
return errors.New("permission denied")
}
// 确认收货
if err := order.Deliver(); err != nil {
return err
}
// 保存更新
if err := h.orderRepo.Update(ctx, order); err != nil {
return err
}
// 发布领域事件
for _, event := range order.GetEvents() {
if err := h.eventBus.Publish(ctx, event); err != nil {
log.Error("failed to publish event", err)
}
}
order.ClearEvents()
return nil
}
// 辅助方法
func (h *OrderCommandHandler) toOrderItemData(items []OrderItemCommand) []saga.OrderItemData {
result := make([]saga.OrderItemData, len(items))
for i, item := range items {
result[i] = saga.OrderItemData{
ProductID: item.ProductID,
Quantity: item.Quantity,
}
}
return result
}
// 命令定义
type CreateOrderCommand struct {
UserID string `json:"user_id" validate:"required"`
Items []OrderItemCommand `json:"items" validate:"required,min=1"`
ShippingAddress Address `json:"shipping_address" validate:"required"`
BillingAddress Address `json:"billing_address" validate:"required"`
ShippingMethod ShippingMethod `json:"shipping_method" validate:"required"`
PaymentMethod PaymentMethod `json:"payment_method" validate:"required"`
PaymentAmount Money `json:"payment_amount" validate:"required"`
CouponCode string `json:"coupon_code,omitempty"`
Notes string `json:"notes,omitempty"`
}
type OrderItemCommand struct {
ProductID string `json:"product_id" validate:"required"`
Quantity int `json:"quantity" validate:"min=1"`
}
type CancelOrderCommand struct {
OrderID string `json:"order_id" validate:"required"`
UserID string `json:"user_id" validate:"required"`
Reason string `json:"reason" validate:"required"`
}
type ShipOrderCommand struct {
OrderID string `json:"order_id" validate:"required"`
TrackingNumber string `json:"tracking_number" validate:"required"`
}
type ConfirmDeliveryCommand struct {
OrderID string `json:"order_id" validate:"required"`
UserID string `json:"user_id" validate:"required"`
}
// 结果类型
type CreateOrderResult struct {
SagaID string `json:"saga_id"`
Message string `json:"message"`
}
事件处理器 #
订单事件处理器 #
// internal/application/event/order_event_handler.go
package event
import (
"context"
"encoding/json"
"github.com/ecommerce/order-service/internal/domain/event"
)
type OrderEventHandler struct {
notificationService NotificationService
inventoryService InventoryService
userService UserService
analyticsService AnalyticsService
}
func NewOrderEventHandler(
notificationService NotificationService,
inventoryService InventoryService,
userService UserService,
analyticsService AnalyticsService,
) *OrderEventHandler {
return &OrderEventHandler{
notificationService: notificationService,
inventoryService: inventoryService,
userService: userService,
analyticsService: analyticsService,
}
}
// 处理订单创建事件
func (h *OrderEventHandler) HandleOrderCreated(ctx context.Context, eventData []byte) error {
var event event.OrderCreatedEvent
if err := json.Unmarshal(eventData, &event); err != nil {
return err
}
// 发送订单创建通知
notification := Notification{
UserID: event.UserID,
Type: NotificationTypeOrderCreated,
Title: "订单创建成功",
Content: fmt.Sprintf("您的订单 %s 已创建成功,金额:%s", event.AggregateID, formatMoney(event.FinalAmount)),
}
if err := h.notificationService.SendNotification(ctx, notification); err != nil {
log.Error("failed to send order created notification", err)
}
// 记录分析数据
analyticsData := AnalyticsData{
EventType: "order_created",
UserID: event.UserID,
OrderID: event.AggregateID,
Amount: event.FinalAmount.Amount,
Timestamp: event.OccurredAt,
}
if err := h.analyticsService.RecordEvent(ctx, analyticsData); err != nil {
log.Error("failed to record analytics data", err)
}
return nil
}
// 处理订单支付事件
func (h *OrderEventHandler) HandleOrderPaid(ctx context.Context, eventData []byte) error {
var event event.OrderPaidEvent
if err := json.Unmarshal(eventData, &event); err != nil {
return err
}
// 发送支付成功通知
notification := Notification{
UserID: event.UserID,
Type: NotificationTypeOrderPaid,
Title: "支付成功",
Content: fmt.Sprintf("您的订单 %s 支付成功,我们将尽快为您发货", event.AggregateID),
}
if err := h.notificationService.SendNotification(ctx, notification); err != nil {
log.Error("failed to send order paid notification", err)
}
// 更新用户积分
points := calculatePoints(event.PaymentInfo.PaidAmount)
if err := h.userService.AddPoints(ctx, event.UserID, points); err != nil {
log.Error("failed to add user points", err)
}
return nil
}
// 处理订单发货事件
func (h *OrderEventHandler) HandleOrderShipped(ctx context.Context, eventData []byte) error {
var event event.OrderShippedEvent
if err := json.Unmarshal(eventData, &event); err != nil {
return err
}
// 发送发货通知
notification := Notification{
UserID: event.UserID,
Type: NotificationTypeOrderShipped,
Title: "订单已发货",
Content: fmt.Sprintf("您的订单 %s 已发货,物流单号:%s", event.AggregateID, event.TrackingNumber),
}
if err := h.notificationService.SendNotification(ctx, notification); err != nil {
log.Error("failed to send order shipped notification", err)
}
return nil
}
// 处理订单取消事件
func (h *OrderEventHandler) HandleOrderCancelled(ctx context.Context, eventData []byte) error {
var event event.OrderCancelledEvent
if err := json.Unmarshal(eventData, &event); err != nil {
return err
}
// 发送取消通知
notification := Notification{
UserID: event.UserID,
Type: NotificationTypeOrderCancelled,
Title: "订单已取消",
Content: fmt.Sprintf("您的订单 %s 已取消,原因:%s", event.AggregateID, event.Reason),
}
if err := h.notificationService.SendNotification(ctx, notification); err != nil {
log.Error("failed to send order cancelled notification", err)
}
return nil
}
// 辅助方法
func formatMoney(money Money) string {
return fmt.Sprintf("%.2f %s", float64(money.Amount)/100, money.Currency)
}
func calculatePoints(amount Money) int {
// 每消费 1 元获得 1 积分
return int(amount.Amount / 100)
}
// 通知类型
type NotificationType int
const (
NotificationTypeOrderCreated NotificationType = iota + 1
NotificationTypeOrderPaid
NotificationTypeOrderShipped
NotificationTypeOrderDelivered
NotificationTypeOrderCancelled
NotificationTypeOrderRefunded
)
type Notification struct {
UserID string `json:"user_id"`
Type NotificationType `json:"type"`
Title string `json:"title"`
Content string `json:"content"`
}
type AnalyticsData struct {
EventType string `json:"event_type"`
UserID string `json:"user_id"`
OrderID string `json:"order_id"`
Amount int64 `json:"amount"`
Timestamp time.Time `json:"timestamp"`
}
通过本节的学习,我们完成了订单服务的核心功能实现,包括订单管理、状态流转、Saga 分布式事务处理和事件处理。下一节将进行系统集成和综合实战。