4.8.3 网络代理服务器

4.8.3 网络代理服务器 #

网络代理服务器是网络基础设施的重要组件,本节将构建一个高性能的网络代理服务器,支持 HTTP/HTTPS 代理、负载均衡、连接池管理等功能。

系统架构设计 #

整体架构 #

代理服务器采用模块化设计,包含以下核心组件:

  • 代理核心:处理客户端请求和转发
  • 负载均衡器:后端服务器选择和流量分发
  • 连接池:管理到后端服务器的连接
  • 健康检查:监控后端服务器状态
  • 访问控制:认证和授权管理
  • 监控统计:流量统计和性能监控
// pkg/proxy/types.go
package proxy

import (
    "net"
    "net/http"
    "sync"
    "time"
)

// ProxyType names the protocol a proxy instance is configured to speak.
type ProxyType string

// Protocol identifiers accepted in ProxyConfig.Type.
const (
    ProxyHTTP   ProxyType = "http"
    ProxyHTTPS  ProxyType = "https"
    ProxySOCKS5 ProxyType = "socks5"
    ProxyTCP    ProxyType = "tcp"
)

// Backend describes one upstream server and carries its runtime counters.
type Backend struct {
    ID       string    `json:"id"`
    Address  string    `json:"address"`
    Weight   int       `json:"weight"`
    Status   string    `json:"status"` // one of: active, inactive, down
    LastSeen time.Time `json:"last_seen"`

    // Runtime statistics.
    Connections  int64         `json:"connections"`
    Requests     int64         `json:"requests"`
    Errors       int64         `json:"errors"`
    ResponseTime time.Duration `json:"response_time"`

    // mu is taken by callers when they update Status or the counters.
    mu sync.RWMutex
}

// ProxyConfig is the top-level configuration of a proxy instance.
type ProxyConfig struct {
    Type           ProxyType          `json:"type"`
    ListenAddr     string             `json:"listen_addr"`
    Backends       []*Backend         `json:"backends"`
    LoadBalancer   string             `json:"load_balancer"` // round_robin, weighted, least_conn
    HealthCheck    *HealthCheckConfig `json:"health_check"`
    ConnectionPool *PoolConfig        `json:"connection_pool"`
    AccessControl  *AccessConfig      `json:"access_control"`
    Timeout        time.Duration      `json:"timeout"`
    MaxConnections int                `json:"max_connections"`
    BufferSize     int                `json:"buffer_size"`
}

// HealthCheckConfig controls how backends are probed for liveness.
type HealthCheckConfig struct {
    Enabled     bool          `json:"enabled"`
    Interval    time.Duration `json:"interval"`
    Timeout     time.Duration `json:"timeout"`
    Path        string        `json:"path"`
    Method      string        `json:"method"`
    StatusCodes []int         `json:"status_codes"`
    MaxRetries  int           `json:"max_retries"`
}

// PoolConfig holds the limits and timeouts of the backend connection pool.
type PoolConfig struct {
    MaxIdle        int           `json:"max_idle"`
    MaxActive      int           `json:"max_active"`
    IdleTimeout    time.Duration `json:"idle_timeout"`
    ConnectTimeout time.Duration `json:"connect_timeout"`
    ReadTimeout    time.Duration `json:"read_timeout"`
    WriteTimeout   time.Duration `json:"write_timeout"`
}

// AccessConfig holds the IP allow/deny lists and the rate limit.
type AccessConfig struct {
    Enabled   bool     `json:"enabled"`
    AllowIPs  []string `json:"allow_ips"`
    DenyIPs   []string `json:"deny_ips"`
    RateLimit int      `json:"rate_limit"` // requests per second
}

// ProxyStats aggregates proxy-wide traffic counters.
type ProxyStats struct {
    TotalConnections    int64         `json:"total_connections"`
    ActiveConnections   int64         `json:"active_connections"`
    TotalRequests       int64         `json:"total_requests"`
    TotalErrors         int64         `json:"total_errors"`
    BytesIn             int64         `json:"bytes_in"`
    BytesOut            int64         `json:"bytes_out"`
    AverageResponseTime time.Duration `json:"average_response_time"`
    Uptime              time.Duration `json:"uptime"`
    StartTime           time.Time     `json:"start_time"`

    mu sync.RWMutex
}

HTTP 代理实现 #

// pkg/proxy/http_proxy.go
package proxy

import (
    "context"
    "crypto/tls"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net"
    "net/http"
    "net/http/httputil"
    "net/url"
    "strings"
    "sync"
    "sync/atomic"
    "time"
)

// HTTPProxy is the HTTP proxy server: it owns the listener and wires the
// balancer, health checker, connection pool and access controller together.
type HTTPProxy struct {
    config      *ProxyConfig
    backends    []*Backend
    balancer    LoadBalancer
    healthCheck *HealthChecker    // nil when health checking is disabled
    connPool    *ConnectionPool   // nil when no pool is configured
    accessCtrl  *AccessController // nil when access control is disabled
    stats       *ProxyStats
    logger      *log.Logger

    server *http.Server
    mu     sync.RWMutex
}

// NewHTTPProxy builds an HTTPProxy from config.
//
// The load balancer is always created. The health checker, connection pool
// and access controller are created only when their config section is present
// (and, for health check and access control, enabled).
//
// It returns an error when config is nil or the load-balancer algorithm is
// unsupported. A nil logger falls back to the standard library's default
// logger so later log calls cannot panic.
func NewHTTPProxy(config *ProxyConfig, logger *log.Logger) (*HTTPProxy, error) {
    if config == nil {
        return nil, fmt.Errorf("proxy config is nil")
    }
    if logger == nil {
        logger = log.Default()
    }

    balancer, err := NewLoadBalancer(config.LoadBalancer, config.Backends)
    if err != nil {
        return nil, fmt.Errorf("failed to create load balancer: %w", err)
    }

    proxy := &HTTPProxy{
        config:   config,
        backends: config.Backends,
        balancer: balancer,
        stats: &ProxyStats{
            StartTime: time.Now(),
        },
        logger: logger,
    }

    if config.HealthCheck != nil && config.HealthCheck.Enabled {
        proxy.healthCheck = NewHealthChecker(config.HealthCheck, config.Backends, logger)
    }

    if config.ConnectionPool != nil {
        proxy.connPool = NewConnectionPool(config.ConnectionPool)
    }

    if config.AccessControl != nil && config.AccessControl.Enabled {
        proxy.accessCtrl = NewAccessController(config.AccessControl)
    }

    return proxy, nil
}

