5.2.4 容器网络与存储

5.2.4 容器网络与存储 #

容器网络和存储是容器化应用的重要基础设施组件。理解和掌握这些概念对于构建可扩展、可靠的容器化 Go 应用至关重要。本节将深入探讨 Docker 网络模式、容器间通信机制以及数据持久化方案。

Docker 网络基础 #

网络命名空间 #

Docker 使用 Linux 网络命名空间来隔离容器网络,每个容器都有自己的网络栈:

# 查看容器网络命名空间
docker run -d --name test-container nginx
docker exec test-container ip addr show

# 查看宿主机网络接口
ip addr show

# 查看 Docker 网络列表
docker network ls

网络驱动类型 #

Docker 支持多种网络驱动:

  1. bridge:默认网络驱动
  2. host:使用宿主机网络
  3. none:禁用网络
  4. overlay:跨主机网络(Swarm 模式)
  5. macvlan:分配 MAC 地址

Bridge 网络模式 #

默认 Bridge 网络 #

# 查看默认 bridge 网络
docker network inspect bridge

# 在默认网络中运行容器
docker run -d --name app1 nginx
docker run -d --name app2 nginx

# 容器间通信(通过 IP 地址)
docker exec app1 ping $(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' app2)

自定义 Bridge 网络 #

# 创建自定义网络
docker network create --driver bridge mynetwork

# 查看网络详情
docker network inspect mynetwork

# 在自定义网络中运行容器
docker run -d --name app1 --network mynetwork nginx
docker run -d --name app2 --network mynetwork nginx

# 容器间通信(通过容器名)
docker exec app1 ping app2

Go 应用网络配置示例 #

// network-demo/main.go
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os"
    "time"
)

type NetworkInfo struct {
    Hostname    string `json:"hostname"`
    ContainerIP string `json:"container_ip"`
    Port        string `json:"port"`
    Timestamp   string `json:"timestamp"`
}

func main() {
    port := os.Getenv("PORT")
    if port == "" {
        port = "8080"
    }

    http.HandleFunc("/info", func(w http.ResponseWriter, r *http.Request) {
        hostname, _ := os.Hostname()

        info := NetworkInfo{
            Hostname:    hostname,
            ContainerIP: getContainerIP(),
            Port:        port,
            Timestamp:   time.Now().Format(time.RFC3339),
        }

        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(info)
    })

    http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
        target := r.URL.Query().Get("target")
        if target == "" {
            http.Error(w, "target parameter required", http.StatusBadRequest)
            return
        }

        resp, err := http.Get(fmt.Sprintf("http://%s:8080/info", target))
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        defer resp.Body.Close()

        var targetInfo NetworkInfo
        json.NewDecoder(resp.Body).Decode(&targetInfo)

        result := map[string]interface{}{
            "source": info,
            "target": targetInfo,
            "status": "connected",
        }

        json.NewEncoder(w).Encode(result)
    })

    log.Printf("Server starting on port %s", port)
    log.Fatal(http.ListenAndServe(":"+port, nil))
}

func getContainerIP() string {
    // 简化的 IP 获取逻辑
    return os.Getenv("CONTAINER_IP")
}

网络配置 Dockerfile #

FROM golang:1.19-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 go build -o network-demo .

FROM alpine:latest
RUN apk --no-cache add ca-certificates curl
WORKDIR /app
COPY --from=builder /app/network-demo .
EXPOSE 8080
CMD ["./network-demo"]

Docker Compose 网络配置 #

# docker-compose.yml
version: "3.8"

services:
  app1:
    build: .
    container_name: app1
    networks:
      - app-network
    environment:
      - PORT=8080
      - CONTAINER_IP=app1

  app2:
    build: .
    container_name: app2
    networks:
      - app-network
    environment:
      - PORT=8080
      - CONTAINER_IP=app2

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    networks:
      - app-network
    depends_on:
      - app1
      - app2

networks:
  app-network:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16

Host 网络模式 #

Host 网络特点 #

# 使用 host 网络模式
docker run -d --name app-host --network host nginx

# 查看网络配置
docker exec app-host ip addr show

# 直接访问宿主机端口
curl http://localhost:80

Go 应用 Host 网络示例 #

// host-network/main.go
package main

import (
    "fmt"
    "log"
    "net"
    "net/http"
    "os"
)

