4.4.2 进程间通信 IPC

4.4.2 进程间通信 IPC #

进程间通信(Inter-Process Communication,IPC)是操作系统中不同进程之间交换数据和信息的机制。Go 语言提供了多种 IPC 方式,包括管道、命名管道、消息队列等。本节将详细介绍这些通信机制的实现和应用。

IPC 基础概念 #

IPC 的类型 #

进程间通信主要有以下几种类型:

  1. 管道(Pipe):包括匿名管道和命名管道
  2. 消息队列(Message Queue):进程间传递消息的队列
  3. 共享内存(Shared Memory):多个进程共享同一块内存区域
  4. 信号量(Semaphore):用于进程同步的计数器
  5. 套接字(Socket):网络通信机制,也可用于本地进程通信

IPC 的选择原则 #

选择合适的 IPC 机制需要考虑:

  • 数据传输量:大量数据适合共享内存,少量数据可用管道
  • 通信模式:一对一、一对多或多对多
  • 同步需求:是否需要同步机制
  • 跨网络需求:是否需要跨主机通信

管道通信 #

匿名管道 #

匿名管道是最简单的 IPC 机制,通常用于父子进程间通信:

package main

import (
    "fmt"
    "os/exec"
    "io"
    "log"
    "bufio"
)

func main() {
    // 创建管道
    reader, writer := io.Pipe()

    // 创建第一个命令:生成数据
    cmd1 := exec.Command("echo", "Hello\nWorld\nGo\nProgramming")
    cmd1.Stdout = writer

    // 创建第二个命令:处理数据
    cmd2 := exec.Command("grep", "Go")
    cmd2.Stdin = reader

    // 启动第一个命令
    err := cmd1.Start()
    if err != nil {
        log.Fatal(err)
    }

    // 启动第二个命令并获取输出
    output, err := cmd2.Output()
    if err != nil {
        log.Fatal(err)
    }

    // 等待第一个命令完成
    err = cmd1.Wait()
    if err != nil {
        log.Fatal(err)
    }

    // 关闭写端
    writer.Close()

    fmt.Printf("管道输出: %s", output)
}

使用 StdoutPipe 和 StdinPipe #

Go 提供了更便捷的管道操作方法:

package main

import (
    "fmt"
    "os/exec"
    "log"
    "bufio"
    "strings"
)

func pipelineExample() {
    // 第一个命令:列出文件
    cmd1 := exec.Command("ls", "-la")

    // 第二个命令:过滤包含 "go" 的行
    cmd2 := exec.Command("grep", "go")

    // 建立管道连接
    pipe, err := cmd1.StdoutPipe()
    if err != nil {
        log.Fatal(err)
    }

    cmd2.Stdin = pipe

    // 启动两个命令
    err = cmd1.Start()
    if err != nil {
        log.Fatal(err)
    }

    output, err := cmd2.Output()
    if err != nil {
        log.Fatal(err)
    }

    err = cmd1.Wait()
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("管道输出:\n%s", output)
}

func interactiveProcess() {
    // 创建交互式进程
    cmd := exec.Command("bc", "-l") // 计算器程序

    // 获取输入输出管道
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Fatal(err)
    }

    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Fatal(err)
    }

    // 启动进程
    err = cmd.Start()
    if err != nil {
        log.Fatal(err)
    }

    // 发送计算表达式
    expressions := []string{"2+3", "sqrt(16)", "3.14159 * 2"}

    go func() {
        defer stdin.Close()
        for _, expr := range expressions {
            fmt.Fprintf(stdin, "%s\n", expr)
        }
        fmt.Fprintf(stdin, "quit\n")
    }()

    // 读取输出
    scanner := bufio.NewScanner(stdout)
    for scanner.Scan() {
        line := scanner.Text()
        if strings.TrimSpace(line) != "" {
            fmt.Printf("计算结果: %s\n", line)
        }
    }

    err = cmd.Wait()
    if err != nil {
        log.Printf("进程结束: %v", err)
    }
}

