4.4.1 进程创建与管理

4.4.1 进程创建与管理 #

进程是操作系统中程序执行的基本单位。在 Go 语言中,我们可以通过 os/exec 包来创建和管理子进程。本节将详细介绍如何在 Go 程序中创建、监控和管理进程。

进程基础概念 #

进程的生命周期 #

进程从创建到结束经历以下几个阶段:

  1. 创建阶段:父进程调用系统调用创建子进程
  2. 运行阶段:进程获得 CPU 时间片执行代码
  3. 等待阶段:进程等待某个事件发生(如 I/O 完成)
  4. 终止阶段:进程执行完毕或被强制终止

进程标识符 #

每个进程都有唯一的进程标识符(PID),用于系统识别和管理:

package main

import (
    "fmt"
    "os"
)

func main() {
    // 获取当前进程的 PID
    pid := os.Getpid()
    fmt.Printf("当前进程 PID: %d\n", pid)

    // 获取父进程的 PID
    ppid := os.Getppid()
    fmt.Printf("父进程 PID: %d\n", ppid)

    // 获取进程组 ID
    pgid := os.Getpgrp()
    fmt.Printf("进程组 ID: %d\n", pgid)
}

使用 os/exec 包创建进程 #

基本进程创建 #

os/exec 包提供了创建和管理外部进程的功能:

package main

import (
    "fmt"
    "os/exec"
    "log"
)

func main() {
    // 创建一个简单的命令
    cmd := exec.Command("ls", "-l")

    // 执行命令并获取输出
    output, err := cmd.Output()
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("命令输出:\n%s", output)
}

进程创建的详细控制 #

对于需要更精细控制的场景,我们可以使用 Cmd 结构体的各种字段:

package main

import (
    "fmt"
    "os"
    "os/exec"
    "log"
    "bytes"
)

func main() {
    // 创建命令
    cmd := exec.Command("grep", "error")

    // 设置标准输入
    cmd.Stdin = bytes.NewBufferString("info: 正常信息\nerror: 错误信息\nwarning: 警告信息\n")

    // 设置标准输出和错误输出
    var stdout, stderr bytes.Buffer
    cmd.Stdout = &stdout
    cmd.Stderr = &stderr

    // 设置工作目录
    cmd.Dir = "/tmp"

    // 设置环境变量
    cmd.Env = append(os.Environ(), "CUSTOM_VAR=custom_value")

    // 执行命令
    err := cmd.Run()
    if err != nil {
        log.Printf("命令执行失败: %v", err)
        log.Printf("错误输出: %s", stderr.String())
        return
    }

    fmt.Printf("标准输出: %s", stdout.String())
}

异步进程执行 #

对于长时间运行的进程,我们可以使用异步执行:

package main

import (
    "fmt"
    "os/exec"
    "log"
    "time"
)

func main() {
    // 创建一个长时间运行的命令
    cmd := exec.Command("sleep", "5")

    // 启动进程但不等待完成
    err := cmd.Start()
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("进程已启动,PID: %d\n", cmd.Process.Pid)

    // 在进程运行时做其他工作
    for i := 0; i < 5; i++ {
        fmt.Printf("主进程工作中... %d\n", i+1)
        time.Sleep(1 * time.Second)
    }

    // 等待进程完成
    err = cmd.Wait()
    if err != nil {
        log.Printf("进程执行失败: %v", err)
    } else {
        fmt.Println("进程执行完成")
    }
}

进程状态管理 #

进程状态监控 #

我们可以监控进程的执行状态和资源使用情况:

package main

import (
    "fmt"
    "os/exec"
    "log"
    "time"
    "syscall"
)