// Start runs the proxy until ctx is cancelled or the listener fails.
//
// It launches the health checker (when configured), registers the proxy,
// /stats and /health handlers, and serves on config.ListenAddr. If the
// listener fails (for example the port is already in use) the error is
// returned immediately instead of being logged while Start keeps blocking.
// On cancellation the server is shut down gracefully with a bounded wait.
func (p *HTTPProxy) Start(ctx context.Context) error {
    if p.healthCheck != nil {
        go p.healthCheck.Start(ctx)
    }

    mux := http.NewServeMux()
    mux.HandleFunc("/", p.handleRequest)
    mux.HandleFunc("/stats", p.handleStats)
    mux.HandleFunc("/health", p.handleHealth)

    p.server = &http.Server{
        Addr:      p.config.ListenAddr,
        Handler:   mux,
        ConnState: p.onConnStateChange,
        // Bound how long a client may take to send request headers so slow
        // clients cannot pin connections indefinitely.
        ReadHeaderTimeout: 10 * time.Second,
    }

    p.logger.Printf("HTTP proxy starting on %s", p.config.ListenAddr)

    // Buffered so the serving goroutine can always exit, even if Start has
    // already returned via the ctx.Done branch.
    serveErr := make(chan error, 1)
    go func() {
        serveErr <- p.server.ListenAndServe()
    }()

    select {
    case err := <-serveErr:
        if err != nil && err != http.ErrServerClosed {
            return fmt.Errorf("HTTP proxy error: %w", err)
        }
        return nil
    case <-ctx.Done():
    }

    // ctx is already cancelled, so a fresh context carries the deadline.
    shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    return p.server.Shutdown(shutdownCtx)
}

// handleRequest is the catch-all handler. It applies access control, counts
// the request, picks a backend and dispatches to the CONNECT tunnel handler
// or the plain HTTP handler.
func (p *HTTPProxy) handleRequest(w http.ResponseWriter, r *http.Request) {
    begin := time.Now()

    if ctrl := p.accessCtrl; ctrl != nil && !ctrl.Allow(r) {
        http.Error(w, "Access denied", http.StatusForbidden)
        return
    }

    atomic.AddInt64(&p.stats.TotalRequests, 1)

    target := p.balancer.Select()
    switch {
    case target == nil:
        http.Error(w, "No available backend", http.StatusServiceUnavailable)
        atomic.AddInt64(&p.stats.TotalErrors, 1)
    case r.Method == http.MethodConnect:
        // CONNECT requests are tunnelled (HTTPS proxying).
        p.handleConnect(w, r, target)
    default:
        p.handleHTTP(w, r, target, begin)
    }
}

// handleHTTP forwards a plain HTTP request to backend through a reverse proxy
// and records per-backend statistics.
//
// startTime is when the request entered the proxy; it is used to compute the
// backend's last response time.
func (p *HTTPProxy) handleHTTP(w http.ResponseWriter, r *http.Request, backend *Backend, startTime time.Time) {
    target, err := url.Parse(fmt.Sprintf("http://%s", backend.Address))
    if err != nil {
        http.Error(w, "Invalid backend URL", http.StatusInternalServerError)
        atomic.AddInt64(&p.stats.TotalErrors, 1)
        return
    }

    // Track in-flight requests so Backend.Connections reflects real load;
    // LeastConnBalancer relies on this counter.
    backend.mu.Lock()
    backend.Connections++
    backend.mu.Unlock()
    defer func() {
        backend.mu.Lock()
        backend.Connections--
        backend.Requests++
        backend.ResponseTime = time.Since(startTime)
        backend.mu.Unlock()
    }()

    // SplitHostPort handles IPv6 literals ("[::1]:1234"); splitting on ":"
    // would return "[" for them.
    clientIP, _, err := net.SplitHostPort(r.RemoteAddr)
    if err != nil {
        // RemoteAddr carried no port; use it as-is minus any IPv6 brackets.
        clientIP = strings.Trim(r.RemoteAddr, "[]")
    }

    proto := "http"
    if r.TLS != nil {
        proto = "https"
    }

    transport := &http.Transport{
        DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
            if p.connPool != nil {
                return p.connPool.Get(backend.Address)
            }
            // Honour request cancellation while dialling.
            var dialer net.Dialer
            return dialer.DialContext(ctx, network, addr)
        },
        MaxIdleConns:        100,
        IdleConnTimeout:     90 * time.Second,
        TLSHandshakeTimeout: 10 * time.Second,
    }
    // This Transport lives for one request only, so its idle connections can
    // never be reused; release them now instead of letting them sit until
    // IdleConnTimeout expires.
    defer transport.CloseIdleConnections()

    proxy := httputil.NewSingleHostReverseProxy(target)
    proxy.Transport = transport

    originalDirector := proxy.Director
    proxy.Director = func(req *http.Request) {
        originalDirector(req)
        // X-Forwarded-For is deliberately not set here: ReverseProxy appends
        // the client IP itself after Director returns, so setting it would
        // produce a duplicated entry (previously including the port).
        req.Header.Set("X-Forwarded-Proto", proto)
        req.Header.Set("X-Real-IP", clientIP)
    }

    proxy.ModifyResponse = func(resp *http.Response) error {
        resp.Header.Set("X-Proxy-Server", "go-proxy")
        return nil
    }

    proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
        p.logger.Printf("Proxy error: %v", err)
        atomic.AddInt64(&p.stats.TotalErrors, 1)
        backend.mu.Lock()
        backend.Errors++
        backend.mu.Unlock()
        http.Error(w, "Bad Gateway", http.StatusBadGateway)
    }

    proxy.ServeHTTP(w, r)
}