func main() {
    fmt.Println("=== 管道示例 ===")
    pipelineExample()

    fmt.Println("\n=== 交互式进程示例 ===")
    interactiveProcess()
}

命名管道 (FIFO) #

命名管道允许无关联的进程进行通信:

package main

import (
    "fmt"
    "os"
    "log"
    "syscall"
    "time"
)

const fifoPath = "/tmp/myfifo"

// 创建命名管道
func createFIFO() error {
    // 删除已存在的 FIFO
    os.Remove(fifoPath)

    // 创建 FIFO
    err := syscall.Mkfifo(fifoPath, 0666)
    if err != nil {
        return fmt.Errorf("创建 FIFO 失败: %v", err)
    }

    fmt.Printf("FIFO 创建成功: %s\n", fifoPath)
    return nil
}

// FIFO 写入者
func fifoWriter() {
    fmt.Println("启动 FIFO 写入者")

    // 打开 FIFO 进行写入
    file, err := os.OpenFile(fifoPath, os.O_WRONLY, 0)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    // 写入数据
    messages := []string{
        "Hello from writer",
        "Message 2",
        "Message 3",
        "EOF",
    }

    for _, msg := range messages {
        fmt.Printf("写入: %s\n", msg)
        _, err := file.WriteString(msg + "\n")
        if err != nil {
            log.Printf("写入失败: %v", err)
            break
        }
        time.Sleep(1 * time.Second)
    }
}

// FIFO 读取者
func fifoReader() {
    fmt.Println("启动 FIFO 读取者")

    // 打开 FIFO 进行读取
    file, err := os.OpenFile(fifoPath, os.O_RDONLY, 0)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    // 读取数据
    buffer := make([]byte, 1024)
    for {
        n, err := file.Read(buffer)
        if err != nil {
            log.Printf("读取失败: %v", err)
            break
        }

        if n > 0 {
            message := string(buffer[:n])
            fmt.Printf("读取: %s", message)

            if message == "EOF\n" {
                break
            }
        }
    }
}

func main() {
    // 创建 FIFO
    err := createFIFO()
    if err != nil {
        log.Fatal(err)
    }
    defer os.Remove(fifoPath)

    // 启动读取者(在单独的 goroutine 中)
    go fifoReader()

    // 等待一下确保读取者已准备好
    time.Sleep(1 * time.Second)

    // 启动写入者
    fifoWriter()

    // 等待完成
    time.Sleep(2 * time.Second)
}

消息队列 #

虽然 Go 标准库没有直接支持 System V 消息队列,但我们可以通过系统调用实现:

package main

import (
    "fmt"
    "unsafe"
    "syscall"
    "log"
    "time"
)

const (
    IPC_CREAT = 01000
    IPC_EXCL  = 02000
    IPC_RMID  = 0
)

// 消息结构
type Message struct {
    Type int64
    Text [256]byte
}

// 消息队列管理器
type MessageQueue struct {
    id  int
    key int
}

// 创建或获取消息队列
func NewMessageQueue(key int) (*MessageQueue, error) {
    // 创建消息队列
    id, _, errno := syscall.Syscall(syscall.SYS_MSGGET,
        uintptr(key),
        uintptr(IPC_CREAT|0666),
        0)

    if errno != 0 {
        return nil, fmt.Errorf("创建消息队列失败: %v", errno)
    }

    return &MessageQueue{
        id:  int(id),
        key: key,
    }, nil
}

// 发送消息
func (mq *MessageQueue) Send(msgType int64, text string) error {
    var msg Message
    msg.Type = msgType
    copy(msg.Text[:], text)

    _, _, errno := syscall.Syscall6(syscall.SYS_MSGSND,
        uintptr(mq.id),
        uintptr(unsafe.Pointer(&msg)),
        uintptr(len(text)),
        0, 0, 0)

    if errno != 0 {
        return fmt.Errorf("发送消息失败: %v", errno)
    }

    return nil
}