func main() {
    // 创建一个计算密集型命令
    cmd := exec.Command("dd", "if=/dev/zero", "of=/dev/null", "bs=1M", "count=100")

    // 启动进程
    err := cmd.Start()
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("进程启动,PID: %d\n", cmd.Process.Pid)

    // 监控进程状态
    go func() {
        for {
            // 检查进程是否还在运行
            process, err := os.FindProcess(cmd.Process.Pid)
            if err != nil {
                fmt.Printf("无法找到进程: %v\n", err)
                return
            }

            // 尝试发送信号 0 来检查进程状态
            err = process.Signal(syscall.Signal(0))
            if err != nil {
                fmt.Printf("进程已结束\n")
                return
            }

            fmt.Printf("进程 %d 仍在运行\n", cmd.Process.Pid)
            time.Sleep(1 * time.Second)
        }
    }()

    // 等待进程完成
    err = cmd.Wait()
    if err != nil {
        log.Printf("进程执行失败: %v", err)
    }

    // 获取进程资源使用情况
    if cmd.ProcessState != nil {
        fmt.Printf("进程退出状态: %v\n", cmd.ProcessState.ExitCode())
        fmt.Printf("用户态 CPU 时间: %v\n", cmd.ProcessState.UserTime())
        fmt.Printf("系统态 CPU 时间: %v\n", cmd.ProcessState.SystemTime())
    }
}

进程超时控制 #

使用 context 包可以实现进程超时控制:

package main

import (
    "context"
    "fmt"
    "os/exec"
    "log"
    "time"
)

func runCommandWithTimeout(timeout time.Duration, name string, args ...string) error {
    // 创建带超时的上下文
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    // 使用上下文创建命令
    cmd := exec.CommandContext(ctx, name, args...)

    fmt.Printf("执行命令: %s %v (超时: %v)\n", name, args, timeout)

    // 执行命令
    err := cmd.Run()

    // 检查是否因超时而失败
    if ctx.Err() == context.DeadlineExceeded {
        return fmt.Errorf("命令执行超时")
    }

    return err
}

func main() {
    // 测试正常执行的命令
    err := runCommandWithTimeout(2*time.Second, "echo", "Hello, World!")
    if err != nil {
        log.Printf("命令执行失败: %v", err)
    } else {
        fmt.Println("命令执行成功")
    }

    // 测试超时的命令
    err = runCommandWithTimeout(2*time.Second, "sleep", "5")
    if err != nil {
        log.Printf("命令执行失败: %v", err)
    } else {
        fmt.Println("命令执行成功")
    }
}

进程信号处理 #

发送信号给进程 #

我们可以向进程发送各种信号来控制其行为:

package main

import (
    "fmt"
    "os"
    "os/exec"
    "os/signal"
    "syscall"
    "time"
    "log"
)

func main() {
    // 创建一个长时间运行的进程
    cmd := exec.Command("sleep", "30")
    err := cmd.Start()
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("启动进程,PID: %d\n", cmd.Process.Pid)

    // 设置信号处理
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        // 等待 5 秒后发送 SIGTERM 信号
        time.Sleep(5 * time.Second)
        fmt.Println("发送 SIGTERM 信号给子进程")
        err := cmd.Process.Signal(syscall.SIGTERM)
        if err != nil {
            log.Printf("发送信号失败: %v", err)
        }
    }()

    // 监听信号
    go func() {
        sig := <-sigChan
        fmt.Printf("接收到信号: %v\n", sig)

        // 终止子进程
        if cmd.Process != nil {
            fmt.Println("终止子进程")
            cmd.Process.Kill()
        }

        os.Exit(0)
    }()

    // 等待进程完成
    err = cmd.Wait()
    if err != nil {
        fmt.Printf("进程被终止: %v\n", err)
    } else {
        fmt.Println("进程正常结束")
    }
}

优雅关闭进程 #

实现进程的优雅关闭机制:

package main

import (
    "fmt"
    "os"
    "os/exec"
    "syscall"
    "time"
    "log"
)