// handleConnect serves a CONNECT request (HTTPS proxying): it dials the host
// named in the request, hijacks the client connection, acknowledges the
// tunnel and then relays bytes in both directions.
func (p *HTTPProxy) handleConnect(w http.ResponseWriter, r *http.Request, backend *Backend) {
    upstream, dialErr := net.DialTimeout("tcp", r.Host, p.config.Timeout)
    if dialErr != nil {
        http.Error(w, "Cannot connect to destination", http.StatusServiceUnavailable)
        atomic.AddInt64(&p.stats.TotalErrors, 1)
        return
    }
    defer upstream.Close()

    // Take over the raw client connection from the HTTP server.
    hj, supported := w.(http.Hijacker)
    if !supported {
        http.Error(w, "Hijacking not supported", http.StatusInternalServerError)
        return
    }

    downstream, _, hijackErr := hj.Hijack()
    if hijackErr != nil {
        http.Error(w, "Cannot hijack connection", http.StatusInternalServerError)
        return
    }
    defer downstream.Close()

    // Tell the client the tunnel is established.
    const established = "HTTP/1.1 200 Connection established\r\n\r\n"
    if _, writeErr := downstream.Write([]byte(established)); writeErr != nil {
        p.logger.Printf("Failed to write CONNECT response: %v", writeErr)
        return
    }

    p.relay(downstream, upstream)
}

// relay copies bytes between client and server in both directions and returns
// once both directions have finished.
//
// Bytes copied client->server are added to stats.BytesOut, server->client to
// stats.BytesIn. When one direction reaches EOF the write side of its
// destination is half-closed so the peer also sees EOF; without that the
// opposite copy could block forever because neither connection is closed
// until relay returns.
func (p *HTTPProxy) relay(client, server net.Conn) {
    var wg sync.WaitGroup
    wg.Add(2)

    pipe := func(dst, src net.Conn, counter *int64) {
        defer wg.Done()

        // A copy error simply ends this direction of the tunnel.
        written, _ := io.Copy(dst, src)
        atomic.AddInt64(counter, written)

        // Propagate EOF. TCP and TLS connections support half-close; for
        // anything else fall back to a full close so the peer is not left
        // waiting.
        if hc, ok := dst.(interface{ CloseWrite() error }); ok {
            _ = hc.CloseWrite()
            return
        }
        _ = dst.Close()
    }

    go pipe(server, client, &p.stats.BytesOut)
    go pipe(client, server, &p.stats.BytesIn)

    wg.Wait()
}

// onConnStateChange is the http.Server ConnState hook that maintains the
// connection counters.
//
// StateHijacked is a terminal state: a hijacked connection (every CONNECT
// tunnel) never transitions to StateClosed, so it must be counted as finished
// here or ActiveConnections would grow without bound.
func (p *HTTPProxy) onConnStateChange(conn net.Conn, state http.ConnState) {
    switch state {
    case http.StateNew:
        atomic.AddInt64(&p.stats.TotalConnections, 1)
        atomic.AddInt64(&p.stats.ActiveConnections, 1)
    case http.StateClosed, http.StateHijacked:
        atomic.AddInt64(&p.stats.ActiveConnections, -1)
    }
}

// handleStats writes the current proxy statistics as JSON.
//
// A fresh snapshot is built field by field: copying *p.stats would copy its
// mutex, and the counters are updated with atomic operations elsewhere, so
// they are read atomically here as well.
func (p *HTTPProxy) handleStats(w http.ResponseWriter, r *http.Request) {
    p.stats.mu.RLock()
    snapshot := &ProxyStats{
        TotalConnections:    atomic.LoadInt64(&p.stats.TotalConnections),
        ActiveConnections:   atomic.LoadInt64(&p.stats.ActiveConnections),
        TotalRequests:       atomic.LoadInt64(&p.stats.TotalRequests),
        TotalErrors:         atomic.LoadInt64(&p.stats.TotalErrors),
        BytesIn:             atomic.LoadInt64(&p.stats.BytesIn),
        BytesOut:            atomic.LoadInt64(&p.stats.BytesOut),
        AverageResponseTime: p.stats.AverageResponseTime,
        Uptime:              time.Since(p.stats.StartTime),
        StartTime:           p.stats.StartTime,
    }
    p.stats.mu.RUnlock()

    w.Header().Set("Content-Type", "application/json")
    if err := json.NewEncoder(w).Encode(snapshot); err != nil {
        p.logger.Printf("Failed to write stats response: %v", err)
    }
}

// handleHealth reports 200 "OK" when at least one backend is active and
// 503 otherwise.
//
// Status is read under the backend's lock because the health checker updates
// it from another goroutine.
func (p *HTTPProxy) handleHealth(w http.ResponseWriter, r *http.Request) {
    activeBackends := 0
    for _, backend := range p.backends {
        backend.mu.RLock()
        if backend.Status == "active" {
            activeBackends++
        }
        backend.mu.RUnlock()
    }

    if activeBackends == 0 {
        w.WriteHeader(http.StatusServiceUnavailable)
        // A failed write means the client went away; nothing to recover.
        _, _ = w.Write([]byte("No active backends"))
        return
    }

    w.WriteHeader(http.StatusOK)
    _, _ = w.Write([]byte("OK"))
}

负载均衡器实现 #

// pkg/proxy/load_balancer.go
package proxy