// 接收消息
func (mq *MessageQueue) Receive(msgType int64) (string, error) {
    var msg Message

    n, _, errno := syscall.Syscall6(syscall.SYS_MSGRCV,
        uintptr(mq.id),
        uintptr(unsafe.Pointer(&msg)),
        256,
        uintptr(msgType),
        0, 0)

    if errno != 0 {
        return "", fmt.Errorf("接收消息失败: %v", errno)
    }

    return string(msg.Text[:n]), nil
}

// 删除消息队列
func (mq *MessageQueue) Remove() error {
    _, _, errno := syscall.Syscall(syscall.SYS_MSGCTL,
        uintptr(mq.id),
        IPC_RMID,
        0)

    if errno != 0 {
        return fmt.Errorf("删除消息队列失败: %v", errno)
    }

    return nil
}

func messageQueueExample() {
    key := 12345

    // 创建消息队列
    mq, err := NewMessageQueue(key)
    if err != nil {
        log.Fatal(err)
    }
    defer mq.Remove()

    fmt.Printf("消息队列创建成功,ID: %d\n", mq.id)

    // 启动接收者
    go func() {
        fmt.Println("启动消息接收者")
        for i := 0; i < 3; i++ {
            msg, err := mq.Receive(1) // 接收类型为 1 的消息
            if err != nil {
                log.Printf("接收消息失败: %v", err)
                continue
            }
            fmt.Printf("接收到消息: %s\n", msg)
        }
    }()

    // 等待接收者准备好
    time.Sleep(1 * time.Second)

    // 发送消息
    messages := []string{"Hello", "World", "Go"}
    for _, msg := range messages {
        fmt.Printf("发送消息: %s\n", msg)
        err := mq.Send(1, msg)
        if err != nil {
            log.Printf("发送消息失败: %v", err)
        }
        time.Sleep(1 * time.Second)
    }

    // 等待处理完成
    time.Sleep(2 * time.Second)
}

func main() {
    messageQueueExample()
}

基于网络的 IPC #

使用 Unix 域套接字进行本地进程通信:

package main

import (
    "fmt"
    "net"
    "log"
    "time"
    "bufio"
    "os"
)

const socketPath = "/tmp/ipc_socket"

// Unix 域套接字服务器
func unixSocketServer() {
    // 删除已存在的套接字文件
    os.Remove(socketPath)

    // 创建 Unix 域套接字监听器
    listener, err := net.Listen("unix", socketPath)
    if err != nil {
        log.Fatal(err)
    }
    defer listener.Close()
    defer os.Remove(socketPath)

    fmt.Printf("Unix 域套接字服务器启动: %s\n", socketPath)

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Printf("接受连接失败: %v", err)
            continue
        }

        go handleConnection(conn)
    }
}

// 处理连接
func handleConnection(conn net.Conn) {
    defer conn.Close()

    fmt.Printf("新连接: %s\n", conn.RemoteAddr())

    scanner := bufio.NewScanner(conn)
    for scanner.Scan() {
        message := scanner.Text()
        fmt.Printf("接收到消息: %s\n", message)

        // 回显消息
        response := fmt.Sprintf("Echo: %s\n", message)
        conn.Write([]byte(response))

        if message == "quit" {
            break
        }
    }

    fmt.Printf("连接关闭: %s\n", conn.RemoteAddr())
}

// Unix 域套接字客户端
func unixSocketClient() {
    // 连接到 Unix 域套接字
    conn, err := net.Dial("unix", socketPath)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    fmt.Println("连接到 Unix 域套接字服务器")

    // 发送消息
    messages := []string{"Hello", "World", "Go", "quit"}

    for _, msg := range messages {
        fmt.Printf("发送: %s\n", msg)
        conn.Write([]byte(msg + "\n"))

        // 读取响应
        buffer := make([]byte, 1024)
        n, err := conn.Read(buffer)
        if err != nil {
            log.Printf("读取响应失败: %v", err)
            break
        }

        fmt.Printf("响应: %s", string(buffer[:n]))
        time.Sleep(1 * time.Second)
    }
}