func main() {
    // 获取本机 IP 地址
    addrs, err := net.InterfaceAddrs()
    if err != nil {
        log.Fatal(err)
    }

    var ips []string
    for _, addr := range addrs {
        if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
            if ipnet.IP.To4() != nil {
                ips = append(ips, ipnet.IP.String())
            }
        }
    }

    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        hostname, _ := os.Hostname()
        fmt.Fprintf(w, "Hostname: %s\nIPs: %v\n", hostname, ips)
    })

    port := os.Getenv("PORT")
    if port == "" {
        port = "8080"
    }

    log.Printf("Server starting on port %s", port)
    log.Printf("Available IPs: %v", ips)
    log.Fatal(http.ListenAndServe(":"+port, nil))
}

容器间通信 #

服务发现机制 #

// service-discovery/main.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os"
    "sync"
    "time"
)

type ServiceRegistry struct {
    services map[string][]ServiceInstance
    mutex    sync.RWMutex
}

type ServiceInstance struct {
    ID       string    `json:"id"`
    Name     string    `json:"name"`
    Host     string    `json:"host"`
    Port     int       `json:"port"`
    Health   string    `json:"health"`
    LastSeen time.Time `json:"last_seen"`
}

func NewServiceRegistry() *ServiceRegistry {
    return &ServiceRegistry{
        services: make(map[string][]ServiceInstance),
    }
}

func (sr *ServiceRegistry) Register(instance ServiceInstance) {
    sr.mutex.Lock()
    defer sr.mutex.Unlock()

    instance.LastSeen = time.Now()
    sr.services[instance.Name] = append(sr.services[instance.Name], instance)
}

func (sr *ServiceRegistry) Discover(serviceName string) []ServiceInstance {
    sr.mutex.RLock()
    defer sr.mutex.RUnlock()

    return sr.services[serviceName]
}

func (sr *ServiceRegistry) HealthCheck(ctx context.Context) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            sr.performHealthCheck()
        }
    }
}

func (sr *ServiceRegistry) performHealthCheck() {
    sr.mutex.Lock()
    defer sr.mutex.Unlock()

    for serviceName, instances := range sr.services {
        var healthyInstances []ServiceInstance

        for _, instance := range instances {
            if sr.checkInstanceHealth(instance) {
                instance.Health = "healthy"
                instance.LastSeen = time.Now()
                healthyInstances = append(healthyInstances, instance)
            }
        }

        sr.services[serviceName] = healthyInstances
    }
}

func (sr *ServiceRegistry) checkInstanceHealth(instance ServiceInstance) bool {
    client := &http.Client{Timeout: 5 * time.Second}
    url := fmt.Sprintf("http://%s:%d/health", instance.Host, instance.Port)

    resp, err := client.Get(url)
    if err != nil {
        return false
    }
    defer resp.Body.Close()

    return resp.StatusCode == http.StatusOK
}

func main() {
    registry := NewServiceRegistry()

    // 启动健康检查
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go registry.HealthCheck(ctx)

    // 服务注册端点
    http.HandleFunc("/register", func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodPost {
            http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
            return
        }

        var instance ServiceInstance
        if err := json.NewDecoder(r.Body).Decode(&instance); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        registry.Register(instance)
        w.WriteHeader(http.StatusCreated)
        json.NewEncoder(w).Encode(map[string]string{"status": "registered"})
    })

    // 服务发现端点
    http.HandleFunc("/discover", func(w http.ResponseWriter, r *http.Request) {
        serviceName := r.URL.Query().Get("service")
        if serviceName == "" {
            http.Error(w, "service parameter required", http.StatusBadRequest)
            return
        }

        instances := registry.Discover(serviceName)
        json.NewEncoder(w).Encode(instances)
    })

    port := os.Getenv("PORT")
    if port == "" {
        port = "8080"
    }

    log.Printf("Service registry starting on port %s", port)
    log.Fatal(http.ListenAndServe(":"+port, nil))
}

负载均衡实现 #

// load-balancer/main.go
package main

import (
    "fmt"
    "log"
    "net/http"
    "net/http/httputil"
    "net/url"
    "os"
    "sync"
    "sync/atomic"
)

type LoadBalancer struct {
    backends []*Backend
    current  uint64
}

type Backend struct {
    URL          *url.URL
    Alive        bool
    ReverseProxy *httputil.ReverseProxy
    mutex        sync.RWMutex
}

func (b *Backend) SetAlive(alive bool) {
    b.mutex.Lock()
    b.Alive = alive
    b.mutex.Unlock()
}

func (b *Backend) IsAlive() bool {
    b.mutex.RLock()
    alive := b.Alive
    b.mutex.RUnlock()
    return alive
}

func NewLoadBalancer() *LoadBalancer {
    return &LoadBalancer{
        backends: make([]*Backend, 0),
    }
}