import (
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

// LoadBalancer picks a backend for each request.
type LoadBalancer interface {
    // Select returns the backend for the next request; the implementations
    // in this package return nil when no active backend is available.
    Select() *Backend

    // UpdateBackends replaces the set of candidate backends.
    UpdateBackends(backends []*Backend)
}

// RoundRobinBalancer cycles through the active backends in order.
type RoundRobinBalancer struct {
    backends []*Backend
    current  uint64 // request counter, advanced atomically by Select
    mu       sync.RWMutex
}

// NewRoundRobinBalancer returns a round-robin balancer over backends.
func NewRoundRobinBalancer(backends []*Backend) *RoundRobinBalancer {
    rb := new(RoundRobinBalancer)
    rb.backends = backends
    return rb
}

// Select returns the next active backend in round-robin order, or nil when
// no backend is active.
//
// Each backend's Status is read under that backend's lock because the health
// checker writes it concurrently.
func (rb *RoundRobinBalancer) Select() *Backend {
    rb.mu.RLock()
    defer rb.mu.RUnlock()

    activeBackends := make([]*Backend, 0, len(rb.backends))
    for _, backend := range rb.backends {
        backend.mu.RLock()
        active := backend.Status == "active"
        backend.mu.RUnlock()

        if active {
            activeBackends = append(activeBackends, backend)
        }
    }

    if len(activeBackends) == 0 {
        return nil
    }

    // The counter is advanced atomically because Select holds only a read
    // lock and may run concurrently.
    index := atomic.AddUint64(&rb.current, 1) % uint64(len(activeBackends))
    return activeBackends[index]
}

// UpdateBackends swaps in a new backend list under the write lock.
func (rb *RoundRobinBalancer) UpdateBackends(backends []*Backend) {
    rb.mu.Lock()
    rb.backends = backends
    rb.mu.Unlock()
}

// WeightedBalancer picks active backends at random, in proportion to Weight.
type WeightedBalancer struct {
    backends []*Backend
    mu       sync.RWMutex
}

// NewWeightedBalancer returns a weighted balancer over backends.
func NewWeightedBalancer(backends []*Backend) *WeightedBalancer {
    wb := new(WeightedBalancer)
    wb.backends = backends
    return wb
}

// weightedRand is the random source used by WeightedBalancer.Select. It is
// seeded once; reseeding the global generator on every request is costly and
// rand.Seed is deprecated. *rand.Rand is not safe for concurrent use, hence
// the mutex.
var (
    weightedRandMu sync.Mutex
    weightedRand   = rand.New(rand.NewSource(time.Now().UnixNano()))
)

// Select returns an active backend chosen at random with probability
// proportional to its Weight, or nil when no active backend has a positive
// weight.
//
// Backends with a non-positive weight are skipped: a negative total would
// make rand.Intn panic. Status and Weight are read under the backend's lock
// because the health checker writes Status concurrently.
func (wb *WeightedBalancer) Select() *Backend {
    wb.mu.RLock()
    defer wb.mu.RUnlock()

    totalWeight := 0
    candidates := make([]*Backend, 0, len(wb.backends))
    weights := make([]int, 0, len(wb.backends))

    for _, backend := range wb.backends {
        backend.mu.RLock()
        active, weight := backend.Status == "active", backend.Weight
        backend.mu.RUnlock()

        if !active || weight <= 0 {
            continue
        }
        candidates = append(candidates, backend)
        weights = append(weights, weight)
        totalWeight += weight
    }

    // <= 0 also covers an overflowed sum.
    if totalWeight <= 0 {
        return nil
    }

    weightedRandMu.Lock()
    random := weightedRand.Intn(totalWeight)
    weightedRandMu.Unlock()

    // Walk the cumulative weights until the random point falls inside one.
    for i, backend := range candidates {
        random -= weights[i]
        if random < 0 {
            return backend
        }
    }

    return candidates[0]
}

// UpdateBackends swaps in a new backend list under the write lock.
func (wb *WeightedBalancer) UpdateBackends(backends []*Backend) {
    wb.mu.Lock()
    wb.backends = backends
    wb.mu.Unlock()
}

// LeastConnBalancer picks the active backend with the fewest connections.
type LeastConnBalancer struct {
    backends []*Backend
    mu       sync.RWMutex
}

// NewLeastConnBalancer returns a least-connections balancer over backends.
func NewLeastConnBalancer(backends []*Backend) *LeastConnBalancer {
    lb := new(LeastConnBalancer)
    lb.backends = backends
    return lb
}

// Select returns the active backend with the lowest Connections value, or nil
// when no backend is active. Ties go to the backend listed first.
//
// Status and Connections are read together under the backend's lock; Status
// was previously read without it, racing with the health checker.
func (lb *LeastConnBalancer) Select() *Backend {
    lb.mu.RLock()
    defer lb.mu.RUnlock()

    var (
        selected       *Backend
        minConnections int64
    )

    for _, backend := range lb.backends {
        backend.mu.RLock()
        active := backend.Status == "active"
        connections := backend.Connections
        backend.mu.RUnlock()

        if !active {
            continue
        }
        if selected == nil || connections < minConnections {
            minConnections = connections
            selected = backend
        }
    }

    return selected
}

// UpdateBackends swaps in a new backend list under the write lock.
func (lb *LeastConnBalancer) UpdateBackends(backends []*Backend) {
    lb.mu.Lock()
    lb.backends = backends
    lb.mu.Unlock()
}

// NewLoadBalancer builds the balancer named by algorithm ("round_robin",
// "weighted" or "least_conn"). Any other name yields an error.
func NewLoadBalancer(algorithm string, backends []*Backend) (LoadBalancer, error) {
    if algorithm == "round_robin" {
        return NewRoundRobinBalancer(backends), nil
    }
    if algorithm == "weighted" {
        return NewWeightedBalancer(backends), nil
    }
    if algorithm == "least_conn" {
        return NewLeastConnBalancer(backends), nil
    }
    return nil, fmt.Errorf("unsupported load balancer algorithm: %s", algorithm)
}

健康检查实现 #

// pkg/proxy/health_checker.go
package proxy

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"
)

