4.8.1 分布式任务调度器 #
分布式任务调度器是企业级应用中的核心组件,负责在多个节点间分发和执行任务。本节将设计并实现一个功能完整的分布式任务调度系统。
系统架构设计 #
整体架构 #
分布式任务调度器采用主从架构,包含以下核心组件:
- 调度器节点(Scheduler):负责任务分发和状态管理
- 执行器节点(Executor):负责任务执行
- 存储层:持久化任务信息和执行状态
- 通信层:节点间通信协议
// pkg/scheduler/types.go
package scheduler
import (
"context"
"time"
)
// TaskStatus enumerates the lifecycle states of a Task.
// Values are assigned with iota, so the zero value is TaskPending.
type TaskStatus int

const (
	// TaskPending is the state SubmitTask assigns to a newly submitted task.
	TaskPending TaskStatus = iota
	// TaskRunning is set by the scheduler once an executor has accepted the task.
	TaskRunning
	// TaskCompleted is set by the executor when the command finishes without error.
	TaskCompleted
	// TaskFailed is set when the command fails or dispatching runs out of retries.
	TaskFailed
	// TaskCancelled represents a cancelled task.
	TaskCancelled
)
// Task describes one unit of work: the command to run, its execution
// constraints, and the state recorded as it moves through the scheduler
// and an executor. It is exchanged between nodes as JSON.
type Task struct {
	// ID identifies the task; SubmitTask generates a UUID when it is empty.
	ID string `json:"id"`
	// Name is a caller-supplied label.
	Name string `json:"name"`
	// Command is the program the executor starts.
	Command string `json:"command"`
	// Args are the arguments passed to Command.
	Args []string `json:"args"`
	// Env holds extra variables appended to the executor's own environment.
	Env map[string]string `json:"env"`
	// Timeout bounds how long the executor lets the command run.
	Timeout time.Duration `json:"timeout"`
	// Retry is the number of retries left; it is decremented on each
	// failed dispatch.
	Retry int `json:"retry"`
	// Status is the current lifecycle state.
	Status TaskStatus `json:"status"`
	// CreatedAt is stamped by SubmitTask.
	CreatedAt time.Time `json:"created_at"`
	// StartedAt is set when the task is handed to an executor.
	StartedAt *time.Time `json:"started_at,omitempty"`
	// CompletedAt is set by the executor when the command returns.
	CompletedAt *time.Time `json:"completed_at,omitempty"`
	// Output is the combined stdout and stderr of the command.
	Output string `json:"output"`
	// Error is the message of the last failure, if any.
	Error string `json:"error"`
	// ExecutorID is the ID of the executor the task was assigned to.
	ExecutorID string `json:"executor_id"`
	// Dependencies lists the IDs of tasks that must reach TaskCompleted
	// before this task may run. checkDependencies reads this field; it is
	// omitted from JSON when empty so existing payloads are unaffected.
	Dependencies []string `json:"dependencies,omitempty"`
}
// Executor is the scheduler's record of one executor node.
type Executor struct {
	// ID identifies the executor; assignTask copies it into Task.ExecutorID.
	ID string `json:"id"`
	// Address is the network address handed to NewExecutorClient when dispatching.
	Address string `json:"address"`
	// Status is the node state; selectExecutor only considers "active" executors.
	Status string `json:"status"`
	// Capacity is the denominator of the load ratio used by selectExecutor.
	Capacity int `json:"capacity"`
	// Running is the scheduler-side count of assigned tasks; assignTask increments it.
	Running int `json:"running"`
	// LastSeen is presumably refreshed by heartbeats; no writer is visible here — TODO confirm.
	LastSeen time.Time `json:"last_seen"`
	// Labels is free-form metadata; the visible code does not read it.
	Labels map[string]string `json:"labels"`
}
// SchedulerInterface is the public contract of a scheduler: task submission
// and lookup plus executor registration.
// NOTE(review): only SubmitTask has a visible implementation on *Scheduler in
// this section; the semantics of the other methods are inferred from their
// signatures and should be confirmed against their implementations.
type SchedulerInterface interface {
	// SubmitTask accepts a task for scheduling.
	SubmitTask(ctx context.Context, task *Task) error
	// GetTask looks up a task by ID.
	GetTask(ctx context.Context, taskID string) (*Task, error)
	// ListTasks returns tasks filtered by the given status.
	ListTasks(ctx context.Context, status TaskStatus) ([]*Task, error)
	// CancelTask requests cancellation of the task with the given ID.
	CancelTask(ctx context.Context, taskID string) error
	// RegisterExecutor adds an executor to the scheduler.
	RegisterExecutor(ctx context.Context, executor *Executor) error
	// GetExecutors returns the known executors.
	GetExecutors(ctx context.Context) ([]*Executor, error)
}
调度器核心实现 #
// pkg/scheduler/scheduler.go
package scheduler
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/google/uuid"
)
// Scheduler keeps submitted tasks and known executors in memory, queues
// pending tasks for dispatch, and mirrors task state to a Storage backend.
type Scheduler struct {
	// tasks indexes tasks by Task.ID; accessed under mu.
	tasks map[string]*Task
	// executors holds the known executors (presumably keyed by executor ID —
	// the registration code is not visible here); read under mu.
	executors map[string]*Executor
	// taskQueue feeds pending tasks to dispatchTasks; NewScheduler buffers it at 1000.
	taskQueue chan *Task
	// mu guards tasks and executors.
	mu sync.RWMutex
	// storage persists task records.
	storage Storage
	// logger receives the scheduler's log output.
	logger *log.Logger
}
// Storage is the persistence backend the scheduler writes task records to.
type Storage interface {
	// SaveTask stores a newly submitted task.
	SaveTask(task *Task) error
	// LoadTask returns the task with the given ID.
	LoadTask(taskID string) (*Task, error)
	// UpdateTask writes back the current state of a task.
	UpdateTask(task *Task) error
	// DeleteTask removes the task with the given ID.
	DeleteTask(taskID string) error
	// ListTasks returns the tasks whose status equals the argument.
	ListTasks(status TaskStatus) ([]*Task, error)
}
// NewScheduler returns a Scheduler with empty task and executor tables and a
// dispatch queue buffered for 1000 tasks.
//
// storage receives every task record the scheduler persists. logger receives
// the scheduler's log output; when it is nil the standard library's default
// logger is used, so later logging calls cannot dereference a nil pointer.
func NewScheduler(storage Storage, logger *log.Logger) *Scheduler {
	if logger == nil {
		logger = log.Default()
	}
	return &Scheduler{
		tasks:     make(map[string]*Task),
		executors: make(map[string]*Executor),
		taskQueue: make(chan *Task, 1000),
		storage:   storage,
		logger:    logger,
	}
}
// Start launches the scheduler's background loops — task dispatch, executor
// health checking and task status synchronisation — each in its own
// goroutine bound to ctx, then logs that the scheduler is up.
// It does not block and always returns nil.
func (s *Scheduler) Start(ctx context.Context) error {
	loops := []func(context.Context){
		s.dispatchTasks,  // hands queued tasks to executors
		s.healthCheck,    // executor health checking
		s.syncTaskStatus, // task status synchronisation
	}
	for _, loop := range loops {
		go loop(ctx)
	}

	s.logger.Println("Scheduler started")
	return nil
}
// SubmitTask registers task with the scheduler and queues it for dispatch.
//
// An empty task.ID is replaced with a fresh UUID; Status is reset to
// TaskPending and CreatedAt to the current time. The task is persisted,
// indexed in memory, and then offered to the dispatch queue without blocking.
//
// It returns an error when task is nil, when ctx is already done, when the
// storage backend rejects the task, or when the queue is full. On every
// error path the task is left unregistered: nothing remains in the in-memory
// index or in storage, so a failed submission cannot leave behind a pending
// task that will never be dispatched.
func (s *Scheduler) SubmitTask(ctx context.Context, task *Task) error {
	if task == nil {
		return fmt.Errorf("task is nil")
	}
	// Check cancellation before touching any state. The enqueue below never
	// blocks, so this is the only point where ctx matters.
	if err := ctx.Err(); err != nil {
		return err
	}

	if task.ID == "" {
		task.ID = uuid.New().String()
	}
	task.Status = TaskPending
	task.CreatedAt = time.Now()

	// Persist first so a storage failure leaves no in-memory entry behind.
	if err := s.storage.SaveTask(task); err != nil {
		return fmt.Errorf("failed to save task: %w", err)
	}

	s.mu.Lock()
	s.tasks[task.ID] = task
	s.mu.Unlock()

	select {
	case s.taskQueue <- task:
		s.logger.Printf("Task %s submitted", task.ID)
		return nil
	default:
	}

	// Queue full: undo the registration so the caller can safely resubmit.
	s.mu.Lock()
	delete(s.tasks, task.ID)
	s.mu.Unlock()
	if err := s.storage.DeleteTask(task.ID); err != nil {
		s.logger.Printf("Failed to roll back task %s: %v", task.ID, err)
	}
	return fmt.Errorf("task queue is full")
}
// dispatchTasks is the dispatch loop: it takes tasks off the queue one at a
// time and tries to assign each to an executor. A task that cannot be
// assigned is logged and passed to handleTaskFailure, which decides between
// retrying and failing it. The loop ends when ctx is done.
func (s *Scheduler) dispatchTasks(ctx context.Context) {
	done := ctx.Done()
	for {
		select {
		case <-done:
			return
		case next := <-s.taskQueue:
			assignErr := s.assignTask(ctx, next)
			if assignErr == nil {
				continue
			}
			s.logger.Printf("Failed to assign task %s: %v", next.ID, assignErr)
			s.handleTaskFailure(next, assignErr)
		}
	}
}
// assignTask picks the least-loaded active executor, sends task to it, and
// records the assignment.
//
// Selecting the executor and reserving a slot on it (Running++) happen in a
// single critical section, so concurrent assignments cannot both claim the
// last free slot and the counter is never modified outside the lock. The
// reservation is released if the executor rejects the task.
//
// It returns an error when no executor is available or when the executor
// cannot be reached or refuses the task. A failure to persist the new state
// is logged, not returned: by then the executor has accepted the task, and
// returning an error would make the dispatcher retry it and run it twice.
//
// NOTE(review): NewExecutorClient is not defined in this package in the
// visible source; confirm where it lives.
func (s *Scheduler) assignTask(ctx context.Context, task *Task) error {
	s.mu.Lock()
	executor := s.selectExecutor()
	if executor == nil {
		s.mu.Unlock()
		return fmt.Errorf("no available executor")
	}
	executor.Running++
	executorID, address := executor.ID, executor.Address
	s.mu.Unlock()

	// The network call runs without the lock so a slow executor cannot stall
	// submissions.
	client := NewExecutorClient(address)
	if err := client.ExecuteTask(ctx, task); err != nil {
		s.mu.Lock()
		executor.Running--
		s.mu.Unlock()
		return fmt.Errorf("failed to send task to executor: %w", err)
	}

	now := time.Now()
	s.mu.Lock()
	task.Status = TaskRunning
	task.ExecutorID = executorID
	task.StartedAt = &now
	s.tasks[task.ID] = task
	s.mu.Unlock()

	if err := s.storage.UpdateTask(task); err != nil {
		s.logger.Printf("Failed to persist running state of task %s: %v", task.ID, err)
	}
	return nil
}
// selectExecutor returns the active executor with the lowest load
// (Running / Capacity), or nil when none can take more work.
//
// Executors are skipped when they are not "active", when their capacity is
// not positive, or when their load is 1.0 or higher. The capacity check is
// explicit because a negative capacity would produce a negative load and win
// every comparison, and a zero capacity would only be excluded by accident
// through NaN/Inf arithmetic.
//
// The caller must hold s.mu; this method does not lock.
func (s *Scheduler) selectExecutor() *Executor {
	var selected *Executor
	minLoad := 1.0
	for _, executor := range s.executors {
		if executor == nil || executor.Status != "active" || executor.Capacity <= 0 {
			continue
		}
		load := float64(executor.Running) / float64(executor.Capacity)
		if load < minLoad {
			minLoad = load
			selected = executor
		}
	}
	return selected
}
执行器实现 #
// pkg/executor/executor.go
package executor
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/exec"
"sync"
"time"
"github.com/gin-gonic/gin"
"your-project/pkg/scheduler"
)
// Executor is a worker node: it accepts tasks over HTTP, runs them as local
// processes, and reports the results to the scheduler.
type Executor struct {
	// id is the executor's identifier.
	id string
	// address is the listen address of the HTTP server started by Start.
	address string
	// capacity is the maximum number of tasks allowed to run at once.
	capacity int
	// running counts the tasks currently executing; guarded by mu.
	running int
	// tasks tracks in-flight executions by task ID; guarded by mu.
	tasks map[string]*TaskExecution
	// mu guards running and tasks.
	mu sync.RWMutex
	// logger writes to stdout with the "[EXECUTOR] " prefix (see NewExecutor).
	logger *log.Logger
	// scheduler is the scheduler's address, used when reporting results.
	scheduler string
}
// TaskExecution is the bookkeeping record for one in-flight task.
type TaskExecution struct {
	// Task is the task being executed; runTask writes its result fields.
	Task *scheduler.Task
	// Process is not assigned anywhere in the visible source.
	Process *os.Process
	// StartTime is when the executor accepted the task.
	StartTime time.Time
	// Cancel cancels the execution context created for the task.
	Cancel context.CancelFunc
}
// NewExecutor builds an Executor that will listen on address, run at most
// capacity tasks at a time, and report to the scheduler at schedulerAddr.
// The returned executor has an empty execution table and logs to stdout
// with the "[EXECUTOR] " prefix.
func NewExecutor(id, address string, capacity int, schedulerAddr string) *Executor {
	e := new(Executor)
	e.id = id
	e.address = address
	e.capacity = capacity
	e.scheduler = schedulerAddr
	e.tasks = map[string]*TaskExecution{}
	e.logger = log.New(os.Stdout, "[EXECUTOR] ", log.LstdFlags)
	return e
}
// Start registers the executor with the scheduler, serves the task API on
// e.address, runs the heartbeat loop, and blocks until ctx is done or the
// HTTP server fails.
//
// It returns an error when registration fails, when the HTTP server stops
// for any reason other than a graceful shutdown (for example the port is
// already in use), or when the graceful shutdown does not finish within
// ten seconds. Previously a listen failure was only logged while Start kept
// blocking, leaving a registered executor that could never receive work.
//
// The heartbeat goroutine is bound to ctx; callers should cancel ctx after
// Start returns an error.
func (e *Executor) Start(ctx context.Context) error {
	if err := e.registerToScheduler(ctx); err != nil {
		return fmt.Errorf("failed to register to scheduler: %w", err)
	}

	router := gin.Default()
	e.setupRoutes(router)
	server := &http.Server{
		Addr:    e.address,
		Handler: router,
	}

	go e.heartbeat(ctx)

	// Buffered so the serving goroutine never blocks if nobody is left to
	// receive the error.
	serveErr := make(chan error, 1)
	go func() {
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			serveErr <- err
		}
	}()
	e.logger.Printf("Executor started on %s", e.address)

	select {
	case err := <-serveErr:
		return fmt.Errorf("HTTP server error: %w", err)
	case <-ctx.Done():
	}

	// ctx is already cancelled here, so shutdown gets its own bounded context.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return server.Shutdown(shutdownCtx)
}
// setupRoutes registers the executor's HTTP API on router:
// POST /execute, POST /cancel/:taskId, GET /status and GET /health.
func (e *Executor) setupRoutes(router *gin.Engine) {
	endpoints := []struct {
		method  string
		path    string
		handler gin.HandlerFunc
	}{
		{http.MethodPost, "/execute", e.handleExecuteTask},
		{http.MethodPost, "/cancel/:taskId", e.handleCancelTask},
		{http.MethodGet, "/status", e.handleStatus},
		{http.MethodGet, "/health", e.handleHealth},
	}
	for _, ep := range endpoints {
		router.Handle(ep.method, ep.path, ep.handler)
	}
}
// handleExecuteTask serves POST /execute. It decodes a task from the JSON
// body and hands it to executeTask together with the request context.
// Responses: 400 with {"error": ...} when the body cannot be decoded,
// 500 with {"error": ...} when the executor refuses the task, and
// 200 with {"status": "accepted"} once the task has been started.
func (e *Executor) handleExecuteTask(c *gin.Context) {
	task := new(scheduler.Task)
	if bindErr := c.ShouldBindJSON(task); bindErr != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": bindErr.Error()})
		return
	}

	execErr := e.executeTask(c.Request.Context(), task)
	if execErr != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": execErr.Error()})
		return
	}

	c.JSON(http.StatusOK, gin.H{"status": "accepted"})
}
// executeTask admits task for execution and starts it in a background
// goroutine; it returns as soon as the task has been accepted.
//
// ctx only gates admission: if it is already done the task is rejected.
// The execution context is deliberately NOT derived from ctx. The HTTP
// handler passes the request context, which net/http cancels as soon as the
// handler returns; deriving from it would kill every task right after it
// was accepted. Running tasks are instead stopped through
// TaskExecution.Cancel or by their own timeout.
//
// A positive task.Timeout bounds the execution. A zero or negative timeout
// means "no deadline" rather than an already-expired context.
//
// It returns an error when task is nil, ctx is done, the executor is at
// capacity, or a task with the same ID is already running here. The
// duplicate check matters because the first goroutine to finish would
// otherwise delete the second task's bookkeeping entry.
func (e *Executor) executeTask(ctx context.Context, task *scheduler.Task) error {
	if task == nil {
		return fmt.Errorf("task is nil")
	}
	if err := ctx.Err(); err != nil {
		return err
	}

	var (
		execCtx context.Context
		cancel  context.CancelFunc
	)
	if task.Timeout > 0 {
		execCtx, cancel = context.WithTimeout(context.Background(), task.Timeout)
	} else {
		execCtx, cancel = context.WithCancel(context.Background())
	}
	execution := &TaskExecution{
		Task:      task,
		StartTime: time.Now(),
		Cancel:    cancel,
	}

	// Admission and registration share one critical section so the capacity
	// check and the bookkeeping cannot interleave with another request.
	e.mu.Lock()
	if e.running >= e.capacity {
		e.mu.Unlock()
		cancel()
		return fmt.Errorf("executor at capacity")
	}
	if _, duplicate := e.tasks[task.ID]; duplicate {
		e.mu.Unlock()
		cancel()
		return fmt.Errorf("task %s is already running", task.ID)
	}
	e.running++
	e.tasks[task.ID] = execution
	e.mu.Unlock()

	go func() {
		defer func() {
			e.mu.Lock()
			delete(e.tasks, task.ID)
			e.running--
			e.mu.Unlock()
			cancel() // release the context's timer
		}()
		e.runTask(execCtx, execution)
	}()
	return nil
}
// runTask runs the task's command to completion, records the outcome on the
// task, and reports it to the scheduler.
//
// The command runs with the executor's own environment plus task.Env and is
// killed when ctx is done. Combined stdout/stderr is stored in task.Output
// and the completion time in task.CompletedAt. The final status is:
//   - TaskCompleted when the command exits without error;
//   - TaskCancelled when the command failed because ctx was cancelled;
//   - TaskFailed when ctx's deadline expired (the error names the timeout)
//     or the command failed on its own.
//
// Cancellation and timeout are told apart from ordinary failures so the
// scheduler does not see a deliberate cancel as a plain failure.
//
// NOTE(review): Command and Args arrive over the network and are executed
// as-is; restrict who can reach the executor's API.
func (e *Executor) runTask(ctx context.Context, execution *TaskExecution) {
	task := execution.Task

	cmd := exec.CommandContext(ctx, task.Command, task.Args...)
	cmd.Env = os.Environ()
	for k, v := range task.Env {
		cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v))
	}

	output, err := cmd.CombinedOutput()

	now := time.Now()
	task.CompletedAt = &now
	task.Output = string(output)

	switch ctxErr := ctx.Err(); {
	case err == nil:
		task.Status = scheduler.TaskCompleted
		e.logger.Printf("Task %s completed", task.ID)
	case ctxErr == context.Canceled:
		task.Status = scheduler.TaskCancelled
		task.Error = ctxErr.Error()
		e.logger.Printf("Task %s cancelled", task.ID)
	case ctxErr == context.DeadlineExceeded:
		task.Status = scheduler.TaskFailed
		task.Error = fmt.Sprintf("timed out after %s: %v", task.Timeout, err)
		e.logger.Printf("Task %s failed: %s", task.ID, task.Error)
	default:
		task.Status = scheduler.TaskFailed
		task.Error = err.Error()
		e.logger.Printf("Task %s failed: %v", task.ID, err)
	}

	e.reportTaskResult(task)
}
// reportTaskResult sends the finished task to the scheduler at e.scheduler.
// Delivery is best-effort: a failure is logged and otherwise ignored.
func (e *Executor) reportTaskResult(task *scheduler.Task) {
	reportErr := NewSchedulerClient(e.scheduler).UpdateTaskStatus(context.Background(), task)
	if reportErr == nil {
		return
	}
	e.logger.Printf("Failed to report task result: %v", reportErr)
}
通信客户端实现 #
// pkg/client/executor_client.go
package client
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"your-project/pkg/scheduler"
)
// ExecutorClient is the scheduler-side HTTP client for one executor.
type ExecutorClient struct {
	// address is interpolated into "http://%s/...", so it must not carry a scheme.
	address string
	// client performs the HTTP requests.
	client *http.Client
}
// NewExecutorClient returns a client for the executor at address, using an
// HTTP client with a 30-second overall request timeout.
func NewExecutorClient(address string) *ExecutorClient {
	httpClient := &http.Client{Timeout: 30 * time.Second}
	return &ExecutorClient{address: address, client: httpClient}
}
// ExecuteTask POSTs task as JSON to the executor's /execute endpoint.
//
// It returns an error when the task cannot be encoded, the request cannot be
// built or sent, or the executor answers with anything other than 200 OK.
// For a non-200 answer the executor's JSON {"error": "..."} body, when
// present, is appended to the returned error, so causes such as
// "executor at capacity" reach the scheduler's log instead of a bare code.
func (c *ExecutorClient) ExecuteTask(ctx context.Context, task *scheduler.Task) error {
	data, err := json.Marshal(task)
	if err != nil {
		return fmt.Errorf("failed to marshal task: %w", err)
	}

	url := fmt.Sprintf("http://%s/execute", c.address)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		return nil
	}

	// Best effort: a missing or malformed body falls back to the bare status.
	var failure struct {
		Error string `json:"error"`
	}
	if decErr := json.NewDecoder(resp.Body).Decode(&failure); decErr == nil && failure.Error != "" {
		return fmt.Errorf("executor returned status: %d: %s", resp.StatusCode, failure.Error)
	}
	return fmt.Errorf("executor returned status: %d", resp.StatusCode)
}
// CancelTask POSTs to the executor's /cancel/<taskID> endpoint.
//
// It returns an error when the request cannot be built or sent, or when the
// executor answers with a non-2xx status. The status was previously
// ignored, so a cancel for an unknown task or a failing executor was
// reported as success.
//
// taskID is placed in the URL path verbatim; callers must pass an ID that
// is safe as a single path segment.
func (c *ExecutorClient) CancelTask(ctx context.Context, taskID string) error {
	url := fmt.Sprintf("http://%s/cancel/%s", c.address, taskID)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil)
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return fmt.Errorf("executor returned status: %d", resp.StatusCode)
	}
	return nil
}
存储层实现 #
// pkg/storage/memory.go
package storage
import (
"fmt"
"sync"
"your-project/pkg/scheduler"
)
// MemoryStorage is an in-memory task store; its contents are lost when the
// process exits.
type MemoryStorage struct {
	// tasks maps Task.ID to the stored task; guarded by mu.
	tasks map[string]*scheduler.Task
	// mu guards tasks.
	mu sync.RWMutex
}
// NewMemoryStorage returns an empty in-memory task store that is ready to use.
func NewMemoryStorage() *MemoryStorage {
	store := new(MemoryStorage)
	store.tasks = map[string]*scheduler.Task{}
	return store
}
// SaveTask stores a snapshot of task under task.ID, replacing any record
// with the same ID. It returns an error when task is nil.
//
// A copy is stored rather than the caller's pointer, so later changes the
// caller makes to its task reach the store only through UpdateTask, under
// the store's lock. The copy is shallow: Args, Env and the time pointers
// are still shared with the caller.
func (s *MemoryStorage) SaveTask(task *scheduler.Task) error {
	if task == nil {
		return fmt.Errorf("task is nil")
	}
	snapshot := *task

	s.mu.Lock()
	defer s.mu.Unlock()
	s.tasks[snapshot.ID] = &snapshot
	return nil
}
// LoadTask returns a copy of the task stored under taskID, or a
// "task not found" error when no such task exists.
//
// A copy is returned so callers cannot change the stored record without
// holding the store's lock. The copy is shallow: Args, Env and the time
// pointers are shared with the stored record.
func (s *MemoryStorage) LoadTask(taskID string) (*scheduler.Task, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	stored, exists := s.tasks[taskID]
	if !exists || stored == nil {
		return nil, fmt.Errorf("task not found: %s", taskID)
	}
	snapshot := *stored
	return &snapshot, nil
}
// UpdateTask replaces the stored record for task.ID with a snapshot of task.
// It returns an error when task is nil or no record with that ID exists.
//
// As with SaveTask, a shallow copy is stored so the record changes only
// under the store's lock.
func (s *MemoryStorage) UpdateTask(task *scheduler.Task) error {
	if task == nil {
		return fmt.Errorf("task is nil")
	}
	snapshot := *task

	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.tasks[snapshot.ID]; !exists {
		return fmt.Errorf("task not found: %s", snapshot.ID)
	}
	s.tasks[snapshot.ID] = &snapshot
	return nil
}