// 优雅关闭进程
func gracefulShutdown(cmd *exec.Cmd, timeout time.Duration) error {
    if cmd.Process == nil {
        return fmt.Errorf("进程未启动")
    }

    // 首先发送 SIGTERM 信号
    fmt.Printf("发送 SIGTERM 信号给进程 %d\n", cmd.Process.Pid)
    err := cmd.Process.Signal(syscall.SIGTERM)
    if err != nil {
        return fmt.Errorf("发送 SIGTERM 失败: %v", err)
    }

    // 等待进程优雅退出
    done := make(chan error, 1)
    go func() {
        done <- cmd.Wait()
    }()

    select {
    case err := <-done:
        fmt.Println("进程优雅退出")
        return err
    case <-time.After(timeout):
        // 超时后强制杀死进程
        fmt.Printf("优雅退出超时,强制杀死进程 %d\n", cmd.Process.Pid)
        err := cmd.Process.Kill()
        if err != nil {
            return fmt.Errorf("强制杀死进程失败: %v", err)
        }
        <-done // 等待 Wait() 返回
        return fmt.Errorf("进程被强制终止")
    }
}

func main() {
    // 创建一个可以处理信号的脚本
    script := `#!/bin/bash
trap 'echo "收到 SIGTERM,正在清理..."; sleep 2; echo "清理完成,退出"; exit 0' TERM
echo "脚本开始运行,PID: $$"
while true; do
    echo "工作中..."
    sleep 1
done`

    // 写入临时脚本文件
    scriptFile := "/tmp/test_script.sh"
    err := os.WriteFile(scriptFile, []byte(script), 0755)
    if err != nil {
        log.Fatal(err)
    }
    defer os.Remove(scriptFile)

    // 执行脚本
    cmd := exec.Command("bash", scriptFile)
    cmd.Stdout = os.Stdout
    cmd.Stderr = os.Stderr

    err = cmd.Start()
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("脚本启动,PID: %d\n", cmd.Process.Pid)

    // 让脚本运行一段时间
    time.Sleep(3 * time.Second)

    // 优雅关闭进程
    err = gracefulShutdown(cmd, 5*time.Second)
    if err != nil {
        log.Printf("关闭进程时出错: %v", err)
    }
}

进程池管理 #

对于需要管理多个进程的场景,我们可以实现一个简单的进程池:

package main

import (
    "fmt"
    "os/exec"
    "sync"
    "time"
    "log"
    "context"
)

// ProcessPool 进程池
type ProcessPool struct {
    maxWorkers int
    jobs       chan Job
    wg         sync.WaitGroup
    ctx        context.Context
    cancel     context.CancelFunc
}

// Job 表示一个要执行的任务
type Job struct {
    Command string
    Args    []string
    ID      int
}

// NewProcessPool 创建新的进程池
func NewProcessPool(maxWorkers int) *ProcessPool {
    ctx, cancel := context.WithCancel(context.Background())
    return &ProcessPool{
        maxWorkers: maxWorkers,
        jobs:       make(chan Job, maxWorkers*2),
        ctx:        ctx,
        cancel:     cancel,
    }
}

// Start 启动进程池
func (p *ProcessPool) Start() {
    for i := 0; i < p.maxWorkers; i++ {
        p.wg.Add(1)
        go p.worker(i)
    }
}

// worker 工作协程
func (p *ProcessPool) worker(id int) {
    defer p.wg.Done()

    fmt.Printf("Worker %d 启动\n", id)

    for {
        select {
        case job := <-p.jobs:
            fmt.Printf("Worker %d 处理任务 %d: %s %v\n", id, job.ID, job.Command, job.Args)

            // 创建带超时的命令
            ctx, cancel := context.WithTimeout(p.ctx, 10*time.Second)
            cmd := exec.CommandContext(ctx, job.Command, job.Args...)

            // 执行命令
            output, err := cmd.Output()
            if err != nil {
                log.Printf("Worker %d 任务 %d 执行失败: %v", id, job.ID, err)
            } else {
                fmt.Printf("Worker %d 任务 %d 完成,输出长度: %d\n", id, job.ID, len(output))
            }

            cancel()

        case <-p.ctx.Done():
            fmt.Printf("Worker %d 退出\n", id)
            return
        }
    }
}