func main() {
    // 启动服务器
    go unixSocketServer()

    // 等待服务器启动
    time.Sleep(1 * time.Second)

    // 启动客户端
    unixSocketClient()

    // 等待完成
    time.Sleep(1 * time.Second)
}

高级 IPC 模式 #

生产者-消费者模式 #

使用管道实现生产者-消费者模式:

package main

import (
    "fmt"
    "time"
    "sync"
    "math/rand"
)

// 任务结构
type Task struct {
    ID   int
    Data string
}

// 生产者-消费者系统
type ProducerConsumerSystem struct {
    taskChan   chan Task
    resultChan chan string
    wg         sync.WaitGroup
}

// 创建系统
func NewProducerConsumerSystem(bufferSize int) *ProducerConsumerSystem {
    return &ProducerConsumerSystem{
        taskChan:   make(chan Task, bufferSize),
        resultChan: make(chan string, bufferSize),
    }
}

// 生产者
func (pcs *ProducerConsumerSystem) Producer(id int, taskCount int) {
    defer pcs.wg.Done()

    for i := 0; i < taskCount; i++ {
        task := Task{
            ID:   i,
            Data: fmt.Sprintf("Producer-%d-Task-%d", id, i),
        }

        fmt.Printf("生产者 %d 生产任务: %s\n", id, task.Data)
        pcs.taskChan <- task

        // 模拟生产时间
        time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    }

    fmt.Printf("生产者 %d 完成\n", id)
}

// 消费者
func (pcs *ProducerConsumerSystem) Consumer(id int) {
    defer pcs.wg.Done()

    for task := range pcs.taskChan {
        fmt.Printf("消费者 %d 处理任务: %s\n", id, task.Data)

        // 模拟处理时间
        time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)

        result := fmt.Sprintf("Processed-%s", task.Data)
        pcs.resultChan <- result

        fmt.Printf("消费者 %d 完成任务: %s\n", id, task.Data)
    }

    fmt.Printf("消费者 %d 退出\n", id)
}

// 结果收集器
func (pcs *ProducerConsumerSystem) ResultCollector() {
    defer pcs.wg.Done()

    for result := range pcs.resultChan {
        fmt.Printf("收集结果: %s\n", result)
    }

    fmt.Println("结果收集器退出")
}

// 运行系统
func (pcs *ProducerConsumerSystem) Run(producerCount, consumerCount, taskPerProducer int) {
    // 启动生产者
    for i := 0; i < producerCount; i++ {
        pcs.wg.Add(1)
        go pcs.Producer(i, taskPerProducer)
    }

    // 启动消费者
    for i := 0; i < consumerCount; i++ {
        pcs.wg.Add(1)
        go pcs.Consumer(i)
    }

    // 启动结果收集器
    pcs.wg.Add(1)
    go pcs.ResultCollector()

    // 等待所有生产者完成
    go func() {
        pcs.wg.Wait()
        close(pcs.taskChan)
    }()

    // 等待所有消费者完成
    go func() {
        // 等待任务通道关闭和所有消费者退出
        time.Sleep(5 * time.Second)
        close(pcs.resultChan)
    }()

    pcs.wg.Wait()
}

func main() {
    rand.Seed(time.Now().UnixNano())

    fmt.Println("=== 生产者-消费者模式示例 ===")

    system := NewProducerConsumerSystem(10)
    system.Run(2, 3, 5) // 2个生产者,3个消费者,每个生产者生产5个任务

    fmt.Println("系统运行完成")
}

实践练习 #

练习 1:进程间日志系统 #

创建一个进程间日志系统,多个进程可以向日志服务发送日志消息:

package main

import (
    "fmt"
    "net"
    "log"
    "time"
    "os"
    "bufio"
    "encoding/json"
)

// 日志消息结构
type LogMessage struct {
    Timestamp time.Time `json:"timestamp"`
    Level     string    `json:"level"`
    Process   string    `json:"process"`
    Message   string    `json:"message"`
}

// 日志服务器
type LogServer struct {
    socketPath string
    logFile    *os.File
}