// DeleteTask removes the record stored under taskID. It returns a
// "task not found" error when no such record exists.
//
// This method is required by the scheduler's Storage interface; without it
// *MemoryStorage cannot be passed to scheduler.NewScheduler.
func (s *MemoryStorage) DeleteTask(taskID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.tasks[taskID]; !exists {
		return fmt.Errorf("task not found: %s", taskID)
	}
	delete(s.tasks, taskID)
	return nil
}
// ListTasks returns copies of all stored tasks whose Status equals status.
// The result is nil when nothing matches, and its order is unspecified
// because it follows map iteration. The error is always nil.
//
// Copies are returned so callers cannot change stored records without the
// store's lock. The copies are shallow: Args, Env and the time pointers are
// shared with the stored records.
func (s *MemoryStorage) ListTasks(status scheduler.TaskStatus) ([]*scheduler.Task, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	var result []*scheduler.Task
	for _, stored := range s.tasks {
		if stored == nil || stored.Status != status {
			continue
		}
		snapshot := *stored
		result = append(result, &snapshot)
	}
	return result, nil
}
主程序示例 #
// cmd/scheduler/main.go
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"your-project/pkg/scheduler"
"your-project/pkg/storage"
)
// main wires an in-memory store into a scheduler, starts it, submits one
// demo task, and then blocks until SIGINT or SIGTERM arrives. The deferred
// cancel stops the scheduler's background goroutines on the way out.
func main() {
	var (
		taskStore = storage.NewMemoryStorage()
		logger    = log.New(os.Stdout, "[SCHEDULER] ", log.LstdFlags)
	)
	sched := scheduler.NewScheduler(taskStore, logger)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if startErr := sched.Start(ctx); startErr != nil {
		log.Fatalf("Failed to start scheduler: %v", startErr)
	}

	// A submission failure is logged but does not stop the process.
	demo := &scheduler.Task{
		Name:    "test-task",
		Command: "echo",
		Args:    []string{"Hello, World!"},
		Timeout: 30 * time.Second,
		Retry:   3,
	}
	if submitErr := sched.SubmitTask(ctx, demo); submitErr != nil {
		log.Printf("Failed to submit task: %v", submitErr)
	}

	// The channel is buffered so a signal sent before the receive is not lost.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	<-signals

	log.Println("Shutting down scheduler...")
}
功能扩展 #
任务依赖管理 #
// TaskDependency pairs a task with the IDs of the tasks it depends on.
// NOTE(review): nothing in the visible code references this type;
// checkDependencies reads task.Dependencies directly.
type TaskDependency struct {
	// TaskID is the ID of the dependent task.
	TaskID string `json:"task_id"`
	// Dependencies holds the IDs of the tasks TaskID depends on.
	Dependencies []string `json:"dependencies"`
}
// checkDependencies reports whether every task listed in task.Dependencies
// is known to the scheduler and has reached TaskCompleted. A task with no
// dependencies is always ready.
//
// The task table is read under s.mu's read lock because submissions and
// assignments modify it from other goroutines. The caller must therefore
// NOT already hold s.mu for writing, or this call deadlocks.
func (s *Scheduler) checkDependencies(task *Task) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()

	for _, depID := range task.Dependencies {
		depTask, exists := s.tasks[depID]
		if !exists || depTask == nil || depTask.Status != TaskCompleted {
			return false
		}
	}
	return true
}
任务重试机制 #
// handleTaskFailure records a failed dispatch of task and either schedules
// a retry or marks the task as failed.
//
// While task.Retry is positive it is decremented, the task returns to
// TaskPending, and the task is re-queued after one minute. Otherwise the
// task becomes TaskFailed. err, when non-nil, is stored in task.Error.
//
// Every state change is persisted, and persistence errors are logged rather
// than dropped; previously the retry path never reached storage. If the
// queue is full when the retry fires, the task is marked TaskFailed instead
// of staying TaskPending forever with nothing left to dispatch it.
//
// NOTE(review): the retry timer is not tied to a context, so a pending
// retry outlives scheduler shutdown by up to a minute.
func (s *Scheduler) handleTaskFailure(task *Task, err error) {
	if err != nil {
		task.Error = err.Error()
	}

	persist := func() {
		if updateErr := s.storage.UpdateTask(task); updateErr != nil {
			s.logger.Printf("Failed to persist task %s: %v", task.ID, updateErr)
		}
	}

	if task.Retry <= 0 {
		task.Status = TaskFailed
		persist()
		return
	}

	task.Retry--
	task.Status = TaskPending
	persist()

	// AfterFunc runs the retry on its own goroutine once the delay elapses,
	// without parking a sleeping goroutine in the meantime.
	time.AfterFunc(time.Minute, func() {
		select {
		case s.taskQueue <- task:
			s.logger.Printf("Task %s retrying", task.ID)
		default:
			s.logger.Printf("Failed to retry task %s", task.ID)
			task.Status = TaskFailed
			persist()
		}
	})
}
部署和运维 #
Docker 化部署 #
# Dockerfile.scheduler
# Build stage: compile the scheduler binary with the Go toolchain.
FROM golang:1.21-alpine AS builder
WORKDIR /app
# Copy only the module manifests first so the dependency download is cached
# and re-runs only when go.mod or go.sum change, not on every source edit.
COPY go.mod go.sum ./
RUN go mod download
COPY . .
# Build the package rather than a single file, so the build keeps working
# when cmd/scheduler gains more source files.
RUN go build -o scheduler ./cmd/scheduler
# Runtime stage: ship only the binary and CA certificates.
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/scheduler .
CMD ["./scheduler"]
监控指标 #
// 添加Prometheus监控
import "github.com/prometheus/client_golang/prometheus"
// Scheduler metrics.
// NOTE(review): no registration of these collectors (for example
// prometheus.MustRegister) is visible in this section — confirm they are
// registered somewhere, otherwise they will not be exported.
var (
	// tasksTotal is a counter vector partitioned by the "status" label.
	tasksTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "scheduler_tasks_total",
			Help: "Total number of tasks",
		},
		[]string{"status"},
	)
	// executorsActive is a gauge for the number of active executors.
	executorsActive = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Name: "scheduler_executors_active",
			Help: "Number of active executors",
		},
	)
)
通过本节的学习,我们实现了一个功能完整的分布式任务调度器,涵盖了任务分发、执行监控、故障恢复等核心功能。这个系统展示了如何运用 Go 语言的并发特性和网络编程能力来构建复杂的分布式系统。