4.4.2 进程间通信 IPC #
进程间通信(Inter-Process Communication,IPC)是操作系统中不同进程之间交换数据和信息的机制。Go 语言提供了多种 IPC 方式,包括管道、命名管道、消息队列等。本节将详细介绍这些通信机制的实现和应用。
IPC 基础概念 #
IPC 的类型 #
进程间通信主要有以下几种类型:
- 管道(Pipe):包括匿名管道和命名管道
- 消息队列(Message Queue):进程间传递消息的队列
- 共享内存(Shared Memory):多个进程共享同一块内存区域
- 信号量(Semaphore):用于进程同步的计数器
- 套接字(Socket):网络通信机制,也可用于本地进程通信
IPC 的选择原则 #
选择合适的 IPC 机制需要考虑:
- 数据传输量:大量数据适合共享内存,少量数据可用管道
- 通信模式:一对一、一对多或多对多
- 同步需求:是否需要同步机制
- 跨网络需求:是否需要跨主机通信
管道通信 #
匿名管道 #
匿名管道是最简单的 IPC 机制,通常用于父子进程间通信:
package main
import (
"fmt"
"os/exec"
"io"
"log"
"bufio"
)
func main() {
// 创建管道
reader, writer := io.Pipe()
// 创建第一个命令:生成数据
cmd1 := exec.Command("echo", "Hello\nWorld\nGo\nProgramming")
cmd1.Stdout = writer
// 创建第二个命令:处理数据
cmd2 := exec.Command("grep", "Go")
cmd2.Stdin = reader
// 启动第一个命令
err := cmd1.Start()
if err != nil {
log.Fatal(err)
}
// 启动第二个命令并获取输出
output, err := cmd2.Output()
if err != nil {
log.Fatal(err)
}
// 等待第一个命令完成
err = cmd1.Wait()
if err != nil {
log.Fatal(err)
}
// 关闭写端
writer.Close()
fmt.Printf("管道输出: %s", output)
}
使用 StdoutPipe 和 StdinPipe #
Go 提供了更便捷的管道操作方法:
package main
import (
"fmt"
"os/exec"
"log"
"bufio"
"strings"
)
func pipelineExample() {
// 第一个命令:列出文件
cmd1 := exec.Command("ls", "-la")
// 第二个命令:过滤包含 "go" 的行
cmd2 := exec.Command("grep", "go")
// 建立管道连接
pipe, err := cmd1.StdoutPipe()
if err != nil {
log.Fatal(err)
}
cmd2.Stdin = pipe
// 启动两个命令
err = cmd1.Start()
if err != nil {
log.Fatal(err)
}
output, err := cmd2.Output()
if err != nil {
log.Fatal(err)
}
err = cmd1.Wait()
if err != nil {
log.Fatal(err)
}
fmt.Printf("管道输出:\n%s", output)
}
func interactiveProcess() {
// 创建交互式进程
cmd := exec.Command("bc", "-l") // 计算器程序
// 获取输入输出管道
stdin, err := cmd.StdinPipe()
if err != nil {
log.Fatal(err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Fatal(err)
}
// 启动进程
err = cmd.Start()
if err != nil {
log.Fatal(err)
}
// 发送计算表达式
expressions := []string{"2+3", "sqrt(16)", "3.14159 * 2"}
go func() {
defer stdin.Close()
for _, expr := range expressions {
fmt.Fprintf(stdin, "%s\n", expr)
}
fmt.Fprintf(stdin, "quit\n")
}()
// 读取输出
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
line := scanner.Text()
if strings.TrimSpace(line) != "" {
fmt.Printf("计算结果: %s\n", line)
}
}
err = cmd.Wait()
if err != nil {
log.Printf("进程结束: %v", err)
}
}
func main() {
fmt.Println("=== 管道示例 ===")
pipelineExample()
fmt.Println("\n=== 交互式进程示例 ===")
interactiveProcess()
}
命名管道 (FIFO) #
命名管道允许无关联的进程进行通信:
package main
import (
"fmt"
"os"
"log"
"syscall"
"time"
)
const fifoPath = "/tmp/myfifo"
// 创建命名管道
func createFIFO() error {
// 删除已存在的 FIFO
os.Remove(fifoPath)
// 创建 FIFO
err := syscall.Mkfifo(fifoPath, 0666)
if err != nil {
return fmt.Errorf("创建 FIFO 失败: %v", err)
}
fmt.Printf("FIFO 创建成功: %s\n", fifoPath)
return nil
}
// FIFO 写入者
func fifoWriter() {
fmt.Println("启动 FIFO 写入者")
// 打开 FIFO 进行写入
file, err := os.OpenFile(fifoPath, os.O_WRONLY, 0)
if err != nil {
log.Fatal(err)
}
defer file.Close()
// 写入数据
messages := []string{
"Hello from writer",
"Message 2",
"Message 3",
"EOF",
}
for _, msg := range messages {
fmt.Printf("写入: %s\n", msg)
_, err := file.WriteString(msg + "\n")
if err != nil {
log.Printf("写入失败: %v", err)
break
}
time.Sleep(1 * time.Second)
}
}
// FIFO 读取者
func fifoReader() {
fmt.Println("启动 FIFO 读取者")
// 打开 FIFO 进行读取
file, err := os.OpenFile(fifoPath, os.O_RDONLY, 0)
if err != nil {
log.Fatal(err)
}
defer file.Close()
// 读取数据
buffer := make([]byte, 1024)
for {
n, err := file.Read(buffer)
if err != nil {
log.Printf("读取失败: %v", err)
break
}
if n > 0 {
message := string(buffer[:n])
fmt.Printf("读取: %s", message)
if message == "EOF\n" {
break
}
}
}
}
func main() {
// 创建 FIFO
err := createFIFO()
if err != nil {
log.Fatal(err)
}
defer os.Remove(fifoPath)
// 启动读取者(在单独的 goroutine 中)
go fifoReader()
// 等待一下确保读取者已准备好
time.Sleep(1 * time.Second)
// 启动写入者
fifoWriter()
// 等待完成
time.Sleep(2 * time.Second)
}
消息队列 #
虽然 Go 标准库没有直接支持 System V 消息队列,但我们可以通过系统调用实现:
package main
import (
"fmt"
"unsafe"
"syscall"
"log"
"time"
)
const (
IPC_CREAT = 01000
IPC_EXCL = 02000
IPC_RMID = 0
)
// 消息结构
type Message struct {
Type int64
Text [256]byte
}
// 消息队列管理器
type MessageQueue struct {
id int
key int
}
// 创建或获取消息队列
func NewMessageQueue(key int) (*MessageQueue, error) {
// 创建消息队列
id, _, errno := syscall.Syscall(syscall.SYS_MSGGET,
uintptr(key),
uintptr(IPC_CREAT|0666),
0)
if errno != 0 {
return nil, fmt.Errorf("创建消息队列失败: %v", errno)
}
return &MessageQueue{
id: int(id),
key: key,
}, nil
}
// 发送消息
func (mq *MessageQueue) Send(msgType int64, text string) error {
var msg Message
msg.Type = msgType
copy(msg.Text[:], text)
_, _, errno := syscall.Syscall6(syscall.SYS_MSGSND,
uintptr(mq.id),
uintptr(unsafe.Pointer(&msg)),
uintptr(len(text)),
0, 0, 0)
if errno != 0 {
return fmt.Errorf("发送消息失败: %v", errno)
}
return nil
}
// 接收消息
func (mq *MessageQueue) Receive(msgType int64) (string, error) {
var msg Message
n, _, errno := syscall.Syscall6(syscall.SYS_MSGRCV,
uintptr(mq.id),
uintptr(unsafe.Pointer(&msg)),
256,
uintptr(msgType),
0, 0)
if errno != 0 {
return "", fmt.Errorf("接收消息失败: %v", errno)
}
return string(msg.Text[:n]), nil
}
// 删除消息队列
func (mq *MessageQueue) Remove() error {
_, _, errno := syscall.Syscall(syscall.SYS_MSGCTL,
uintptr(mq.id),
IPC_RMID,
0)
if errno != 0 {
return fmt.Errorf("删除消息队列失败: %v", errno)
}
return nil
}
func messageQueueExample() {
key := 12345
// 创建消息队列
mq, err := NewMessageQueue(key)
if err != nil {
log.Fatal(err)
}
defer mq.Remove()
fmt.Printf("消息队列创建成功,ID: %d\n", mq.id)
// 启动接收者
go func() {
fmt.Println("启动消息接收者")
for i := 0; i < 3; i++ {
msg, err := mq.Receive(1) // 接收类型为 1 的消息
if err != nil {
log.Printf("接收消息失败: %v", err)
continue
}
fmt.Printf("接收到消息: %s\n", msg)
}
}()
// 等待接收者准备好
time.Sleep(1 * time.Second)
// 发送消息
messages := []string{"Hello", "World", "Go"}
for _, msg := range messages {
fmt.Printf("发送消息: %s\n", msg)
err := mq.Send(1, msg)
if err != nil {
log.Printf("发送消息失败: %v", err)
}
time.Sleep(1 * time.Second)
}
// 等待处理完成
time.Sleep(2 * time.Second)
}
func main() {
messageQueueExample()
}
基于网络的 IPC #
使用 Unix 域套接字进行本地进程通信:
package main
import (
"fmt"
"net"
"log"
"time"
"bufio"
"os"
)
const socketPath = "/tmp/ipc_socket"
// Unix 域套接字服务器
func unixSocketServer() {
// 删除已存在的套接字文件
os.Remove(socketPath)
// 创建 Unix 域套接字监听器
listener, err := net.Listen("unix", socketPath)
if err != nil {
log.Fatal(err)
}
defer listener.Close()
defer os.Remove(socketPath)
fmt.Printf("Unix 域套接字服务器启动: %s\n", socketPath)
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("接受连接失败: %v", err)
continue
}
go handleConnection(conn)
}
}
// 处理连接
func handleConnection(conn net.Conn) {
defer conn.Close()
fmt.Printf("新连接: %s\n", conn.RemoteAddr())
scanner := bufio.NewScanner(conn)
for scanner.Scan() {
message := scanner.Text()
fmt.Printf("接收到消息: %s\n", message)
// 回显消息
response := fmt.Sprintf("Echo: %s\n", message)
conn.Write([]byte(response))
if message == "quit" {
break
}
}
fmt.Printf("连接关闭: %s\n", conn.RemoteAddr())
}
// Unix 域套接字客户端
func unixSocketClient() {
// 连接到 Unix 域套接字
conn, err := net.Dial("unix", socketPath)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
fmt.Println("连接到 Unix 域套接字服务器")
// 发送消息
messages := []string{"Hello", "World", "Go", "quit"}
for _, msg := range messages {
fmt.Printf("发送: %s\n", msg)
conn.Write([]byte(msg + "\n"))
// 读取响应
buffer := make([]byte, 1024)
n, err := conn.Read(buffer)
if err != nil {
log.Printf("读取响应失败: %v", err)
break
}
fmt.Printf("响应: %s", string(buffer[:n]))
time.Sleep(1 * time.Second)
}
}
func main() {
// 启动服务器
go unixSocketServer()
// 等待服务器启动
time.Sleep(1 * time.Second)
// 启动客户端
unixSocketClient()
// 等待完成
time.Sleep(1 * time.Second)
}
高级 IPC 模式 #
生产者-消费者模式 #
使用管道实现生产者-消费者模式:
package main
import (
"fmt"
"time"
"sync"
"math/rand"
)
// 任务结构
type Task struct {
ID int
Data string
}
// 生产者-消费者系统
type ProducerConsumerSystem struct {
taskChan chan Task
resultChan chan string
wg sync.WaitGroup
}
// 创建系统
func NewProducerConsumerSystem(bufferSize int) *ProducerConsumerSystem {
return &ProducerConsumerSystem{
taskChan: make(chan Task, bufferSize),
resultChan: make(chan string, bufferSize),
}
}
// 生产者
func (pcs *ProducerConsumerSystem) Producer(id int, taskCount int) {
defer pcs.wg.Done()
for i := 0; i < taskCount; i++ {
task := Task{
ID: i,
Data: fmt.Sprintf("Producer-%d-Task-%d", id, i),
}
fmt.Printf("生产者 %d 生产任务: %s\n", id, task.Data)
pcs.taskChan <- task
// 模拟生产时间
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
fmt.Printf("生产者 %d 完成\n", id)
}
// 消费者
func (pcs *ProducerConsumerSystem) Consumer(id int) {
defer pcs.wg.Done()
for task := range pcs.taskChan {
fmt.Printf("消费者 %d 处理任务: %s\n", id, task.Data)
// 模拟处理时间
time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)
result := fmt.Sprintf("Processed-%s", task.Data)
pcs.resultChan <- result
fmt.Printf("消费者 %d 完成任务: %s\n", id, task.Data)
}
fmt.Printf("消费者 %d 退出\n", id)
}
// 结果收集器
func (pcs *ProducerConsumerSystem) ResultCollector() {
defer pcs.wg.Done()
for result := range pcs.resultChan {
fmt.Printf("收集结果: %s\n", result)
}
fmt.Println("结果收集器退出")
}
// 运行系统
func (pcs *ProducerConsumerSystem) Run(producerCount, consumerCount, taskPerProducer int) {
// 启动生产者
for i := 0; i < producerCount; i++ {
pcs.wg.Add(1)
go pcs.Producer(i, taskPerProducer)
}
// 启动消费者
for i := 0; i < consumerCount; i++ {
pcs.wg.Add(1)
go pcs.Consumer(i)
}
// 启动结果收集器
pcs.wg.Add(1)
go pcs.ResultCollector()
// 等待所有生产者完成
go func() {
pcs.wg.Wait()
close(pcs.taskChan)
}()
// 等待所有消费者完成
go func() {
// 等待任务通道关闭和所有消费者退出
time.Sleep(5 * time.Second)
close(pcs.resultChan)
}()
pcs.wg.Wait()
}
func main() {
rand.Seed(time.Now().UnixNano())
fmt.Println("=== 生产者-消费者模式示例 ===")
system := NewProducerConsumerSystem(10)
system.Run(2, 3, 5) // 2个生产者,3个消费者,每个生产者生产5个任务
fmt.Println("系统运行完成")
}
实践练习 #
练习 1:进程间日志系统 #
创建一个进程间日志系统,多个进程可以向日志服务发送日志消息:
package main
import (
"fmt"
"net"
"log"
"time"
"os"
"bufio"
"encoding/json"
)
// 日志消息结构
type LogMessage struct {
Timestamp time.Time `json:"timestamp"`
Level string `json:"level"`
Process string `json:"process"`
Message string `json:"message"`
}
// 日志服务器
type LogServer struct {
socketPath string
logFile *os.File
}
// 创建日志服务器
func NewLogServer(socketPath, logFilePath string) (*LogServer, error) {
logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
return &LogServer{
socketPath: socketPath,
logFile: logFile,
}, nil
}
// 启动日志服务器
func (ls *LogServer) Start() error {
// 删除已存在的套接字文件
os.Remove(ls.socketPath)
listener, err := net.Listen("unix", ls.socketPath)
if err != nil {
return err
}
defer listener.Close()
defer os.Remove(ls.socketPath)
defer ls.logFile.Close()
fmt.Printf("日志服务器启动: %s\n", ls.socketPath)
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("接受连接失败: %v", err)
continue
}
go ls.handleLogConnection(conn)
}
}
// 处理日志连接
func (ls *LogServer) handleLogConnection(conn net.Conn) {
defer conn.Close()
scanner := bufio.NewScanner(conn)
for scanner.Scan() {
var logMsg LogMessage
err := json.Unmarshal(scanner.Bytes(), &logMsg)
if err != nil {
log.Printf("解析日志消息失败: %v", err)
continue
}
// 写入日志文件
logLine := fmt.Sprintf("[%s] %s [%s] %s\n",
logMsg.Timestamp.Format("2006-01-02 15:04:05"),
logMsg.Level,
logMsg.Process,
logMsg.Message)
ls.logFile.WriteString(logLine)
ls.logFile.Sync()
fmt.Printf("记录日志: %s", logLine)
}
}
// 日志客户端
type LogClient struct {
socketPath string
processName string
conn net.Conn
}
// 创建日志客户端
func NewLogClient(socketPath, processName string) (*LogClient, error) {
conn, err := net.Dial("unix", socketPath)
if err != nil {
return nil, err
}
return &LogClient{
socketPath: socketPath,
processName: processName,
conn: conn,
}, nil
}
// 发送日志
func (lc *LogClient) Log(level, message string) error {
logMsg := LogMessage{
Timestamp: time.Now(),
Level: level,
Process: lc.processName,
Message: message,
}
data, err := json.Marshal(logMsg)
if err != nil {
return err
}
_, err = lc.conn.Write(append(data, '\n'))
return err
}
// 关闭客户端
func (lc *LogClient) Close() error {
return lc.conn.Close()
}
func main() {
socketPath := "/tmp/log_server.sock"
logFilePath := "/tmp/system.log"
if len(os.Args) > 1 && os.Args[1] == "server" {
// 启动日志服务器
server, err := NewLogServer(socketPath, logFilePath)
if err != nil {
log.Fatal(err)
}
server.Start()
} else {
// 启动日志客户端
processName := fmt.Sprintf("client-%d", os.Getpid())
client, err := NewLogClient(socketPath, processName)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 发送一些日志消息
messages := []struct {
level string
message string
}{
{"INFO", "应用程序启动"},
{"DEBUG", "调试信息"},
{"WARN", "警告消息"},
{"ERROR", "错误发生"},
{"INFO", "应用程序结束"},
}
for _, msg := range messages {
err := client.Log(msg.level, msg.message)
if err != nil {
log.Printf("发送日志失败: %v", err)
}
time.Sleep(1 * time.Second)
}
}
}
总结 #
本节详细介绍了 Go 语言中进程间通信的各种机制:
- 管道通信:学会了使用匿名管道和命名管道进行进程间数据传输
- 消息队列:了解了如何使用系统调用实现消息队列通信
- 网络 IPC:掌握了使用 Unix 域套接字进行本地进程通信
- 高级模式:实现了生产者-消费者等经典并发模式
- 实践应用:构建了完整的进程间日志系统
这些 IPC 机制为构建复杂的多进程系统提供了强大的基础。在下一节中,我们将深入学习系统调用的详细使用方法。