// HealthChecker periodically probes backends over HTTP and updates their
// Status.
type HealthChecker struct {
    config   *HealthCheckConfig
    backends []*Backend
    logger   *log.Logger
    client   *http.Client
    mu       sync.RWMutex // guards backends
}

// NewHealthChecker returns a checker for backends whose HTTP client uses
// config.Timeout as its request timeout.
func NewHealthChecker(config *HealthCheckConfig, backends []*Backend, logger *log.Logger) *HealthChecker {
    probeClient := &http.Client{Timeout: config.Timeout}

    hc := &HealthChecker{
        config:   config,
        backends: backends,
        logger:   logger,
    }
    hc.client = probeClient
    return hc
}

// Start runs health checks until ctx is cancelled: one immediately, then one
// per configured interval.
//
// time.NewTicker panics on a non-positive duration, and Start runs in its own
// goroutine, so an unset interval would crash the whole process. A
// non-positive interval therefore falls back to 10 seconds.
func (hc *HealthChecker) Start(ctx context.Context) {
    interval := hc.config.Interval
    if interval <= 0 {
        interval = 10 * time.Second
        hc.logger.Printf("Health check interval %v is invalid, falling back to %v", hc.config.Interval, interval)
    }

    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    hc.logger.Printf("Health checker started with interval %v", interval)

    // Initial check so backend state is known before the first tick.
    hc.checkAllBackends()

    for {
        select {
        case <-ticker.C:
            hc.checkAllBackends()
        case <-ctx.Done():
            hc.logger.Println("Health checker stopped")
            return
        }
    }
}

// checkAllBackends probes every backend concurrently and waits for all
// probes to finish. It works on a copy of the list so the lock is not held
// during network I/O.
func (hc *HealthChecker) checkAllBackends() {
    hc.mu.RLock()
    snapshot := append([]*Backend(nil), hc.backends...)
    hc.mu.RUnlock()

    var pending sync.WaitGroup
    pending.Add(len(snapshot))
    for i := range snapshot {
        go func(target *Backend) {
            defer pending.Done()
            hc.checkBackend(target)
        }(snapshot[i])
    }
    pending.Wait()
}

// checkBackend probes one backend with the configured method and path and
// marks it up or down according to the response status.
//
// When no status codes are configured, 200 OK is treated as healthy; an empty
// list would otherwise mark every backend unhealthy forever.
func (hc *HealthChecker) checkBackend(backend *Backend) {
    target := fmt.Sprintf("http://%s%s", backend.Address, hc.config.Path)

    req, err := http.NewRequest(hc.config.Method, target, nil)
    if err != nil {
        hc.markBackendDown(backend, fmt.Sprintf("Failed to create request: %v", err))
        return
    }

    resp, err := hc.client.Do(req)
    if err != nil {
        hc.markBackendDown(backend, fmt.Sprintf("Request failed: %v", err))
        return
    }
    defer resp.Body.Close()

    acceptable := hc.config.StatusCodes
    if len(acceptable) == 0 {
        acceptable = []int{http.StatusOK}
    }

    for _, code := range acceptable {
        if resp.StatusCode == code {
            hc.markBackendUp(backend)
            return
        }
    }

    hc.markBackendDown(backend, fmt.Sprintf("Unhealthy status code: %d", resp.StatusCode))
}

// markBackendUp records a successful probe for backend.
//
// LastSeen is refreshed on every successful probe, not only on a state
// transition, so it shows when the backend last answered. The transition to
// "active" is logged once.
func (hc *HealthChecker) markBackendUp(backend *Backend) {
    backend.mu.Lock()
    defer backend.mu.Unlock()

    backend.LastSeen = time.Now()

    if backend.Status != "active" {
        backend.Status = "active"
        hc.logger.Printf("Backend %s is now healthy", backend.Address)
    }
}

// markBackendDown moves an active backend to "down" and logs reason.
// Backends that are not currently active are left untouched.
func (hc *HealthChecker) markBackendDown(backend *Backend, reason string) {
    backend.mu.Lock()
    defer backend.mu.Unlock()

    if backend.Status != "active" {
        return
    }

    backend.Status = "down"
    hc.logger.Printf("Backend %s is now unhealthy: %s", backend.Address, reason)
}

// UpdateBackends swaps in a new list of backends to probe.
func (hc *HealthChecker) UpdateBackends(backends []*Backend) {
    hc.mu.Lock()
    hc.backends = backends
    hc.mu.Unlock()
}

连接池实现 #

// pkg/proxy/connection_pool.go
package proxy

import (
    "fmt"
    "net"
    "sync"
    "time"
)

// ConnectionPool keeps one Pool per backend address.
type ConnectionPool struct {
    config *PoolConfig
    pools  map[string]*Pool // keyed by address
    mu     sync.RWMutex     // guards pools
}

// Pool holds the connections for a single address.
type Pool struct {
    address     string
    config      *PoolConfig
    connections chan net.Conn // idle connections waiting for reuse
    active      int           // connection count checked against MaxActive
    mu          sync.Mutex    // guards active
}

// NewConnectionPool returns an empty pool set; per-address pools are created
// on first use.
func NewConnectionPool(config *PoolConfig) *ConnectionPool {
    cp := &ConnectionPool{config: config}
    cp.pools = make(map[string]*Pool)
    return cp
}

// Get returns a connection to address, creating that address's Pool on first
// use.
func (cp *ConnectionPool) Get(address string) (net.Conn, error) {
    // Fast path: the pool already exists.
    cp.mu.RLock()
    pool, found := cp.pools[address]
    cp.mu.RUnlock()
    if found {
        return pool.Get()
    }

    // Slow path: re-check under the write lock, since another goroutine may
    // have created the pool in the meantime.
    cp.mu.Lock()
    pool, found = cp.pools[address]
    if !found {
        pool = &Pool{
            address:     address,
            config:      cp.config,
            connections: make(chan net.Conn, cp.config.MaxIdle),
        }
        cp.pools[address] = pool
    }
    cp.mu.Unlock()

    return pool.Get()
}