func (lb *LoadBalancer) AddBackend(backendURL string) {
    url, err := url.Parse(backendURL)
    if err != nil {
        log.Printf("Error parsing backend URL %s: %v", backendURL, err)
        return
    }

    backend := &Backend{
        URL:          url,
        Alive:        true,
        ReverseProxy: httputil.NewSingleHostReverseProxy(url),
    }

    lb.backends = append(lb.backends, backend)
    log.Printf("Added backend: %s", backendURL)
}

func (lb *LoadBalancer) NextBackend() *Backend {
    next := atomic.AddUint64(&lb.current, 1)
    return lb.backends[next%uint64(len(lb.backends))]
}

func (lb *LoadBalancer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    attempts := 0
    maxAttempts := len(lb.backends)

    for attempts < maxAttempts {
        backend := lb.NextBackend()

        if backend.IsAlive() {
            backend.ReverseProxy.ServeHTTP(w, r)
            return
        }

        attempts++
    }

    http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
}

func main() {
    lb := NewLoadBalancer()

    // 从环境变量读取后端服务
    backends := []string{
        os.Getenv("BACKEND_1"),
        os.Getenv("BACKEND_2"),
        os.Getenv("BACKEND_3"),
    }

    for _, backend := range backends {
        if backend != "" {
            lb.AddBackend(backend)
        }
    }

    if len(lb.backends) == 0 {
        // 默认后端
        lb.AddBackend("http://app1:8080")
        lb.AddBackend("http://app2:8080")
    }

    port := os.Getenv("PORT")
    if port == "" {
        port = "8080"
    }

    log.Printf("Load balancer starting on port %s", port)
    log.Printf("Backends: %d", len(lb.backends))

    http.Handle("/", lb)
    log.Fatal(http.ListenAndServe(":"+port, nil))
}

数据持久化方案 #

数据卷(Volumes) #

# 创建命名数据卷
docker volume create myapp-data

# 查看数据卷信息
docker volume inspect myapp-data

# 使用数据卷运行容器
docker run -d --name app -v myapp-data:/app/data myapp:latest

# 数据卷备份
docker run --rm -v myapp-data:/data -v $(pwd):/backup alpine tar czf /backup/backup.tar.gz -C /data .

# 数据卷恢复
docker run --rm -v myapp-data:/data -v $(pwd):/backup alpine tar xzf /backup/backup.tar.gz -C /data

Go 应用数据持久化示例 #

// persistence/main.go
package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "path/filepath"
    "sync"
    "time"
)

type DataStore struct {
    dataDir string
    mutex   sync.RWMutex
}

type Record struct {
    ID        string    `json:"id"`
    Data      string    `json:"data"`
    Timestamp time.Time `json:"timestamp"`
}

func NewDataStore(dataDir string) *DataStore {
    // 确保数据目录存在
    if err := os.MkdirAll(dataDir, 0755); err != nil {
        log.Fatal("Failed to create data directory:", err)
    }

    return &DataStore{
        dataDir: dataDir,
    }
}

func (ds *DataStore) Save(record Record) error {
    ds.mutex.Lock()
    defer ds.mutex.Unlock()

    record.Timestamp = time.Now()

    data, err := json.Marshal(record)
    if err != nil {
        return err
    }

    filename := filepath.Join(ds.dataDir, record.ID+".json")
    return ioutil.WriteFile(filename, data, 0644)
}

func (ds *DataStore) Load(id string) (*Record, error) {
    ds.mutex.RLock()
    defer ds.mutex.RUnlock()

    filename := filepath.Join(ds.dataDir, id+".json")
    data, err := ioutil.ReadFile(filename)
    if err != nil {
        return nil, err
    }

    var record Record
    err = json.Unmarshal(data, &record)
    return &record, err
}

func (ds *DataStore) List() ([]Record, error) {
    ds.mutex.RLock()
    defer ds.mutex.RUnlock()

    files, err := ioutil.ReadDir(ds.dataDir)
    if err != nil {
        return nil, err
    }

    var records []Record
    for _, file := range files {
        if filepath.Ext(file.Name()) == ".json" {
            id := file.Name()[:len(file.Name())-5] // 移除 .json 扩展名
            if record, err := ds.Load(id); err == nil {
                records = append(records, *record)
            }
        }
    }

    return records, nil
}