// 创建日志服务器
func NewLogServer(socketPath, logFilePath string) (*LogServer, error) {
    logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
    if err != nil {
        return nil, err
    }

    return &LogServer{
        socketPath: socketPath,
        logFile:    logFile,
    }, nil
}

// 启动日志服务器
func (ls *LogServer) Start() error {
    // 删除已存在的套接字文件
    os.Remove(ls.socketPath)

    listener, err := net.Listen("unix", ls.socketPath)
    if err != nil {
        return err
    }
    defer listener.Close()
    defer os.Remove(ls.socketPath)
    defer ls.logFile.Close()

    fmt.Printf("日志服务器启动: %s\n", ls.socketPath)

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Printf("接受连接失败: %v", err)
            continue
        }

        go ls.handleLogConnection(conn)
    }
}

// 处理日志连接
func (ls *LogServer) handleLogConnection(conn net.Conn) {
    defer conn.Close()

    scanner := bufio.NewScanner(conn)
    for scanner.Scan() {
        var logMsg LogMessage
        err := json.Unmarshal(scanner.Bytes(), &logMsg)
        if err != nil {
            log.Printf("解析日志消息失败: %v", err)
            continue
        }

        // 写入日志文件
        logLine := fmt.Sprintf("[%s] %s [%s] %s\n",
            logMsg.Timestamp.Format("2006-01-02 15:04:05"),
            logMsg.Level,
            logMsg.Process,
            logMsg.Message)

        ls.logFile.WriteString(logLine)
        ls.logFile.Sync()

        fmt.Printf("记录日志: %s", logLine)
    }
}

// 日志客户端
type LogClient struct {
    socketPath string
    processName string
    conn       net.Conn
}

// 创建日志客户端
func NewLogClient(socketPath, processName string) (*LogClient, error) {
    conn, err := net.Dial("unix", socketPath)
    if err != nil {
        return nil, err
    }

    return &LogClient{
        socketPath:  socketPath,
        processName: processName,
        conn:        conn,
    }, nil
}

// 发送日志
func (lc *LogClient) Log(level, message string) error {
    logMsg := LogMessage{
        Timestamp: time.Now(),
        Level:     level,
        Process:   lc.processName,
        Message:   message,
    }

    data, err := json.Marshal(logMsg)
    if err != nil {
        return err
    }

    _, err = lc.conn.Write(append(data, '\n'))
    return err
}

// 关闭客户端
func (lc *LogClient) Close() error {
    return lc.conn.Close()
}

func main() {
    socketPath := "/tmp/log_server.sock"
    logFilePath := "/tmp/system.log"

    if len(os.Args) > 1 && os.Args[1] == "server" {
        // 启动日志服务器
        server, err := NewLogServer(socketPath, logFilePath)
        if err != nil {
            log.Fatal(err)
        }

        server.Start()
    } else {
        // 启动日志客户端
        processName := fmt.Sprintf("client-%d", os.Getpid())

        client, err := NewLogClient(socketPath, processName)
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // 发送一些日志消息
        messages := []struct {
            level   string
            message string
        }{
            {"INFO", "应用程序启动"},
            {"DEBUG", "调试信息"},
            {"WARN", "警告消息"},
            {"ERROR", "错误发生"},
            {"INFO", "应用程序结束"},
        }

        for _, msg := range messages {
            err := client.Log(msg.level, msg.message)
            if err != nil {
                log.Printf("发送日志失败: %v", err)
            }
            time.Sleep(1 * time.Second)
        }
    }
}

总结 #

本节详细介绍了 Go 语言中进程间通信的各种机制:

  1. 管道通信:学会了使用匿名管道和命名管道进行进程间数据传输
  2. 消息队列:了解了如何使用系统调用实现消息队列通信
  3. 网络 IPC:掌握了使用 Unix 域套接字进行本地进程通信
  4. 高级模式:实现了生产者-消费者等经典并发模式
  5. 实践应用:构建了完整的进程间日志系统

这些 IPC 机制为构建复杂的多进程系统提供了强大的基础。在下一节中,我们将深入学习系统调用的详细使用方法。