// Get returns a connection from the pool, reusing an idle one when possible
// and dialling a new one otherwise.
//
// The result is always wrapped in pooledConn so that Close hands it back to
// the pool. Idle connections are stored unwrapped, so returning one as-is
// would let the caller's Close really close it without decrementing active,
// until the pool reported exhaustion.
//
// It returns an error when MaxActive connections are already open or the dial
// fails.
func (p *Pool) Get() (net.Conn, error) {
    select {
    case conn := <-p.connections:
        if p.isConnValid(conn) {
            return &pooledConn{Conn: conn, pool: p}, nil
        }
        // The stale connection no longer counts against MaxActive.
        conn.Close()
        p.mu.Lock()
        p.active--
        p.mu.Unlock()
    default:
    }

    // Reserve a slot before dialling so concurrent callers cannot exceed
    // MaxActive.
    p.mu.Lock()
    if p.active >= p.config.MaxActive {
        p.mu.Unlock()
        return nil, fmt.Errorf("connection pool exhausted")
    }
    p.active++
    p.mu.Unlock()

    conn, err := net.DialTimeout("tcp", p.address, p.config.ConnectTimeout)
    if err != nil {
        // Release the reserved slot.
        p.mu.Lock()
        p.active--
        p.mu.Unlock()
        return nil, err
    }

    return &pooledConn{
        Conn: conn,
        pool: p,
    }, nil
}

// Put hands conn back to the idle queue. When the queue is full the
// connection is closed and the active count is decremented.
func (p *Pool) Put(conn net.Conn) {
    select {
    case p.connections <- conn:
        // Queued for reuse.
        return
    default:
    }

    // No room in the idle queue: drop the connection.
    conn.Close()
    p.mu.Lock()
    p.active--
    p.mu.Unlock()
}

// isConnValid reports whether an idle connection can be reused.
//
// It probes with a 1ms read. Only a read timeout means the connection is
// open and quiet. EOF or any other error means the peer closed it. A
// successful read is also treated as unusable: the probe has consumed a byte
// of whatever the peer sent, so the stream can no longer be handed to a
// caller intact.
func (p *Pool) isConnValid(conn net.Conn) bool {
    if err := conn.SetReadDeadline(time.Now().Add(time.Millisecond)); err != nil {
        return false
    }

    buf := make([]byte, 1)
    n, err := conn.Read(buf)

    // Clear the deadline so it does not affect the next user.
    if resetErr := conn.SetReadDeadline(time.Time{}); resetErr != nil {
        return false
    }

    if n > 0 {
        return false
    }

    netErr, ok := err.(net.Error)
    return ok && netErr.Timeout()
}

// pooledConn wraps a net.Conn and remembers the Pool it belongs to.
type pooledConn struct {
    net.Conn
    pool *Pool // where Close hands the connection back
}

// Close hands the underlying connection back to its pool instead of closing
// it. It always returns nil.
func (pc *pooledConn) Close() error {
    owner, raw := pc.pool, pc.Conn
    owner.Put(raw)
    return nil
}

访问控制实现 #

// pkg/proxy/access_control.go
package proxy

import (
    "net"
    "net/http"
    "strings"
    "sync"
    "time"
)

// AccessController enforces the IP allow/deny lists and the per-client rate
// limit.
//
// List entries may be single addresses ("127.0.0.1") or CIDR ranges
// ("192.168.1.0/24"). Single addresses are matched by exact string; CIDR
// ranges are matched by containment.
type AccessController struct {
    config      *AccessConfig
    allowIPs    map[string]bool
    denyIPs     map[string]bool
    allowNets   []*net.IPNet
    denyNets    []*net.IPNet
    rateLimiter *RateLimiter
    mu          sync.RWMutex
}

// NewAccessController builds a controller from config.
//
// Entries that parse as CIDR become network ranges. Everything else is kept
// as a literal address, exactly as before, so existing configurations behave
// the same. A rate limiter is created only when RateLimit is positive.
func NewAccessController(config *AccessConfig) *AccessController {
    ac := &AccessController{
        config:   config,
        allowIPs: make(map[string]bool),
        denyIPs:  make(map[string]bool),
    }

    for _, entry := range config.AllowIPs {
        if _, ipNet, err := net.ParseCIDR(entry); err == nil {
            ac.allowNets = append(ac.allowNets, ipNet)
            continue
        }
        ac.allowIPs[entry] = true
    }

    for _, entry := range config.DenyIPs {
        if _, ipNet, err := net.ParseCIDR(entry); err == nil {
            ac.denyNets = append(ac.denyNets, ipNet)
            continue
        }
        ac.denyIPs[entry] = true
    }

    if config.RateLimit > 0 {
        ac.rateLimiter = NewRateLimiter(config.RateLimit)
    }

    return ac
}

// Allow reports whether the request may proceed. The deny list is checked
// first, then the allow list (only when it has entries), then the rate
// limiter.
func (ac *AccessController) Allow(r *http.Request) bool {
    clientIP := ac.getClientIP(r)
    parsedIP := net.ParseIP(clientIP) // nil when clientIP is not an IP literal

    if ac.denyIPs[clientIP] || ipInNets(ac.denyNets, parsedIP) {
        return false
    }

    allowListConfigured := len(ac.allowIPs)+len(ac.allowNets) > 0
    if allowListConfigured && !ac.allowIPs[clientIP] && !ipInNets(ac.allowNets, parsedIP) {
        return false
    }

    if ac.rateLimiter != nil && !ac.rateLimiter.Allow(clientIP) {
        return false
    }

    return true
}

// ipInNets reports whether ip falls inside any of nets; a nil ip never
// matches.
func ipInNets(nets []*net.IPNet, ip net.IP) bool {
    if ip == nil {
        return false
    }
    for _, ipNet := range nets {
        if ipNet.Contains(ip) {
            return true
        }
    }
    return false
}