func main() {
    dataDir := os.Getenv("DATA_DIR")
    if dataDir == "" {
        dataDir = "/app/data"
    }

    store := NewDataStore(dataDir)

    // 保存记录
    http.HandleFunc("/save", func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodPost {
            http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
            return
        }

        var record Record
        if err := json.NewDecoder(r.Body).Decode(&record); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        if err := store.Save(record); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        w.WriteHeader(http.StatusCreated)
        json.NewEncoder(w).Encode(map[string]string{"status": "saved"})
    })

    // 加载记录
    http.HandleFunc("/load", func(w http.ResponseWriter, r *http.Request) {
        id := r.URL.Query().Get("id")
        if id == "" {
            http.Error(w, "id parameter required", http.StatusBadRequest)
            return
        }

        record, err := store.Load(id)
        if err != nil {
            http.Error(w, err.Error(), http.StatusNotFound)
            return
        }

        json.NewEncoder(w).Encode(record)
    })

    // 列出所有记录
    http.HandleFunc("/list", func(w http.ResponseWriter, r *http.Request) {
        records, err := store.List()
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        json.NewEncoder(w).Encode(records)
    })

    // 健康检查
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        // 检查数据目录是否可访问
        if _, err := os.Stat(dataDir); os.IsNotExist(err) {
            http.Error(w, "Data directory not accessible", http.StatusServiceUnavailable)
            return
        }

        w.WriteHeader(http.StatusOK)
        fmt.Fprint(w, "OK")
    })

    port := os.Getenv("PORT")
    if port == "" {
        port = "8080"
    }

    log.Printf("Server starting on port %s", port)
    log.Printf("Data directory: %s", dataDir)
    log.Fatal(http.ListenAndServe(":"+port, nil))
}

数据库容器化配置 #

# docker-compose.db.yml
version: "3.8"

services:
  app:
    build: .
    ports:
      - "8080:8080"
    environment:
      - DB_HOST=postgres
      - DB_USER=myuser
      - DB_PASSWORD=mypassword
      - DB_NAME=mydb
      - DATA_DIR=/app/data
    volumes:
      - app-data:/app/data
    depends_on:
      - postgres
      - redis
    networks:
      - app-network

  postgres:
    image: postgres:13-alpine
    environment:
      - POSTGRES_USER=myuser
      - POSTGRES_PASSWORD=mypassword
      - POSTGRES_DB=mydb
    volumes:
      - postgres-data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - app-network

  redis:
    image: redis:6-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    networks:
      - app-network

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - static-files:/usr/share/nginx/html
    depends_on:
      - app
    networks:
      - app-network

volumes:
  app-data:
    driver: local
  postgres-data:
    driver: local
  redis-data:
    driver: local
  static-files:
    driver: local

networks:
  app-network:
    driver: bridge

绑定挂载配置 #

# 开发环境:挂载源代码目录
docker run -d --name dev-app \
  -v $(pwd):/app \
  -v $(pwd)/data:/app/data \
  -p 8080:8080 \
  myapp:dev

# 生产环境:只挂载数据目录
docker run -d --name prod-app \
  -v /opt/myapp/data:/app/data \
  -p 8080:8080 \
  myapp:latest

网络安全和隔离 #

网络策略配置 #

# docker-compose.secure.yml
version: "3.8"

services:
  frontend:
    build: ./frontend
    networks:
      - frontend-network
    ports:
      - "80:80"

  api:
    build: ./api
    networks:
      - frontend-network
      - backend-network
    environment:
      - DB_HOST=database

  database:
    image: postgres:13-alpine
    networks:
      - backend-network
    environment:
      - POSTGRES_PASSWORD=secretpassword
    volumes:
      - db-data:/var/lib/postgresql/data

networks:
  frontend-network:
    driver: bridge
    internal: false
  backend-network:
    driver: bridge
    internal: true # 内部网络,不能访问外部

volumes:
  db-data:

防火墙规则 #

# 限制容器网络访问
iptables -I DOCKER-USER -s 172.17.0.0/16 -d 192.168.1.0/24 -j DROP

# 允许特定端口
iptables -I DOCKER-USER -p tcp --dport 8080 -j ACCEPT

# 查看 Docker 网络规则
iptables -L DOCKER-USER

监控和故障排除 #

网络诊断工具 #

# 网络连通性测试
docker run --rm --network container:myapp nicolaka/netshoot ping google.com

# 端口扫描
docker run --rm --network container:myapp nicolaka/netshoot nmap -p 1-1000 localhost

# 网络抓包
docker run --rm --network container:myapp nicolaka/netshoot tcpdump -i eth0

# DNS 解析测试
docker run --rm --network mynetwork nicolaka/netshoot nslookup myservice

存储监控 #

# 查看数据卷使用情况
docker system df -v

# 监控容器存储使用
docker stats --format "table {{.Container}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.BlockIO}}"

# 清理未使用的数据卷
docker volume prune

通过掌握容器网络和存储的核心概念和实践技巧,您可以构建更加健壮、可扩展的容器化 Go 应用。这些知识为后续的微服务架构和云原生部署奠定了坚实的基础。