4.4.1 进程创建与管理 #
进程是操作系统中程序执行的基本单位。在 Go 语言中,我们可以通过 os/exec
包来创建和管理子进程。本节将详细介绍如何在 Go 程序中创建、监控和管理进程。
进程基础概念 #
进程的生命周期 #
进程从创建到结束经历以下几个阶段:
- 创建阶段:父进程调用系统调用创建子进程
- 运行阶段:进程获得 CPU 时间片执行代码
- 等待阶段:进程等待某个事件发生(如 I/O 完成)
- 终止阶段:进程执行完毕或被强制终止
进程标识符 #
每个进程都有唯一的进程标识符(PID),用于系统识别和管理:
package main
import (
"fmt"
"os"
)
func main() {
// 获取当前进程的 PID
pid := os.Getpid()
fmt.Printf("当前进程 PID: %d\n", pid)
// 获取父进程的 PID
ppid := os.Getppid()
fmt.Printf("父进程 PID: %d\n", ppid)
// 获取进程组 ID
pgid := os.Getpgrp()
fmt.Printf("进程组 ID: %d\n", pgid)
}
使用 os/exec 包创建进程 #
基本进程创建 #
os/exec
包提供了创建和管理外部进程的功能:
package main
import (
"fmt"
"os/exec"
"log"
)
func main() {
// 创建一个简单的命令
cmd := exec.Command("ls", "-l")
// 执行命令并获取输出
output, err := cmd.Output()
if err != nil {
log.Fatal(err)
}
fmt.Printf("命令输出:\n%s", output)
}
进程创建的详细控制 #
对于需要更精细控制的场景,我们可以使用 Cmd
结构体的各种字段:
package main
import (
"fmt"
"os"
"os/exec"
"log"
"bytes"
)
func main() {
// 创建命令
cmd := exec.Command("grep", "error")
// 设置标准输入
cmd.Stdin = bytes.NewBufferString("info: 正常信息\nerror: 错误信息\nwarning: 警告信息\n")
// 设置标准输出和错误输出
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
// 设置工作目录
cmd.Dir = "/tmp"
// 设置环境变量
cmd.Env = append(os.Environ(), "CUSTOM_VAR=custom_value")
// 执行命令
err := cmd.Run()
if err != nil {
log.Printf("命令执行失败: %v", err)
log.Printf("错误输出: %s", stderr.String())
return
}
fmt.Printf("标准输出: %s", stdout.String())
}
异步进程执行 #
对于长时间运行的进程,我们可以使用异步执行:
package main
import (
"fmt"
"os/exec"
"log"
"time"
)
func main() {
// 创建一个长时间运行的命令
cmd := exec.Command("sleep", "5")
// 启动进程但不等待完成
err := cmd.Start()
if err != nil {
log.Fatal(err)
}
fmt.Printf("进程已启动,PID: %d\n", cmd.Process.Pid)
// 在进程运行时做其他工作
for i := 0; i < 5; i++ {
fmt.Printf("主进程工作中... %d\n", i+1)
time.Sleep(1 * time.Second)
}
// 等待进程完成
err = cmd.Wait()
if err != nil {
log.Printf("进程执行失败: %v", err)
} else {
fmt.Println("进程执行完成")
}
}
进程状态管理 #
进程状态监控 #
我们可以监控进程的执行状态和资源使用情况:
package main
import (
"fmt"
"os/exec"
"log"
"time"
"syscall"
)
func main() {
// 创建一个计算密集型命令
cmd := exec.Command("dd", "if=/dev/zero", "of=/dev/null", "bs=1M", "count=100")
// 启动进程
err := cmd.Start()
if err != nil {
log.Fatal(err)
}
fmt.Printf("进程启动,PID: %d\n", cmd.Process.Pid)
// 监控进程状态
go func() {
for {
// 检查进程是否还在运行
process, err := os.FindProcess(cmd.Process.Pid)
if err != nil {
fmt.Printf("无法找到进程: %v\n", err)
return
}
// 尝试发送信号 0 来检查进程状态
err = process.Signal(syscall.Signal(0))
if err != nil {
fmt.Printf("进程已结束\n")
return
}
fmt.Printf("进程 %d 仍在运行\n", cmd.Process.Pid)
time.Sleep(1 * time.Second)
}
}()
// 等待进程完成
err = cmd.Wait()
if err != nil {
log.Printf("进程执行失败: %v", err)
}
// 获取进程资源使用情况
if cmd.ProcessState != nil {
fmt.Printf("进程退出状态: %v\n", cmd.ProcessState.ExitCode())
fmt.Printf("用户态 CPU 时间: %v\n", cmd.ProcessState.UserTime())
fmt.Printf("系统态 CPU 时间: %v\n", cmd.ProcessState.SystemTime())
}
}
进程超时控制 #
使用 context
包可以实现进程超时控制:
package main
import (
"context"
"fmt"
"os/exec"
"log"
"time"
)
func runCommandWithTimeout(timeout time.Duration, name string, args ...string) error {
// 创建带超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// 使用上下文创建命令
cmd := exec.CommandContext(ctx, name, args...)
fmt.Printf("执行命令: %s %v (超时: %v)\n", name, args, timeout)
// 执行命令
err := cmd.Run()
// 检查是否因超时而失败
if ctx.Err() == context.DeadlineExceeded {
return fmt.Errorf("命令执行超时")
}
return err
}
func main() {
// 测试正常执行的命令
err := runCommandWithTimeout(2*time.Second, "echo", "Hello, World!")
if err != nil {
log.Printf("命令执行失败: %v", err)
} else {
fmt.Println("命令执行成功")
}
// 测试超时的命令
err = runCommandWithTimeout(2*time.Second, "sleep", "5")
if err != nil {
log.Printf("命令执行失败: %v", err)
} else {
fmt.Println("命令执行成功")
}
}
进程信号处理 #
发送信号给进程 #
我们可以向进程发送各种信号来控制其行为:
package main
import (
"fmt"
"os"
"os/exec"
"os/signal"
"syscall"
"time"
"log"
)
func main() {
// 创建一个长时间运行的进程
cmd := exec.Command("sleep", "30")
err := cmd.Start()
if err != nil {
log.Fatal(err)
}
fmt.Printf("启动进程,PID: %d\n", cmd.Process.Pid)
// 设置信号处理
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
// 等待 5 秒后发送 SIGTERM 信号
time.Sleep(5 * time.Second)
fmt.Println("发送 SIGTERM 信号给子进程")
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
log.Printf("发送信号失败: %v", err)
}
}()
// 监听信号
go func() {
sig := <-sigChan
fmt.Printf("接收到信号: %v\n", sig)
// 终止子进程
if cmd.Process != nil {
fmt.Println("终止子进程")
cmd.Process.Kill()
}
os.Exit(0)
}()
// 等待进程完成
err = cmd.Wait()
if err != nil {
fmt.Printf("进程被终止: %v\n", err)
} else {
fmt.Println("进程正常结束")
}
}
优雅关闭进程 #
实现进程的优雅关闭机制:
package main
import (
"fmt"
"os"
"os/exec"
"syscall"
"time"
"log"
)
// 优雅关闭进程
func gracefulShutdown(cmd *exec.Cmd, timeout time.Duration) error {
if cmd.Process == nil {
return fmt.Errorf("进程未启动")
}
// 首先发送 SIGTERM 信号
fmt.Printf("发送 SIGTERM 信号给进程 %d\n", cmd.Process.Pid)
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
return fmt.Errorf("发送 SIGTERM 失败: %v", err)
}
// 等待进程优雅退出
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
select {
case err := <-done:
fmt.Println("进程优雅退出")
return err
case <-time.After(timeout):
// 超时后强制杀死进程
fmt.Printf("优雅退出超时,强制杀死进程 %d\n", cmd.Process.Pid)
err := cmd.Process.Kill()
if err != nil {
return fmt.Errorf("强制杀死进程失败: %v", err)
}
<-done // 等待 Wait() 返回
return fmt.Errorf("进程被强制终止")
}
}
func main() {
// 创建一个可以处理信号的脚本
script := `#!/bin/bash
trap 'echo "收到 SIGTERM,正在清理..."; sleep 2; echo "清理完成,退出"; exit 0' TERM
echo "脚本开始运行,PID: $$"
while true; do
echo "工作中..."
sleep 1
done`
// 写入临时脚本文件
scriptFile := "/tmp/test_script.sh"
err := os.WriteFile(scriptFile, []byte(script), 0755)
if err != nil {
log.Fatal(err)
}
defer os.Remove(scriptFile)
// 执行脚本
cmd := exec.Command("bash", scriptFile)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err = cmd.Start()
if err != nil {
log.Fatal(err)
}
fmt.Printf("脚本启动,PID: %d\n", cmd.Process.Pid)
// 让脚本运行一段时间
time.Sleep(3 * time.Second)
// 优雅关闭进程
err = gracefulShutdown(cmd, 5*time.Second)
if err != nil {
log.Printf("关闭进程时出错: %v", err)
}
}
进程池管理 #
对于需要管理多个进程的场景,我们可以实现一个简单的进程池:
package main
import (
"fmt"
"os/exec"
"sync"
"time"
"log"
"context"
)
// ProcessPool 进程池
type ProcessPool struct {
maxWorkers int
jobs chan Job
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
// Job 表示一个要执行的任务
type Job struct {
Command string
Args []string
ID int
}
// NewProcessPool 创建新的进程池
func NewProcessPool(maxWorkers int) *ProcessPool {
ctx, cancel := context.WithCancel(context.Background())
return &ProcessPool{
maxWorkers: maxWorkers,
jobs: make(chan Job, maxWorkers*2),
ctx: ctx,
cancel: cancel,
}
}
// Start 启动进程池
func (p *ProcessPool) Start() {
for i := 0; i < p.maxWorkers; i++ {
p.wg.Add(1)
go p.worker(i)
}
}
// worker 工作协程
func (p *ProcessPool) worker(id int) {
defer p.wg.Done()
fmt.Printf("Worker %d 启动\n", id)
for {
select {
case job := <-p.jobs:
fmt.Printf("Worker %d 处理任务 %d: %s %v\n", id, job.ID, job.Command, job.Args)
// 创建带超时的命令
ctx, cancel := context.WithTimeout(p.ctx, 10*time.Second)
cmd := exec.CommandContext(ctx, job.Command, job.Args...)
// 执行命令
output, err := cmd.Output()
if err != nil {
log.Printf("Worker %d 任务 %d 执行失败: %v", id, job.ID, err)
} else {
fmt.Printf("Worker %d 任务 %d 完成,输出长度: %d\n", id, job.ID, len(output))
}
cancel()
case <-p.ctx.Done():
fmt.Printf("Worker %d 退出\n", id)
return
}
}
}
// Submit 提交任务
func (p *ProcessPool) Submit(job Job) {
select {
case p.jobs <- job:
case <-p.ctx.Done():
log.Printf("进程池已关闭,无法提交任务 %d", job.ID)
}
}
// Shutdown 关闭进程池
func (p *ProcessPool) Shutdown() {
close(p.jobs)
p.cancel()
p.wg.Wait()
fmt.Println("进程池已关闭")
}
func main() {
// 创建进程池
pool := NewProcessPool(3)
pool.Start()
// 提交任务
jobs := []Job{
{Command: "echo", Args: []string{"Hello from job 1"}, ID: 1},
{Command: "sleep", Args: []string{"2"}, ID: 2},
{Command: "ls", Args: []string{"-l", "/tmp"}, ID: 3},
{Command: "echo", Args: []string{"Hello from job 4"}, ID: 4},
{Command: "date", Args: []string{}, ID: 5},
}
for _, job := range jobs {
pool.Submit(job)
}
// 等待一段时间让任务完成
time.Sleep(5 * time.Second)
// 关闭进程池
pool.Shutdown()
}
实践练习 #
练习 1:进程监控工具 #
创建一个简单的进程监控工具:
package main
import (
"fmt"
"os"
"os/exec"
"time"
"log"
"strconv"
"strings"
)
// ProcessInfo 进程信息
type ProcessInfo struct {
PID int
Command string
CPU float64
Memory float64
}
// getProcessInfo 获取进程信息
func getProcessInfo(pid int) (*ProcessInfo, error) {
// 使用 ps 命令获取进程信息
cmd := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pid,comm,%cpu,%mem", "--no-headers")
output, err := cmd.Output()
if err != nil {
return nil, err
}
fields := strings.Fields(string(output))
if len(fields) < 4 {
return nil, fmt.Errorf("无法解析进程信息")
}
cpu, _ := strconv.ParseFloat(fields[2], 64)
mem, _ := strconv.ParseFloat(fields[3], 64)
return &ProcessInfo{
PID: pid,
Command: fields[1],
CPU: cpu,
Memory: mem,
}, nil
}
// monitorProcess 监控进程
func monitorProcess(pid int, interval time.Duration) {
fmt.Printf("开始监控进程 %d\n", pid)
for {
info, err := getProcessInfo(pid)
if err != nil {
log.Printf("获取进程信息失败: %v", err)
break
}
fmt.Printf("PID: %d, 命令: %s, CPU: %.1f%%, 内存: %.1f%%\n",
info.PID, info.Command, info.CPU, info.Memory)
time.Sleep(interval)
}
}
func main() {
if len(os.Args) < 2 {
fmt.Println("用法: go run monitor.go <PID>")
return
}
pid, err := strconv.Atoi(os.Args[1])
if err != nil {
log.Fatal("无效的 PID")
}
monitorProcess(pid, 2*time.Second)
}
总结 #
本节介绍了 Go 语言中进程创建与管理的核心概念和实践方法:
- 进程基础:了解了进程的生命周期和标识符
- 进程创建:学会使用
os/exec
包创建和控制进程 - 状态管理:掌握了进程状态监控和超时控制
- 信号处理:学会了进程间的信号通信
- 进程池:实现了多进程的管理和调度
这些技能为构建复杂的系统级应用程序奠定了基础。在下一节中,我们将学习进程间通信的各种机制。