// getClientIP returns the client address used for access decisions: the
// first X-Forwarded-For entry, else X-Real-IP, else the host part of
// RemoteAddr.
//
// SECURITY NOTE(review): both headers are supplied by the client. Unless
// every request arrives through a trusted proxy that overwrites them, a
// client can spoof its address and bypass the allow/deny lists and the rate
// limit. The precedence is left as it was; confirm the deployment topology
// before relying on it.
func (ac *AccessController) getClientIP(r *http.Request) string {
    if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
        // Skip an empty first entry (e.g. a header of just ",").
        if first := strings.TrimSpace(strings.Split(xff, ",")[0]); first != "" {
            return first
        }
    }

    if xri := strings.TrimSpace(r.Header.Get("X-Real-IP")); xri != "" {
        return xri
    }

    host, _, err := net.SplitHostPort(r.RemoteAddr)
    if err != nil {
        // RemoteAddr had no port; use it whole rather than returning "".
        return r.RemoteAddr
    }
    return host
}

// RateLimiter is a per-client token-bucket limiter. Each client may burst up
// to rate requests and regains tokens at rate per second.
type RateLimiter struct {
    rate    int
    clients map[string]*ClientLimiter
    mu      sync.RWMutex
}

// ClientLimiter is the bucket state of one client.
type ClientLimiter struct {
    tokens int
    // lastSeen is the instant up to which elapsed time has already been
    // converted into tokens. cleanup also uses it to expire idle clients.
    lastSeen time.Time
}

// NewRateLimiter returns a limiter allowing rate requests per second per
// client and starts the goroutine that evicts idle clients.
func NewRateLimiter(rate int) *RateLimiter {
    rl := &RateLimiter{
        rate:    rate,
        clients: make(map[string]*ClientLimiter),
    }

    // NOTE(review): this goroutine runs for the life of the process; there
    // is no way to stop it.
    go rl.cleanup()

    return rl
}

// Allow reports whether clientIP may make a request now, consuming one token
// when it may. A client seen for the first time starts with a full bucket.
func (rl *RateLimiter) Allow(clientIP string) bool {
    rl.mu.Lock()
    defer rl.mu.Unlock()

    now := time.Now()
    client, exists := rl.clients[clientIP]

    if !exists {
        rl.clients[clientIP] = &ClientLimiter{
            tokens:   rl.rate - 1,
            lastSeen: now,
        }
        return true
    }

    client.refill(now, rl.rate)

    if client.tokens > 0 {
        client.tokens--
        return true
    }

    return false
}

// refill credits the tokens earned since lastSeen, capped at rate.
//
// lastSeen advances only by the time actually converted into whole tokens.
// Previously whole seconds were counted and lastSeen was reset on every call,
// so a client polling more often than once per second never earned a token
// and stayed blocked for good.
func (c *ClientLimiter) refill(now time.Time, rate int) {
    if rate <= 0 {
        c.lastSeen = now
        return
    }

    elapsed := now.Sub(c.lastSeen)
    if elapsed <= 0 {
        return
    }

    earned := elapsed.Seconds() * float64(rate)
    if earned >= float64(rate-c.tokens) {
        // Bucket is full; surplus time is discarded.
        c.tokens = rate
        c.lastSeen = now
        return
    }

    whole := int(earned)
    if whole == 0 {
        // Keep accumulating the fraction until it yields a token.
        return
    }

    c.tokens += whole
    c.lastSeen = c.lastSeen.Add(time.Duration(float64(whole) / float64(rate) * float64(time.Second)))
}

// cleanup evicts clients not seen for more than 10 minutes, checking every
// 5 minutes.
func (rl *RateLimiter) cleanup() {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()

    for range ticker.C {
        rl.mu.Lock()
        now := time.Now()
        for ip, client := range rl.clients {
            if now.Sub(client.lastSeen) > 10*time.Minute {
                delete(rl.clients, ip)
            }
        }
        rl.mu.Unlock()
    }
}

主程序实现 #

// cmd/proxy/main.go
package main

import (
    "context"
    "encoding/json"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "your-project/pkg/proxy"
)

