4.8.1 分布式任务调度器

分布式任务调度器是企业级应用中的核心组件，负责在多个节点之间分发和执行任务。本节将设计并实现一个分布式任务调度系统的原型，覆盖任务提交、调度分发、远程执行、结果上报和失败重试等核心流程。

系统架构设计

整体架构

分布式任务调度器采用主从架构，包含以下核心组件：

  • 调度器节点（Scheduler）：负责任务分发和状态管理
  • 执行器节点（Executor）：负责任务执行
  • 存储层：持久化任务信息和执行状态
  • 通信层：节点间基于 HTTP/JSON 的通信协议
// pkg/scheduler/types.go
package scheduler

import (
    "context"
    "time"
)

// TaskStatus enumerates the lifecycle states a Task moves through.
// Task.Status carries this value; no custom JSON marshalling is defined in
// this section, so it travels between scheduler and executors as a plain
// integer and the numeric values below are effectively part of that format.
type TaskStatus int

// Lifecycle states. The values are written out explicitly rather than via
// iota so that reordering these lines cannot silently renumber them.
const (
    TaskPending   TaskStatus = 0 // accepted by SubmitTask, waiting for dispatch
    TaskRunning   TaskStatus = 1 // handed to an executor by the scheduler
    TaskCompleted TaskStatus = 2 // the command exited successfully
    TaskFailed    TaskStatus = 3 // the command failed or the scheduler gave up
    TaskCancelled TaskStatus = 4 // terminal state for cancelled tasks
)

// Task is the unit of work the scheduler dispatches to executors. It is sent
// to executors as JSON (the executor client marshals it), so the json tags
// below define the wire format.
type Task struct {
    // ID uniquely identifies the task; SubmitTask assigns a UUID when empty.
    ID string `json:"id"`
    // Name is a human-readable label.
    Name string `json:"name"`
    // Command and Args are the program and its arguments that the executor
    // runs.
    Command string   `json:"command"`
    Args    []string `json:"args"`
    // Env holds extra KEY=VALUE pairs added to the executor's environment.
    Env map[string]string `json:"env"`
    // Timeout bounds a single run on the executor. time.Duration is encoded
    // in JSON as an integer number of nanoseconds.
    Timeout time.Duration `json:"timeout"`
    // Retry is the number of retries still available after a failed dispatch.
    Retry int `json:"retry"`
    // Status is the current lifecycle state.
    Status TaskStatus `json:"status"`
    // CreatedAt is set by SubmitTask, StartedAt when the task is assigned to
    // an executor, CompletedAt when the executor finishes running it.
    CreatedAt   time.Time  `json:"created_at"`
    StartedAt   *time.Time `json:"started_at,omitempty"`
    CompletedAt *time.Time `json:"completed_at,omitempty"`
    // Output is the combined stdout/stderr of the run; Error describes the
    // most recent failure.
    Output string `json:"output"`
    Error  string `json:"error"`
    // ExecutorID is the ID of the executor the task was assigned to.
    ExecutorID string `json:"executor_id"`
    // Dependencies lists the IDs of tasks that must reach TaskCompleted
    // before this task is ready (checkDependencies reads it). It is omitted
    // from JSON when empty, so existing payloads are unchanged.
    Dependencies []string `json:"dependencies,omitempty"`
}

// Executor is the scheduler-side record of a registered executor node.
type Executor struct {
    // ID identifies the executor; assignTask copies it into Task.ExecutorID.
    ID         string            `json:"id"`
    // Address is passed to NewExecutorClient when a task is dispatched here.
    Address    string            `json:"address"`
    // Status must equal "active" for selectExecutor to consider this executor.
    Status     string            `json:"status"`
    // Capacity and Running form the load ratio Running/Capacity that
    // selectExecutor minimizes; assignTask increments Running on dispatch.
    // NOTE(review): no code in this section decrements Running when a task
    // finishes — confirm where that happens.
    Capacity   int               `json:"capacity"`
    Running    int               `json:"running"`
    // LastSeen is presumably refreshed by executor heartbeats and read by the
    // scheduler's health check; neither is shown here — confirm.
    LastSeen   time.Time         `json:"last_seen"`
    // Labels are free-form key/value tags; selectExecutor does not use them.
    Labels     map[string]string `json:"labels"`
}

// SchedulerInterface 调度器接口
type SchedulerInterface interface {
    SubmitTask(ctx context.Context, task *Task) error
    GetTask(ctx context.Context, taskID string) (*Task, error)
    ListTasks(ctx context.Context, status TaskStatus) ([]*Task, error)
    CancelTask(ctx context.Context, taskID string) error
    RegisterExecutor(ctx context.Context, executor *Executor) error
    GetExecutors(ctx context.Context) ([]*Executor, error)
}

调度器核心实现

// pkg/scheduler/scheduler.go
package scheduler

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/google/uuid"
)

// Scheduler is the in-process scheduler. It indexes tasks and executors in
// memory, buffers submitted tasks in taskQueue for the dispatch loop, and
// persists task state through storage.
type Scheduler struct {
    // tasks indexes every known task by ID.
    tasks     map[string]*Task
    // executors holds the registered executors; keyed presumably by executor
    // ID (RegisterExecutor is not shown in this section — confirm).
    executors map[string]*Executor
    // taskQueue feeds dispatchTasks. NewScheduler gives it a buffer of 1000
    // and SubmitTask refuses new tasks when it is full.
    taskQueue chan *Task
    // mu guards the tasks and executors maps.
    mu        sync.RWMutex
    // storage persists task state; logger receives operational messages.
    storage   Storage
    logger    *log.Logger
}

// Storage persists scheduler tasks. SubmitTask and the dispatch loop call it
// from different goroutines, so implementations must tolerate concurrent use.
type Storage interface {
    // LoadTask returns the stored task with the given ID.
    LoadTask(taskID string) (*Task, error)
    // ListTasks returns every stored task whose Status equals status.
    ListTasks(status TaskStatus) ([]*Task, error)

    // SaveTask stores a newly submitted task.
    SaveTask(task *Task) error
    // UpdateTask overwrites the stored copy of an existing task.
    UpdateTask(task *Task) error
    // DeleteTask removes the task with the given ID.
    DeleteTask(taskID string) error
}

// NewScheduler returns a Scheduler that persists tasks through storage and
// logs through logger. logger must be non-nil: the scheduler calls it
// unconditionally. The dispatch queue buffers up to 1000 tasks. Nothing runs
// until Start is called.
func NewScheduler(storage Storage, logger *log.Logger) *Scheduler {
    const queueCapacity = 1000

    s := new(Scheduler)
    s.storage = storage
    s.logger = logger
    s.tasks = make(map[string]*Task)
    s.executors = make(map[string]*Executor)
    s.taskQueue = make(chan *Task, queueCapacity)
    return s
}

// Start launches the scheduler's background loops, each on its own goroutine
// and each given ctx, then returns immediately:
//   - dispatchTasks: pulls tasks off the queue and assigns them (exits when
//     ctx is done);
//   - healthCheck: executor health checking (defined elsewhere);
//   - syncTaskStatus: task status synchronisation (defined elsewhere).
//
// It currently always returns nil.
func (s *Scheduler) Start(ctx context.Context) error {
    for _, launch := range []func(){
        func() { s.dispatchTasks(ctx) },
        func() { s.healthCheck(ctx) },
        func() { s.syncTaskStatus(ctx) },
    } {
        go launch()
    }

    s.logger.Println("Scheduler started")
    return nil
}

// SubmitTask registers task with the scheduler and places it on the dispatch
// queue.
//
// An empty task.ID is replaced with a random UUID. Status is reset to
// TaskPending and CreatedAt is stamped with the current time; these fields
// are written on the caller's *Task.
//
// The task is persisted through storage first and only then added to the
// in-memory index. Handing it to the queue never blocks. If the queue is full,
// the submission is rolled back: the task is removed from the index and, best
// effort, from storage. So a rejected task is never left behind as a pending
// record that nothing will ever dispatch.
//
// Errors: a nil task, ctx already done (ctx.Err()), a storage failure
// (wrapped), or a full queue.
func (s *Scheduler) SubmitTask(ctx context.Context, task *Task) error {
    if task == nil {
        return fmt.Errorf("task is nil")
    }
    // Checked before any side effect so a cancelled caller leaves no state.
    if err := ctx.Err(); err != nil {
        return err
    }

    if task.ID == "" {
        task.ID = uuid.New().String()
    }
    task.Status = TaskPending
    task.CreatedAt = time.Now()

    // Persist before indexing so the in-memory map never holds a task the
    // storage backend failed to record.
    if err := s.storage.SaveTask(task); err != nil {
        return fmt.Errorf("failed to save task: %w", err)
    }

    s.mu.Lock()
    s.tasks[task.ID] = task
    s.mu.Unlock()

    select {
    case s.taskQueue <- task:
        s.logger.Printf("Task %s submitted", task.ID)
        return nil
    default:
    }

    // Queue full: undo the registration. Only remove the index entry if it
    // is still this task, so a concurrent resubmission is not clobbered.
    s.mu.Lock()
    if s.tasks[task.ID] == task {
        delete(s.tasks, task.ID)
    }
    s.mu.Unlock()
    if err := s.storage.DeleteTask(task.ID); err != nil {
        s.logger.Printf("Failed to roll back task %s: %v", task.ID, err)
    }
    return fmt.Errorf("task queue is full")
}

// dispatchTasks is the scheduler's dispatch loop. It waits for the next
// queued task and hands it to assignTask. A failed assignment is logged and
// passed to handleTaskFailure, which either schedules a retry or marks the
// task failed. The loop returns once ctx is done.
func (s *Scheduler) dispatchTasks(ctx context.Context) {
    for {
        var next *Task
        select {
        case <-ctx.Done():
            return
        case next = <-s.taskQueue:
        }

        err := s.assignTask(ctx, next)
        if err == nil {
            continue
        }
        s.logger.Printf("Failed to assign task %s: %v", next.ID, err)
        s.handleTaskFailure(next, err)
    }
}

// assignTask dispatches task to the least-loaded active executor.
//
// An executor is chosen and its Running count reserved in a single critical
// section under s.mu. That keeps load accounting race-free: selectExecutor
// reads Running under the same lock.
//
// The task is marked TaskRunning (with ExecutorID and StartedAt) and persisted
// *before* the request is sent. Executors run tasks asynchronously and may
// report a result quickly, so updating state afterwards could overwrite that
// result with TaskRunning. Also, a storage failure after a successful send
// would make the caller retry a task that is already executing.
//
// If persisting or sending fails, the reservation is released and the task is
// returned to TaskPending before the error is returned. The caller
// (dispatchTasks) then hands it to handleTaskFailure.
func (s *Scheduler) assignTask(ctx context.Context, task *Task) error {
    s.mu.Lock()
    executor := s.selectExecutor()
    if executor != nil {
        executor.Running++
    }
    s.mu.Unlock()

    if executor == nil {
        return fmt.Errorf("no available executor")
    }

    // undo releases the executor slot and clears the assignment fields.
    undo := func() {
        s.mu.Lock()
        executor.Running--
        s.mu.Unlock()
        task.Status = TaskPending
        task.ExecutorID = ""
        task.StartedAt = nil
    }

    now := time.Now()
    task.Status = TaskRunning
    task.ExecutorID = executor.ID
    task.StartedAt = &now

    s.mu.Lock()
    s.tasks[task.ID] = task
    s.mu.Unlock()

    if err := s.storage.UpdateTask(task); err != nil {
        undo()
        return fmt.Errorf("failed to persist task assignment: %w", err)
    }

    client := NewExecutorClient(executor.Address)
    if err := client.ExecuteTask(ctx, task); err != nil {
        undo()
        // Storage already says TaskRunning; bring it back in line.
        if uerr := s.storage.UpdateTask(task); uerr != nil {
            s.logger.Printf("Failed to persist task %s after send failure: %v", task.ID, uerr)
        }
        return fmt.Errorf("failed to send task to executor: %w", err)
    }

    return nil
}

// selectExecutor returns the "active" executor with the lowest load, where
// load is Running/Capacity. It returns nil when nothing qualifies.
//
// Executors at or above full load (ratio >= 1) are never chosen. With a
// non-negative Running, an executor with Capacity 0 is never chosen either,
// because its ratio is +Inf or NaN. Ties go to whichever entry map iteration
// visits first, i.e. arbitrarily.
//
// s.executors is read without locking; the caller (assignTask) holds s.mu.
func (s *Scheduler) selectExecutor() *Executor {
    var best *Executor
    bestLoad := 1.0

    for _, candidate := range s.executors {
        if candidate.Status == "active" {
            ratio := float64(candidate.Running) / float64(candidate.Capacity)
            if ratio < bestLoad {
                best, bestLoad = candidate, ratio
            }
        }
    }
    return best
}

执行器实现

// pkg/executor/executor.go
package executor

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "os"
    "os/exec"
    "sync"
    "time"

    "github.com/gin-gonic/gin"
    "your-project/pkg/scheduler"
)

// Executor is a worker node. It serves an HTTP API on address (see Start and
// setupRoutes), runs accepted tasks as local processes, and reports results
// to the scheduler.
type Executor struct {
    // id identifies this executor (presumably sent when registering with the
    // scheduler — registerToScheduler is not shown here; confirm).
    id         string
    // address is the listen address of the executor's HTTP server.
    address    string
    // capacity is the maximum number of concurrently running tasks;
    // executeTask rejects new work once running reaches it.
    capacity   int
    // running and tasks track in-flight executions and are guarded by mu.
    running    int
    tasks      map[string]*TaskExecution
    mu         sync.RWMutex
    logger     *log.Logger
    scheduler  string // scheduler address, passed to NewSchedulerClient
}

// TaskExecution is the executor's bookkeeping for one running task. It is
// stored in Executor.tasks from admission in executeTask until the run ends.
type TaskExecution struct {
    // Task is the decoded task being run; runTask writes its outcome fields.
    Task      *scheduler.Task
    // NOTE(review): Process is never assigned in this section (runTask uses
    // cmd.CombinedOutput and does not store cmd.Process) — confirm whether
    // anything relies on it.
    Process   *os.Process
    // StartTime is when executeTask admitted the task.
    StartTime time.Time
    // Cancel cancels the run's context; exec.CommandContext then kills the
    // process.
    Cancel    context.CancelFunc
}

// NewExecutor builds an Executor named id that will:
//   - serve HTTP on address,
//   - run at most capacity tasks at once,
//   - report results to the scheduler at schedulerAddr.
//
// It logs to stdout with an "[EXECUTOR] " prefix. Nothing starts until Start
// is called.
func NewExecutor(id, address string, capacity int, schedulerAddr string) *Executor {
    stdoutLogger := log.New(os.Stdout, "[EXECUTOR] ", log.LstdFlags)

    e := &Executor{logger: stdoutLogger, scheduler: schedulerAddr}
    e.id, e.address, e.capacity = id, address, capacity
    e.tasks = make(map[string]*TaskExecution)
    return e
}

// Start registers the executor with the scheduler, starts the heartbeat and
// the HTTP API on e.address, and then blocks until one of two things happens:
//
//   - ctx is done: the HTTP server is shut down gracefully, waiting at most
//     shutdownTimeout for in-flight requests, and Shutdown's error is
//     returned.
//   - the HTTP server fails (for example, the address is already in use):
//     that error is returned instead of blocking forever with no listener.
//
// The context handed to the heartbeat is cancelled whenever Start returns.
func (e *Executor) Start(ctx context.Context) error {
    const (
        readHeaderTimeout = 10 * time.Second
        shutdownTimeout   = 15 * time.Second
    )

    if err := e.registerToScheduler(ctx); err != nil {
        return fmt.Errorf("failed to register to scheduler: %w", err)
    }

    // Scope background work to this call so it stops on every return path.
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    router := gin.Default()
    e.setupRoutes(router)

    server := &http.Server{
        Addr:    e.address,
        Handler: router,
        // Bound how long a client may take to send request headers.
        ReadHeaderTimeout: readHeaderTimeout,
    }

    go e.heartbeat(ctx)

    // Buffered so the server goroutine never blocks on send, even after
    // Start has stopped listening for it.
    serveErr := make(chan error, 1)
    go func() {
        if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            serveErr <- err
        }
    }()

    e.logger.Printf("Executor started on %s", e.address)

    select {
    case err := <-serveErr:
        return fmt.Errorf("HTTP server error: %w", err)
    case <-ctx.Done():
    }

    shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), shutdownTimeout)
    defer cancelShutdown()
    return server.Shutdown(shutdownCtx)
}

// setupRoutes registers the executor's HTTP API on router:
//
//   POST /execute         -> handleExecuteTask
//   POST /cancel/:taskId  -> handleCancelTask
//   GET  /status          -> handleStatus
//   GET  /health          -> handleHealth
//
// Only handleExecuteTask is shown in this section; the other handlers are
// defined elsewhere.
func (e *Executor) setupRoutes(router *gin.Engine) {
    routes := []struct {
        method  string
        path    string
        handler gin.HandlerFunc
    }{
        {http.MethodPost, "/execute", e.handleExecuteTask},
        {http.MethodPost, "/cancel/:taskId", e.handleCancelTask},
        {http.MethodGet, "/status", e.handleStatus},
        {http.MethodGet, "/health", e.handleHealth},
    }
    for _, r := range routes {
        router.Handle(r.method, r.path, r.handler)
    }
}

// handleExecuteTask decodes a scheduler.Task from the JSON request body and
// starts it asynchronously. It responds:
//   - 400 with {"error": ...} when the body is malformed;
//   - 500 with {"error": ...} when executeTask refuses the task;
//   - 200 with {"status": "accepted"} otherwise.
//
// The task is started with a context that is NOT the request context. The
// request context is cancelled as soon as this handler returns, and
// executeTask derives the process context from whatever it is given. Passing
// the request context through would kill every process right after it was
// accepted.
func (e *Executor) handleExecuteTask(c *gin.Context) {
    var task scheduler.Task
    if err := c.ShouldBindJSON(&task); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    if err := e.executeTask(context.Background(), &task); err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
        return
    }

    c.JSON(http.StatusOK, gin.H{"status": "accepted"})
}

// executeTask admits task for execution and starts it in the background.
//
// It returns an error, without starting anything, in two cases:
//   - the executor is already running capacity tasks;
//   - a task with the same ID is still running here. Running it twice would
//     let the first run's cleanup delete the second run's bookkeeping.
//
// Otherwise it registers a TaskExecution, launches runTask on a new goroutine
// and returns nil immediately.
//
// The process context is derived from ctx. A positive task.Timeout bounds the
// run. A zero or negative Timeout (e.g. omitted from the JSON request) means
// no deadline; context.WithTimeout with a non-positive duration would already
// be expired and the process would be killed before doing any work.
//
// When the run ends, the execution is unregistered, the running counter is
// decremented and the context is released.
func (e *Executor) executeTask(ctx context.Context, task *scheduler.Task) error {
    var (
        execCtx context.Context
        cancel  context.CancelFunc
    )
    if task.Timeout > 0 {
        execCtx, cancel = context.WithTimeout(ctx, task.Timeout)
    } else {
        execCtx, cancel = context.WithCancel(ctx)
    }
    execution := &TaskExecution{
        Task:      task,
        StartTime: time.Now(),
        Cancel:    cancel,
    }

    // Admission check and registration share one critical section so a
    // concurrent request cannot slip in between them.
    e.mu.Lock()
    switch {
    case e.running >= e.capacity:
        e.mu.Unlock()
        cancel()
        return fmt.Errorf("executor at capacity")
    case e.tasks[task.ID] != nil:
        e.mu.Unlock()
        cancel()
        return fmt.Errorf("task %s is already running on this executor", task.ID)
    }
    e.running++
    e.tasks[task.ID] = execution
    e.mu.Unlock()

    go func() {
        defer func() {
            e.mu.Lock()
            delete(e.tasks, task.ID)
            e.running--
            e.mu.Unlock()
            cancel()
        }()

        e.runTask(execCtx, execution)
    }()

    return nil
}

// runTask runs the task's command to completion, records the outcome on the
// task and reports it to the scheduler.
//
// The process is started with exec.CommandContext, so it is killed when ctx
// is cancelled or its deadline passes. Its environment is the executor's own
// environment plus task.Env. Output holds combined stdout and stderr. Outcome
// mapping:
//   - success                 -> TaskCompleted; any stale Error is cleared
//     (a retried task arrives with its previous failure in Error)
//   - ctx cancelled           -> TaskCancelled (presumably via the cancel API)
//   - ctx deadline exceeded   -> TaskFailed; Error mentions the timeout
//   - any other error         -> TaskFailed
//
// SECURITY NOTE(review): Command/Args come straight from the /execute request
// body and are executed verbatim. No authentication is visible in
// setupRoutes, so anyone who can reach the executor can run arbitrary
// programs; protect the API (auth token, mTLS, network policy) before real
// use. Output is also buffered in memory without a size limit.
func (e *Executor) runTask(ctx context.Context, execution *TaskExecution) {
    task := execution.Task

    cmd := exec.CommandContext(ctx, task.Command, task.Args...)

    env := os.Environ()
    for k, v := range task.Env {
        env = append(env, k+"="+v)
    }
    cmd.Env = env

    output, err := cmd.CombinedOutput()

    now := time.Now()
    task.CompletedAt = &now
    task.Output = string(output)

    switch {
    case err == nil:
        task.Status = scheduler.TaskCompleted
        task.Error = ""
        e.logger.Printf("Task %s completed", task.ID)
    case ctx.Err() == context.Canceled:
        task.Status = scheduler.TaskCancelled
        task.Error = err.Error()
        e.logger.Printf("Task %s cancelled: %v", task.ID, err)
    case ctx.Err() == context.DeadlineExceeded:
        task.Status = scheduler.TaskFailed
        task.Error = fmt.Sprintf("timed out after %s: %v", task.Timeout, err)
        e.logger.Printf("Task %s failed: %v", task.ID, err)
    default:
        task.Status = scheduler.TaskFailed
        task.Error = err.Error()
        e.logger.Printf("Task %s failed: %v", task.ID, err)
    }

    e.reportTaskResult(task)
}

// reportTaskResult sends the task's final state to the scheduler at
// e.scheduler through a SchedulerClient. Delivery is attempted once; a
// failure is only logged and there is no retry here.
func (e *Executor) reportTaskResult(task *scheduler.Task) {
    err := NewSchedulerClient(e.scheduler).UpdateTaskStatus(context.Background(), task)
    if err == nil {
        return
    }
    e.logger.Printf("Failed to report task result: %v", err)
}

通信客户端实现

// pkg/client/executor_client.go
package client

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "time"

    "your-project/pkg/scheduler"
)

// ExecutorClient talks to a single executor's HTTP API.
type ExecutorClient struct {
    // address is the executor's host:port. Request URLs are built as
    // "http://" + address + path, so it must not include a scheme.
    address string
    // client is used for every request; NewExecutorClient gives it a
    // 30-second overall timeout.
    client  *http.Client
}

// NewExecutorClient returns a client for the executor listening at address
// (host:port, without a scheme). Each request through it is limited to 30
// seconds end to end by the underlying http.Client.
func NewExecutorClient(address string) *ExecutorClient {
    const requestTimeout = 30 * time.Second

    httpClient := &http.Client{Timeout: requestTimeout}
    return &ExecutorClient{client: httpClient, address: address}
}

// ExecuteTask POSTs task as JSON to http://<address>/execute. It returns nil
// only when the executor answers 200 OK.
//
// For any other status, the error includes the status code and, when the
// executor sent one, the "error" field of its JSON body (for example
// "executor at capacity"). Callers can then log why the task was refused
// instead of just a bare status code.
func (c *ExecutorClient) ExecuteTask(ctx context.Context, task *scheduler.Task) error {
    data, err := json.Marshal(task)
    if err != nil {
        return fmt.Errorf("failed to marshal task: %w", err)
    }

    req, err := http.NewRequestWithContext(ctx, http.MethodPost,
        fmt.Sprintf("http://%s/execute", c.address),
        bytes.NewReader(data))
    if err != nil {
        return fmt.Errorf("failed to create request: %w", err)
    }
    req.Header.Set("Content-Type", "application/json")

    resp, err := c.client.Do(req)
    if err != nil {
        return fmt.Errorf("failed to send request: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        var body struct {
            Error string `json:"error"`
        }
        if json.NewDecoder(resp.Body).Decode(&body) == nil && body.Error != "" {
            return fmt.Errorf("executor returned status: %d: %s", resp.StatusCode, body.Error)
        }
        return fmt.Errorf("executor returned status: %d", resp.StatusCode)
    }

    return nil
}

// CancelTask asks the executor at c.address to cancel taskID by POSTing to
// /cancel/<taskID>.
//
// It returns an error when the request cannot be built or sent, or when the
// executor answers with a non-2xx status. In that case the error includes the
// status code and, if present, the "error" field of the executor's JSON body.
//
// NOTE(review): taskID is inserted into the path verbatim; IDs containing
// '/', '?' or '#' would address a different URL. Escape with url.PathEscape
// if callers can supply arbitrary IDs.
func (c *ExecutorClient) CancelTask(ctx context.Context, taskID string) error {
    req, err := http.NewRequestWithContext(ctx, http.MethodPost,
        fmt.Sprintf("http://%s/cancel/%s", c.address, taskID), nil)
    if err != nil {
        return fmt.Errorf("failed to create request: %w", err)
    }

    resp, err := c.client.Do(req)
    if err != nil {
        return fmt.Errorf("failed to send request: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode < 200 || resp.StatusCode >= 300 {
        var body struct {
            Error string `json:"error"`
        }
        if json.NewDecoder(resp.Body).Decode(&body) == nil && body.Error != "" {
            return fmt.Errorf("executor returned status: %d: %s", resp.StatusCode, body.Error)
        }
        return fmt.Errorf("executor returned status: %d", resp.StatusCode)
    }

    return nil
}

存储层实现

// pkg/storage/memory.go
package storage

import (
    "fmt"
    "sync"

    "your-project/pkg/scheduler"
)

// MemoryStorage is an in-memory implementation of scheduler.Storage guarded
// by a RWMutex. Nothing survives a restart, so it suits tests and
// single-process demos.
//
// It keeps the *scheduler.Task pointers it is given and returns those same
// pointers from LoadTask and ListTasks. Callers therefore share the stored
// objects and can mutate them without the storage lock.
type MemoryStorage struct {
    tasks map[string]*scheduler.Task
    mu    sync.RWMutex
}

// Compile-time check that MemoryStorage implements every method of
// scheduler.Storage, including DeleteTask.
var _ scheduler.Storage = (*MemoryStorage)(nil)

// NewMemoryStorage returns an empty MemoryStorage ready for use.
func NewMemoryStorage() *MemoryStorage {
    return &MemoryStorage{
        tasks: make(map[string]*scheduler.Task),
    }
}

// SaveTask stores task under task.ID, replacing any task with the same ID.
func (s *MemoryStorage) SaveTask(task *scheduler.Task) error {
    if task == nil {
        return fmt.Errorf("task is nil")
    }

    s.mu.Lock()
    defer s.mu.Unlock()

    s.tasks[task.ID] = task
    return nil
}

// LoadTask returns the stored task with the given ID, or an error if none.
func (s *MemoryStorage) LoadTask(taskID string) (*scheduler.Task, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    task, exists := s.tasks[taskID]
    if !exists {
        return nil, fmt.Errorf("task not found: %s", taskID)
    }

    return task, nil
}

// UpdateTask replaces an already-stored task; it returns an error if no task
// with task.ID exists.
func (s *MemoryStorage) UpdateTask(task *scheduler.Task) error {
    if task == nil {
        return fmt.Errorf("task is nil")
    }

    s.mu.Lock()
    defer s.mu.Unlock()

    if _, exists := s.tasks[task.ID]; !exists {
        return fmt.Errorf("task not found: %s", task.ID)
    }

    s.tasks[task.ID] = task
    return nil
}

// DeleteTask removes the task with the given ID. Like LoadTask and UpdateTask,
// it returns an error if no such task is stored.
func (s *MemoryStorage) DeleteTask(taskID string) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    if _, exists := s.tasks[taskID]; !exists {
        return fmt.Errorf("task not found: %s", taskID)
    }

    delete(s.tasks, taskID)
    return nil
}

// ListTasks returns every stored task whose Status equals status, in no
// particular order. It returns a nil slice when nothing matches.
func (s *MemoryStorage) ListTasks(status scheduler.TaskStatus) ([]*scheduler.Task, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    var result []*scheduler.Task
    for _, task := range s.tasks {
        if task.Status == status {
            result = append(result, task)
        }
    }

    return result, nil
}

主程序示例

// cmd/scheduler/main.go
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "your-project/pkg/scheduler"
    "your-project/pkg/storage"
)

func main() {
    // 创建存储
    store := storage.NewMemoryStorage()

    // 创建调度器
    logger := log.New(os.Stdout, "[SCHEDULER] ", log.LstdFlags)
    sched := scheduler.NewScheduler(store, logger)

    // 创建上下文
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 启动调度器
    if err := sched.Start(ctx); err != nil {
        log.Fatalf("Failed to start scheduler: %v", err)
    }

    // 提交测试任务
    task := &scheduler.Task{
        Name:    "test-task",
        Command: "echo",
        Args:    []string{"Hello, World!"},
        Timeout: 30 * time.Second,
        Retry:   3,
    }

    if err := sched.SubmitTask(ctx, task); err != nil {
        log.Printf("Failed to submit task: %v", err)
    }

    // 等待信号
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    <-sigChan
    log.Println("Shutting down scheduler...")
}

功能扩展

任务依赖管理

// TaskDependency presumably records that task TaskID may run only after every
// task listed in Dependencies has completed.
// NOTE(review): nothing in this section constructs or reads TaskDependency;
// checkDependencies reads task.Dependencies instead — confirm which
// representation is intended.
type TaskDependency struct {
    TaskID       string   `json:"task_id"`
    Dependencies []string `json:"dependencies"`
}

// checkDependencies reports whether every task listed in task.Dependencies
// is known to the scheduler and has reached TaskCompleted. A task with no
// dependencies is always ready.
//
// NOTE(review): s.tasks is read without taking s.mu, so callers presumably
// must hold s.mu (read or write) — confirm at the call sites.
func (s *Scheduler) checkDependencies(task *Task) bool {
    for _, id := range task.Dependencies {
        if dep, ok := s.tasks[id]; ok && dep.Status == TaskCompleted {
            continue
        }
        return false
    }
    return true
}

任务重试机制

// handleTaskFailure decides what happens to a task whose dispatch failed.
//
// err is recorded in task.Error.
//   - While retries remain (task.Retry > 0): one retry is consumed, the task
//     goes back to TaskPending and is re-queued after retryDelay.
//   - Otherwise: the task becomes TaskFailed.
//
// Every state change is persisted through s.storage, so storage does not keep
// showing a stale status. If the queue is still full when the retry delay
// expires, the task is marked TaskFailed rather than left pending with
// nothing left to dispatch it.
//
// NOTE(review): the delayed re-queue is not tied to any context, so it can
// fire up to retryDelay after scheduler shutdown. Task fields are written
// without s.mu, as elsewhere in the scheduler.
func (s *Scheduler) handleTaskFailure(task *Task, err error) {
    const retryDelay = time.Minute

    // persist writes the current task state, logging rather than dropping
    // a storage error.
    persist := func() {
        if uerr := s.storage.UpdateTask(task); uerr != nil {
            s.logger.Printf("Failed to persist task %s: %v", task.ID, uerr)
        }
    }

    task.Error = err.Error()

    if task.Retry <= 0 {
        task.Status = TaskFailed
        persist()
        return
    }

    task.Retry--
    task.Status = TaskPending
    persist()

    // time.AfterFunc runs the callback on its own goroutine after the delay,
    // instead of parking a goroutine in time.Sleep.
    time.AfterFunc(retryDelay, func() {
        select {
        case s.taskQueue <- task:
            s.logger.Printf("Task %s retrying", task.ID)
        default:
            s.logger.Printf("Failed to retry task %s", task.ID)
            task.Status = TaskFailed
            persist()
        }
    })
}

部署和运维

Docker 化部署

# Dockerfile.scheduler
# Multi-stage build: compile in the Go toolchain image, ship only the binary
# in a small Alpine runtime image.
FROM golang:1.21-alpine AS builder

WORKDIR /app
# Copy only the module manifests first so the dependency download is cached
# in its own layer and re-runs only when go.mod/go.sum change.
COPY go.mod go.sum ./
RUN go mod download
COPY . .
# CGO_ENABLED=0 yields a statically linked binary with no libc dependency.
RUN CGO_ENABLED=0 go build -o scheduler ./cmd/scheduler

FROM alpine:latest
RUN apk --no-cache add ca-certificates \
    && adduser -D -H -u 10001 scheduler
WORKDIR /app
COPY --from=builder /app/scheduler .
# Run as an unprivileged user instead of root.
USER scheduler
CMD ["./scheduler"]

监控指标

// 添加Prometheus监控
import "github.com/prometheus/client_golang/prometheus"

var (
    // tasksTotal counts tasks, partitioned by the "status" label.
    tasksTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "scheduler_tasks_total",
            Help: "Total number of tasks",
        },
        []string{"status"},
    )

    // executorsActive is the current number of active executors.
    executorsActive = prometheus.NewGauge(
        prometheus.GaugeOpts{
            Name: "scheduler_executors_active",
            Help: "Number of active executors",
        },
    )
)

// RegisterMetrics registers the scheduler's collectors with reg (typically
// prometheus.DefaultRegisterer, which backs promhttp.Handler()). Collectors
// that are never registered are not exported, so call this once at startup.
// It returns the first registration error, e.g. an AlreadyRegisteredError
// when called twice against the same registry.
//
// NOTE(review): nothing in this section updates these metrics yet; the
// dispatch and health-check paths would need to Inc/Set them.
func RegisterMetrics(reg prometheus.Registerer) error {
    for _, c := range []prometheus.Collector{tasksTotal, executorsActive} {
        if err := reg.Register(c); err != nil {
            return err
        }
    }
    return nil
}

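通过本节的学习，我们实现了一个分布式任务调度器的原型，涵盖了任务分发、执行监控、故障恢复（失败重试）等核心功能。这个系统展示了如何运用 Go 语言的并发特性和网络编程能力来构建分布式系统。需要注意：文中省略了部分辅助实现（如 healthCheck、syncTaskStatus、registerToScheduler、heartbeat 和 NewSchedulerClient）；此外，pkg/scheduler 直接调用了 pkg/client 中的 NewExecutorClient，而 pkg/client 又导入了 pkg/scheduler，会形成循环导入，实际工程中需要通过接口注入等方式拆分依赖。投入生产前，还需补充这些部分以及鉴权、持久化存储等能力。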