// Submit 提交任务
func (p *ProcessPool) Submit(job Job) {
    select {
    case p.jobs <- job:
    case <-p.ctx.Done():
        log.Printf("进程池已关闭,无法提交任务 %d", job.ID)
    }
}

// Shutdown 关闭进程池
func (p *ProcessPool) Shutdown() {
    close(p.jobs)
    p.cancel()
    p.wg.Wait()
    fmt.Println("进程池已关闭")
}

func main() {
    // 创建进程池
    pool := NewProcessPool(3)
    pool.Start()

    // 提交任务
    jobs := []Job{
        {Command: "echo", Args: []string{"Hello from job 1"}, ID: 1},
        {Command: "sleep", Args: []string{"2"}, ID: 2},
        {Command: "ls", Args: []string{"-l", "/tmp"}, ID: 3},
        {Command: "echo", Args: []string{"Hello from job 4"}, ID: 4},
        {Command: "date", Args: []string{}, ID: 5},
    }

    for _, job := range jobs {
        pool.Submit(job)
    }

    // 等待一段时间让任务完成
    time.Sleep(5 * time.Second)

    // 关闭进程池
    pool.Shutdown()
}

实践练习 #

练习 1:进程监控工具 #

创建一个简单的进程监控工具:

package main

import (
    "fmt"
    "os"
    "os/exec"
    "time"
    "log"
    "strconv"
    "strings"
)

// ProcessInfo 进程信息
type ProcessInfo struct {
    PID     int
    Command string
    CPU     float64
    Memory  float64
}

// getProcessInfo 获取进程信息
func getProcessInfo(pid int) (*ProcessInfo, error) {
    // 使用 ps 命令获取进程信息
    cmd := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pid,comm,%cpu,%mem", "--no-headers")
    output, err := cmd.Output()
    if err != nil {
        return nil, err
    }

    fields := strings.Fields(string(output))
    if len(fields) < 4 {
        return nil, fmt.Errorf("无法解析进程信息")
    }

    cpu, _ := strconv.ParseFloat(fields[2], 64)
    mem, _ := strconv.ParseFloat(fields[3], 64)

    return &ProcessInfo{
        PID:     pid,
        Command: fields[1],
        CPU:     cpu,
        Memory:  mem,
    }, nil
}

// monitorProcess 监控进程
func monitorProcess(pid int, interval time.Duration) {
    fmt.Printf("开始监控进程 %d\n", pid)

    for {
        info, err := getProcessInfo(pid)
        if err != nil {
            log.Printf("获取进程信息失败: %v", err)
            break
        }

        fmt.Printf("PID: %d, 命令: %s, CPU: %.1f%%, 内存: %.1f%%\n",
            info.PID, info.Command, info.CPU, info.Memory)

        time.Sleep(interval)
    }
}

func main() {
    if len(os.Args) < 2 {
        fmt.Println("用法: go run monitor.go <PID>")
        return
    }

    pid, err := strconv.Atoi(os.Args[1])
    if err != nil {
        log.Fatal("无效的 PID")
    }

    monitorProcess(pid, 2*time.Second)
}

总结 #

本节介绍了 Go 语言中进程创建与管理的核心概念和实践方法:

  1. 进程基础:了解了进程的生命周期和标识符
  2. 进程创建:学会使用 os/exec 包创建和控制进程
  3. 状态管理:掌握了进程状态监控和超时控制
  4. 信号处理:学会了进程间的信号通信
  5. 进程池:实现了多进程的管理和调度

这些技能为构建复杂的系统级应用程序奠定了基础。在下一节中,我们将学习进程间通信的各种机制。