// main loads the configuration (path from the first argument, default
// "config.json"), starts the configured proxy and runs until SIGINT/SIGTERM
// or until the proxy stops on its own.
func main() {
    configFile := "config.json"
    if len(os.Args) > 1 {
        configFile = os.Args[1]
    }

    config, err := loadConfig(configFile)
    if err != nil {
        log.Fatalf("Failed to load config: %v", err)
    }

    logger := log.New(os.Stdout, "[PROXY] ", log.LstdFlags)

    var proxyServer interface {
        Start(context.Context) error
    }

    switch config.Type {
    case proxy.ProxyHTTP:
        proxyServer, err = proxy.NewHTTPProxy(config, logger)
    case proxy.ProxyHTTPS, proxy.ProxySOCKS5:
        // Declared but not implemented. Falling through would leave
        // proxyServer nil and panic when Start is called.
        log.Fatalf("Proxy type %s is not implemented yet", config.Type)
    default:
        log.Fatalf("Unsupported proxy type: %s", config.Type)
    }

    if err != nil {
        log.Fatalf("Failed to create proxy server: %v", err)
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Register for signals before starting so an early signal is not lost.
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    // done receives Start's result; buffered so the goroutine never blocks.
    done := make(chan error, 1)
    go func() {
        done <- proxyServer.Start(ctx)
    }()

    logger.Printf("Proxy server started on %s", config.ListenAddr)

    select {
    case err := <-done:
        // The server stopped without being asked to.
        if err != nil {
            logger.Fatalf("Proxy server error: %v", err)
        }
        return
    case <-sigChan:
    }

    logger.Println("Shutting down proxy server...")
    cancel()

    // Wait for the shutdown to finish; returning right after cancel would
    // end the process before in-flight requests complete.
    select {
    case err := <-done:
        if err != nil {
            logger.Printf("Proxy server error: %v", err)
        }
    case <-time.After(15 * time.Second):
        logger.Println("Timed out waiting for proxy server to stop")
    }
}

// configDurationKeys lists the JSON keys whose Go fields are time.Duration.
var configDurationKeys = map[string]bool{
    "timeout":         true,
    "interval":        true,
    "idle_timeout":    true,
    "connect_timeout": true,
    "read_timeout":    true,
    "write_timeout":   true,
    "response_time":   true,
}

// loadConfig reads and decodes the JSON configuration in filename.
//
// encoding/json can only decode a time.Duration from an integer number of
// nanoseconds. So that readable values such as "30s" work, duration fields
// given as strings are parsed with time.ParseDuration before the final
// decode. Numeric values are still accepted unchanged.
func loadConfig(filename string) (*proxy.ProxyConfig, error) {
    file, err := os.Open(filename)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    // First pass: decode into a generic tree. UseNumber keeps integers exact.
    var raw interface{}
    decoder := json.NewDecoder(file)
    decoder.UseNumber()
    if err := decoder.Decode(&raw); err != nil {
        return nil, err
    }

    if err := normalizeConfigDurations(raw); err != nil {
        return nil, err
    }

    // Second pass: decode the normalized tree into the typed config.
    normalized, err := json.Marshal(raw)
    if err != nil {
        return nil, err
    }

    var config proxy.ProxyConfig
    if err := json.Unmarshal(normalized, &config); err != nil {
        return nil, err
    }

    return &config, nil
}

// normalizeConfigDurations walks a decoded JSON tree and replaces string
// values under duration keys with their nanosecond count.
func normalizeConfigDurations(node interface{}) error {
    switch v := node.(type) {
    case map[string]interface{}:
        for key, child := range v {
            if text, ok := child.(string); ok && configDurationKeys[key] {
                d, err := time.ParseDuration(text)
                if err != nil {
                    return err
                }
                v[key] = int64(d)
                continue
            }
            if err := normalizeConfigDurations(child); err != nil {
                return err
            }
        }
    case []interface{}:
        for _, child := range v {
            if err := normalizeConfigDurations(child); err != nil {
                return err
            }
        }
    }
    return nil
}

配置文件示例 #

{
  "type": "http",
  "listen_addr": ":8080",
  "backends": [
    {
      "id": "backend1",
      "address": "127.0.0.1:8081",
      "weight": 1,
      "status": "active"
    },
    {
      "id": "backend2",
      "address": "127.0.0.1:8082",
      "weight": 2,
      "status": "active"
    }
  ],
  "load_balancer": "weighted",
  "health_check": {
    "enabled": true,
    "interval": "30s",
    "timeout": "5s",
    "path": "/health",
    "method": "GET",
    "status_codes": [200],
    "max_retries": 3
  },
  "connection_pool": {
    "max_idle": 10,
    "max_active": 100,
    "idle_timeout": "300s",
    "connect_timeout": "10s",
    "read_timeout": "30s",
    "write_timeout": "30s"
  },
  "access_control": {
    "enabled": true,
    "allow_ips": ["127.0.0.1", "192.168.1.0/24"],
    "deny_ips": [],
    "rate_limit": 100
  },
  "timeout": "30s",
  "max_connections": 1000,
  "buffer_size": 4096
}

功能扩展 #

SSL/TLS 支持 #

// StartTLS serves the proxy over TLS with the given certificate and key
// files until ctx is cancelled or the listener fails.
//
// If Start has not created the server yet, StartTLS builds it (same handlers
// as Start) and launches the health checker; previously it dereferenced a nil
// p.server. A listener failure is returned to the caller instead of only
// being logged.
func (p *HTTPProxy) StartTLS(ctx context.Context, certFile, keyFile string) error {
    if p.server == nil {
        if p.healthCheck != nil {
            go p.healthCheck.Start(ctx)
        }

        mux := http.NewServeMux()
        mux.HandleFunc("/", p.handleRequest)
        mux.HandleFunc("/stats", p.handleStats)
        mux.HandleFunc("/health", p.handleHealth)

        p.server = &http.Server{
            Addr:      p.config.ListenAddr,
            Handler:   mux,
            ConnState: p.onConnStateChange,
        }
    }

    p.server.TLSConfig = &tls.Config{
        // Refuse protocol versions older than TLS 1.2.
        MinVersion: tls.VersionTLS12,
    }

    // Buffered so the serving goroutine can exit after StartTLS returns.
    serveErr := make(chan error, 1)
    go func() {
        serveErr <- p.server.ListenAndServeTLS(certFile, keyFile)
    }()

    select {
    case err := <-serveErr:
        if err != nil && err != http.ErrServerClosed {
            return fmt.Errorf("HTTPS proxy error: %w", err)
        }
        return nil
    case <-ctx.Done():
    }

    return p.server.Shutdown(context.Background())
}

监控和日志 #

// 添加Prometheus监控
import "github.com/prometheus/client_golang/prometheus"

// requestsTotal counts proxied requests, labelled by backend and status.
var requestsTotal = prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Name: "proxy_requests_total",
        Help: "Total number of proxy requests",
    },
    []string{"backend", "status"},
)

// requestDuration observes request latency in seconds, labelled by backend.
var requestDuration = prometheus.NewHistogramVec(
    prometheus.HistogramOpts{
        Name: "proxy_request_duration_seconds",
        Help: "Request duration in seconds",
    },
    []string{"backend"},
)

通过本节的学习,我们实现了一个网络代理服务器的核心功能,包括 HTTP 代理、基于 CONNECT 方法的 HTTPS 隧道、负载均衡、健康检查、连接池管理和访问控制;独立的 HTTPS(TLS 监听)代理与 SOCKS5 代理在主程序中仅预留了入口,留作扩展练习。这个系统展示了如何运用 Go 语言的网络编程能力构建高性能的网络基础设施组件。