4.8.3 网络代理服务器 #
网络代理服务器是网络基础设施的重要组件,本节将构建一个高性能的网络代理服务器,支持 HTTP/HTTPS 代理、负载均衡、连接池管理等功能。
系统架构设计 #
整体架构 #
代理服务器采用模块化设计,包含以下核心组件:
- 代理核心:处理客户端请求和转发
- 负载均衡器:后端服务器选择和流量分发
- 连接池:管理到后端服务器的连接
- 健康检查:监控后端服务器状态
- 访问控制:认证和授权管理
- 监控统计:流量统计和性能监控
// pkg/proxy/types.go
package proxy
import (
"net"
"net/http"
"sync"
"time"
)
// ProxyType names the protocol a proxy instance is configured to serve.
type ProxyType string
// Recognized ProxyType values; they double as the strings accepted in the
// "type" field of the JSON configuration.
const (
ProxyHTTP ProxyType = "http"
ProxyHTTPS ProxyType = "https"
ProxySOCKS5 ProxyType = "socks5"
ProxyTCP ProxyType = "tcp"
)
// Backend describes one upstream server together with its runtime counters.
type Backend struct {
ID string `json:"id"`
// Address is host:port; the proxy and the health checker prepend "http://".
Address string `json:"address"`
// Weight is the relative share used by WeightedBalancer.
Weight int `json:"weight"`
Status string `json:"status"` // active, inactive, down
// LastSeen is written by the health checker when it marks the backend active.
LastSeen time.Time `json:"last_seen"`
// Runtime statistics.
Connections int64 `json:"connections"`
Requests int64 `json:"requests"`
Errors int64 `json:"errors"`
ResponseTime time.Duration `json:"response_time"`
// mu is locked by the health checker and the request handlers when they
// mutate Status and the counters above.
mu sync.RWMutex
}
// ProxyConfig is the top-level configuration decoded from the JSON config file.
//
// NOTE(review): encoding/json decodes time.Duration fields from integer
// nanosecond counts, not from strings such as "30s" — confirm that the config
// loader converts such strings before decoding into this struct.
type ProxyConfig struct {
Type ProxyType `json:"type"`
ListenAddr string `json:"listen_addr"`
Backends []*Backend `json:"backends"`
LoadBalancer string `json:"load_balancer"` // round_robin, weighted, least_conn
// HealthCheck, ConnectionPool and AccessControl are optional; the HTTP proxy
// constructor skips the corresponding component when the pointer is nil.
HealthCheck *HealthCheckConfig `json:"health_check"`
ConnectionPool *PoolConfig `json:"connection_pool"`
AccessControl *AccessConfig `json:"access_control"`
// Timeout is used as the dial timeout for CONNECT tunnels.
Timeout time.Duration `json:"timeout"`
// MaxConnections and BufferSize are not referenced by the code shown in this section.
MaxConnections int `json:"max_connections"`
BufferSize int `json:"buffer_size"`
}
// HealthCheckConfig controls the periodic HTTP probe sent to every backend.
type HealthCheckConfig struct {
Enabled bool `json:"enabled"`
// Interval is the period between probe rounds.
Interval time.Duration `json:"interval"`
// Timeout is applied to the health checker's http.Client.
Timeout time.Duration `json:"timeout"`
// Path is appended to "http://<backend address>" to form the probe URL.
Path string `json:"path"`
Method string `json:"method"`
// StatusCodes lists the response codes that count as healthy.
StatusCodes []int `json:"status_codes"`
// MaxRetries is not referenced by the health checker shown in this section.
MaxRetries int `json:"max_retries"`
}
// PoolConfig holds the settings shared by every per-address connection pool.
type PoolConfig struct {
// MaxIdle is the capacity of the idle-connection channel of each pool.
MaxIdle int `json:"max_idle"`
// MaxActive caps the number of connections a pool will account for.
MaxActive int `json:"max_active"`
// IdleTimeout, ReadTimeout and WriteTimeout are not referenced by the pool
// code shown in this section.
IdleTimeout time.Duration `json:"idle_timeout"`
// ConnectTimeout is the dial timeout for new backend connections.
ConnectTimeout time.Duration `json:"connect_timeout"`
ReadTimeout time.Duration `json:"read_timeout"`
WriteTimeout time.Duration `json:"write_timeout"`
}
// AccessConfig configures IP allow/deny lists and per-client rate limiting.
type AccessConfig struct {
Enabled bool `json:"enabled"`
// AllowIPs, when non-empty, restricts access to the listed clients.
AllowIPs []string `json:"allow_ips"`
// DenyIPs lists clients that are always rejected.
DenyIPs []string `json:"deny_ips"`
RateLimit int `json:"rate_limit"` // requests per second
}
// ProxyStats aggregates proxy-wide counters exposed by the /stats endpoint.
// The int64 counters are updated with sync/atomic by the HTTP proxy.
type ProxyStats struct {
TotalConnections int64 `json:"total_connections"`
ActiveConnections int64 `json:"active_connections"`
TotalRequests int64 `json:"total_requests"`
TotalErrors int64 `json:"total_errors"`
BytesIn int64 `json:"bytes_in"`
BytesOut int64 `json:"bytes_out"`
// AverageResponseTime is not written by the code shown in this section.
AverageResponseTime time.Duration `json:"average_response_time"`
// Uptime is computed from StartTime when a stats snapshot is taken.
Uptime time.Duration `json:"uptime"`
StartTime time.Time `json:"start_time"`
mu sync.RWMutex
}
HTTP 代理实现 #
// pkg/proxy/http_proxy.go
package proxy
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"net/http/httputil"
	"net/url"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)
// HTTPProxy is the HTTP proxy server. It wires together the load balancer and
// the optional health checker, connection pool and access controller.
type HTTPProxy struct {
config *ProxyConfig
backends []*Backend
balancer LoadBalancer
// healthCheck, connPool and accessCtrl are nil when the corresponding
// section is absent or disabled in the configuration.
healthCheck *HealthChecker
connPool *ConnectionPool
accessCtrl *AccessController
stats *ProxyStats
logger *log.Logger
// server is created by Start.
server *http.Server
mu sync.RWMutex
}
// NewHTTPProxy builds an HTTPProxy from config.
//
// The load balancer is always created; the health checker, connection pool and
// access controller are created only when their configuration section is
// present (and, where applicable, enabled). A nil logger is replaced by one
// writing to the standard logger's destination.
//
// It returns an error when config is nil or the load-balancer algorithm is not
// supported.
func NewHTTPProxy(config *ProxyConfig, logger *log.Logger) (*HTTPProxy, error) {
	if config == nil {
		return nil, fmt.Errorf("proxy config must not be nil")
	}
	if logger == nil {
		// Every component logs unconditionally, so a nil logger would panic later.
		logger = log.New(log.Writer(), "", log.LstdFlags)
	}

	proxy := &HTTPProxy{
		config:   config,
		backends: config.Backends,
		stats: &ProxyStats{
			StartTime: time.Now(),
		},
		logger: logger,
	}

	// Load balancer.
	var err error
	proxy.balancer, err = NewLoadBalancer(config.LoadBalancer, config.Backends)
	if err != nil {
		return nil, fmt.Errorf("failed to create load balancer: %w", err)
	}

	// Health checker.
	if config.HealthCheck != nil && config.HealthCheck.Enabled {
		proxy.healthCheck = NewHealthChecker(config.HealthCheck, config.Backends, logger)
	}

	// Connection pool.
	if config.ConnectionPool != nil {
		proxy.connPool = NewConnectionPool(config.ConnectionPool)
	}

	// Access control.
	if config.AccessControl != nil && config.AccessControl.Enabled {
		proxy.accessCtrl = NewAccessController(config.AccessControl)
	}

	return proxy, nil
}
// Start runs the proxy until ctx is cancelled or the listener fails.
//
// It launches the health checker (when configured), registers the proxy, stats
// and health handlers and serves on config.ListenAddr. A listen/serve failure
// is returned immediately instead of being only logged. On cancellation the
// server is shut down gracefully, bounded by config.Timeout (10s when unset).
func (p *HTTPProxy) Start(ctx context.Context) error {
	// Health checker.
	if p.healthCheck != nil {
		go p.healthCheck.Start(ctx)
	}

	// HTTP server.
	mux := http.NewServeMux()
	mux.HandleFunc("/", p.handleRequest)
	mux.HandleFunc("/stats", p.handleStats)
	mux.HandleFunc("/health", p.handleHealth)
	p.server = &http.Server{
		Addr:      p.config.ListenAddr,
		Handler:   mux,
		ConnState: p.onConnStateChange,
		// Bounds how long a client may take to send request headers;
		// zero keeps the previous "no limit" behaviour.
		ReadHeaderTimeout: p.config.Timeout,
	}

	p.logger.Printf("HTTP proxy starting on %s", p.config.ListenAddr)

	// Buffered so the serving goroutine can always deliver its result and exit.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- p.server.ListenAndServe()
	}()

	select {
	case err := <-serveErr:
		if err != nil && err != http.ErrServerClosed {
			p.logger.Printf("HTTP proxy error: %v", err)
			return err
		}
		return nil
	case <-ctx.Done():
	}

	grace := p.config.Timeout
	if grace <= 0 {
		grace = 10 * time.Second
	}
	shutdownCtx, cancel := context.WithTimeout(context.Background(), grace)
	defer cancel()
	return p.server.Shutdown(shutdownCtx)
}
// handleRequest is the entry point for every proxied request: it applies
// access control, counts the request, picks a backend and dispatches to the
// CONNECT tunnel handler or the plain HTTP handler.
func (p *HTTPProxy) handleRequest(w http.ResponseWriter, r *http.Request) {
	receivedAt := time.Now()

	// Reject early when access control is enabled and denies the client.
	if ctrl := p.accessCtrl; ctrl != nil && !ctrl.Allow(r) {
		http.Error(w, "Access denied", http.StatusForbidden)
		return
	}

	atomic.AddInt64(&p.stats.TotalRequests, 1)

	target := p.balancer.Select()
	switch {
	case target == nil:
		// No backend is currently eligible.
		http.Error(w, "No available backend", http.StatusServiceUnavailable)
		atomic.AddInt64(&p.stats.TotalErrors, 1)
	case r.Method == http.MethodConnect:
		// CONNECT requests are tunnelled (HTTPS proxying).
		p.handleConnect(w, r, target)
	default:
		p.handleHTTP(w, r, target, receivedAt)
	}
}
// handleHTTP forwards a plain HTTP request to backend through a reverse proxy
// and records per-backend statistics.
//
// startTime is the moment the request entered the proxy; it is used to compute
// the backend's last response time.
func (p *HTTPProxy) handleHTTP(w http.ResponseWriter, r *http.Request, backend *Backend, startTime time.Time) {
	target, err := url.Parse(fmt.Sprintf("http://%s", backend.Address))
	if err != nil {
		http.Error(w, "Invalid backend URL", http.StatusInternalServerError)
		atomic.AddInt64(&p.stats.TotalErrors, 1)
		return
	}

	// Track in-flight requests so connection-count based balancing has data,
	// and record request statistics once the exchange is over.
	backend.mu.Lock()
	backend.Connections++
	backend.mu.Unlock()
	defer func() {
		backend.mu.Lock()
		backend.Connections--
		backend.Requests++
		backend.ResponseTime = time.Since(startTime)
		backend.mu.Unlock()
	}()

	// RemoteAddr is "host:port" (or "[v6]:port"); SplitHostPort handles both.
	clientIP, _, splitErr := net.SplitHostPort(r.RemoteAddr)
	if splitErr != nil {
		clientIP = strings.TrimSpace(r.RemoteAddr)
	}
	forwardedProto := "http"
	if r.TLS != nil {
		forwardedProto = "https"
	}

	reverseProxy := httputil.NewSingleHostReverseProxy(target)

	// The transport lives for this request only, so keep-alives are disabled:
	// otherwise every request would leave an idle connection (and its
	// goroutines) behind until the idle timeout fires.
	// NOTE(review): sharing one Transport on HTTPProxy would allow real
	// connection reuse; that needs a struct field outside this block.
	reverseProxy.Transport = &http.Transport{
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			if p.connPool != nil {
				return p.connPool.Get(backend.Address)
			}
			var dialer net.Dialer
			return dialer.DialContext(ctx, network, addr)
		},
		DisableKeepAlives:   true,
		TLSHandshakeTimeout: 10 * time.Second,
	}

	// Request rewriting.
	originalDirector := reverseProxy.Director
	reverseProxy.Director = func(req *http.Request) {
		originalDirector(req)
		// ReverseProxy appends the client IP to X-Forwarded-For after the
		// director runs; dropping any inbound value makes the header carry
		// exactly the connecting client's IP.
		req.Header.Del("X-Forwarded-For")
		req.Header.Set("X-Forwarded-Proto", forwardedProto)
		req.Header.Set("X-Real-IP", clientIP)
	}

	// Response rewriting.
	reverseProxy.ModifyResponse = func(resp *http.Response) error {
		resp.Header.Set("X-Proxy-Server", "go-proxy")
		return nil
	}

	// Error handling.
	reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
		p.logger.Printf("Proxy error: %v", err)
		atomic.AddInt64(&p.stats.TotalErrors, 1)
		backend.mu.Lock()
		backend.Errors++
		backend.mu.Unlock()
		http.Error(w, "Bad Gateway", http.StatusBadGateway)
	}

	reverseProxy.ServeHTTP(w, r)
}
// handleConnect serves a CONNECT request (HTTPS proxying): it dials the host
// named in the request, hijacks the client connection, acknowledges the tunnel
// and then relays bytes in both directions until the tunnel ends.
// The backend argument is not used by the tunnel.
func (p *HTTPProxy) handleConnect(w http.ResponseWriter, r *http.Request, backend *Backend) {
	// Dial the requested destination first so failures can still be reported over HTTP.
	upstream, dialErr := net.DialTimeout("tcp", r.Host, p.config.Timeout)
	if dialErr != nil {
		http.Error(w, "Cannot connect to destination", http.StatusServiceUnavailable)
		atomic.AddInt64(&p.stats.TotalErrors, 1)
		return
	}
	defer upstream.Close()

	// Take over the raw client connection.
	hj, canHijack := w.(http.Hijacker)
	if !canHijack {
		http.Error(w, "Hijacking not supported", http.StatusInternalServerError)
		return
	}
	downstream, _, hijackErr := hj.Hijack()
	if hijackErr != nil {
		http.Error(w, "Cannot hijack connection", http.StatusInternalServerError)
		return
	}
	defer downstream.Close()

	// Tell the client the tunnel is ready.
	established := []byte("HTTP/1.1 200 Connection established\r\n\r\n")
	if _, writeErr := downstream.Write(established); writeErr != nil {
		p.logger.Printf("Failed to write CONNECT response: %v", writeErr)
		return
	}

	p.relay(downstream, upstream)
}
// relay copies data between client and server in both directions and returns
// once both directions have finished. Bytes sent towards the server are added
// to BytesOut, bytes sent towards the client to BytesIn.
//
// When one direction ends, the write side of its destination is shut down
// (CloseWrite when available, Close otherwise) so the peer observes EOF and
// the opposite copy can terminate; without this a closed client would leave
// the other goroutine blocked until the server gave up on its own.
func (p *HTTPProxy) relay(client, server net.Conn) {
	var wg sync.WaitGroup
	wg.Add(2)

	pipe := func(dst, src net.Conn, counter *int64) {
		defer wg.Done()
		// The copy error is intentionally dropped: either side hanging up is
		// the normal way for a tunnel to end.
		written, _ := io.Copy(dst, src)
		atomic.AddInt64(counter, written)

		if half, ok := dst.(interface{ CloseWrite() error }); ok {
			_ = half.CloseWrite()
			return
		}
		_ = dst.Close()
	}

	// Client to server.
	go pipe(server, client, &p.stats.BytesOut)
	// Server to client.
	go pipe(client, server, &p.stats.BytesIn)

	wg.Wait()
}
// onConnStateChange is the http.Server ConnState hook that maintains the
// connection counters.
//
// StateHijacked is treated like StateClosed: it is a terminal state for the
// server, so a hijacked CONNECT tunnel never reports StateClosed and would
// otherwise be counted as active forever.
func (p *HTTPProxy) onConnStateChange(conn net.Conn, state http.ConnState) {
	switch state {
	case http.StateNew:
		atomic.AddInt64(&p.stats.TotalConnections, 1)
		atomic.AddInt64(&p.stats.ActiveConnections, 1)
	case http.StateHijacked, http.StateClosed:
		atomic.AddInt64(&p.stats.ActiveConnections, -1)
	}
}
// handleStats writes a JSON snapshot of the proxy statistics.
//
// The snapshot is assembled field by field: counters that are updated with
// sync/atomic are read with atomic loads, and the struct (which embeds a
// mutex) is never copied as a whole.
func (p *HTTPProxy) handleStats(w http.ResponseWriter, r *http.Request) {
	p.stats.mu.RLock()
	snapshot := ProxyStats{
		TotalConnections:    atomic.LoadInt64(&p.stats.TotalConnections),
		ActiveConnections:   atomic.LoadInt64(&p.stats.ActiveConnections),
		TotalRequests:       atomic.LoadInt64(&p.stats.TotalRequests),
		TotalErrors:         atomic.LoadInt64(&p.stats.TotalErrors),
		BytesIn:             atomic.LoadInt64(&p.stats.BytesIn),
		BytesOut:            atomic.LoadInt64(&p.stats.BytesOut),
		AverageResponseTime: p.stats.AverageResponseTime,
		Uptime:              time.Since(p.stats.StartTime),
		StartTime:           p.stats.StartTime,
	}
	p.stats.mu.RUnlock()

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(&snapshot); err != nil {
		p.logger.Printf("Failed to write stats: %v", err)
	}
}
// handleHealth reports 200 "OK" when at least one backend is active and
// 503 "No active backends" otherwise.
//
// Backend status is read under the backend's lock because the health checker
// updates it concurrently.
func (p *HTTPProxy) handleHealth(w http.ResponseWriter, r *http.Request) {
	anyActive := false
	for _, backend := range p.backends {
		backend.mu.RLock()
		active := backend.Status == "active"
		backend.mu.RUnlock()
		if active {
			anyActive = true
			break
		}
	}

	if !anyActive {
		w.WriteHeader(http.StatusServiceUnavailable)
		_, _ = w.Write([]byte("No active backends"))
		return
	}
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte("OK"))
}
负载均衡器实现 #
// pkg/proxy/load_balancer.go
package proxy
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// LoadBalancer chooses the backend that should receive the next request.
type LoadBalancer interface {
// Select returns the chosen backend, or nil when none is eligible.
Select() *Backend
// UpdateBackends replaces the set of candidate backends.
UpdateBackends(backends []*Backend)
}
// RoundRobinBalancer cycles through the active backends in order.
type RoundRobinBalancer struct {
backends []*Backend
// current is a monotonically increasing counter advanced atomically by Select.
current uint64
// mu guards the backends slice.
mu sync.RWMutex
}
// NewRoundRobinBalancer returns a round-robin balancer over backends.
func NewRoundRobinBalancer(backends []*Backend) *RoundRobinBalancer {
	balancer := new(RoundRobinBalancer)
	balancer.backends = backends
	return balancer
}
// Select returns the next active backend in rotation, or nil when no backend
// is active.
//
// Each backend's Status is read under that backend's lock because the health
// checker writes it concurrently.
func (rb *RoundRobinBalancer) Select() *Backend {
	rb.mu.RLock()
	defer rb.mu.RUnlock()

	// Keep only the active backends.
	activeBackends := make([]*Backend, 0, len(rb.backends))
	for _, backend := range rb.backends {
		backend.mu.RLock()
		active := backend.Status == "active"
		backend.mu.RUnlock()
		if active {
			activeBackends = append(activeBackends, backend)
		}
	}
	if len(activeBackends) == 0 {
		return nil
	}

	// Advance the shared counter and map it onto the active set.
	index := atomic.AddUint64(&rb.current, 1) % uint64(len(activeBackends))
	return activeBackends[index]
}
// UpdateBackends swaps in a new backend list under the write lock.
func (rb *RoundRobinBalancer) UpdateBackends(backends []*Backend) {
	rb.mu.Lock()
	rb.backends = backends
	rb.mu.Unlock()
}
// WeightedBalancer picks active backends at random, proportionally to Weight.
type WeightedBalancer struct {
backends []*Backend
// mu guards the backends slice.
mu sync.RWMutex
}
// NewWeightedBalancer returns a weighted-random balancer over backends.
func NewWeightedBalancer(backends []*Backend) *WeightedBalancer {
	balancer := new(WeightedBalancer)
	balancer.backends = backends
	return balancer
}
// weightedSeedOnce makes sure the shared math/rand source is seeded a single
// time instead of on every selection.
var weightedSeedOnce sync.Once

// Select returns an active backend chosen at random with probability
// proportional to its Weight, or nil when no active backend has a positive
// weight.
//
// Backends with a non-positive weight are skipped, so a negative weight can no
// longer drive the total below zero and make rand.Intn panic. Status and
// Weight are read under the backend's lock because the health checker updates
// Status concurrently.
func (wb *WeightedBalancer) Select() *Backend {
	wb.mu.RLock()
	defer wb.mu.RUnlock()

	totalWeight := 0
	candidates := make([]*Backend, 0, len(wb.backends))
	weights := make([]int, 0, len(wb.backends))
	for _, backend := range wb.backends {
		backend.mu.RLock()
		active, weight := backend.Status == "active", backend.Weight
		backend.mu.RUnlock()
		if !active || weight <= 0 {
			continue
		}
		candidates = append(candidates, backend)
		weights = append(weights, weight)
		totalWeight += weight
	}
	if len(candidates) == 0 {
		return nil
	}

	// Seeding once keeps pre-1.20 toolchains from replaying the same sequence
	// on every start without paying for a reseed on each request.
	weightedSeedOnce.Do(func() {
		rand.Seed(time.Now().UnixNano())
	})

	// Walk the cumulative weights until the random point falls inside a slot.
	random := rand.Intn(totalWeight)
	for i, backend := range candidates {
		random -= weights[i]
		if random < 0 {
			return backend
		}
	}
	return candidates[0]
}
// UpdateBackends swaps in a new backend list under the write lock.
func (wb *WeightedBalancer) UpdateBackends(backends []*Backend) {
	wb.mu.Lock()
	wb.backends = backends
	wb.mu.Unlock()
}
// LeastConnBalancer picks the active backend with the fewest connections.
type LeastConnBalancer struct {
backends []*Backend
// mu guards the backends slice.
mu sync.RWMutex
}
// NewLeastConnBalancer returns a least-connections balancer over backends.
func NewLeastConnBalancer(backends []*Backend) *LeastConnBalancer {
	balancer := new(LeastConnBalancer)
	balancer.backends = backends
	return balancer
}
// Select returns the active backend with the lowest Connections count, or nil
// when no backend is active. Ties go to the backend that appears first.
//
// Status and Connections are read together under the backend's lock; the
// health checker writes Status concurrently.
func (lb *LeastConnBalancer) Select() *Backend {
	lb.mu.RLock()
	defer lb.mu.RUnlock()

	var selected *Backend
	var minConnections int64
	for _, backend := range lb.backends {
		backend.mu.RLock()
		active, connections := backend.Status == "active", backend.Connections
		backend.mu.RUnlock()
		if !active {
			continue
		}
		if selected == nil || connections < minConnections {
			minConnections = connections
			selected = backend
		}
	}
	return selected
}
// UpdateBackends swaps in a new backend list under the write lock.
func (lb *LeastConnBalancer) UpdateBackends(backends []*Backend) {
	lb.mu.Lock()
	lb.backends = backends
	lb.mu.Unlock()
}
// NewLoadBalancer maps an algorithm name ("round_robin", "weighted" or
// "least_conn") to the matching balancer; any other name yields an error.
func NewLoadBalancer(algorithm string, backends []*Backend) (LoadBalancer, error) {
	if algorithm == "round_robin" {
		return NewRoundRobinBalancer(backends), nil
	}
	if algorithm == "weighted" {
		return NewWeightedBalancer(backends), nil
	}
	if algorithm == "least_conn" {
		return NewLeastConnBalancer(backends), nil
	}
	return nil, fmt.Errorf("unsupported load balancer algorithm: %s", algorithm)
}
健康检查实现 #
// pkg/proxy/health_checker.go
package proxy
import (
"context"
"fmt"
"log"
"net/http"
"sync"
"time"
)
// HealthChecker periodically probes every backend over HTTP and flips its
// Status between "active" and "down".
type HealthChecker struct {
config *HealthCheckConfig
backends []*Backend
logger *log.Logger
// client is built with the configured probe timeout.
client *http.Client
// mu guards the backends slice.
mu sync.RWMutex
}
// NewHealthChecker assembles a health checker whose HTTP client uses the
// timeout from config.
func NewHealthChecker(config *HealthCheckConfig, backends []*Backend, logger *log.Logger) *HealthChecker {
	probeClient := &http.Client{Timeout: config.Timeout}

	checker := new(HealthChecker)
	checker.config = config
	checker.backends = backends
	checker.logger = logger
	checker.client = probeClient
	return checker
}
// Start runs one probe round immediately and then one per configured interval
// until ctx is cancelled. It blocks, so callers run it in its own goroutine.
//
// A missing or non-positive interval falls back to 30 seconds, because
// time.NewTicker panics when given a non-positive duration.
func (hc *HealthChecker) Start(ctx context.Context) {
	const fallbackInterval = 30 * time.Second

	interval := hc.config.Interval
	if interval <= 0 {
		hc.logger.Printf("Invalid health check interval %v, using %v", interval, fallbackInterval)
		interval = fallbackInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	hc.logger.Printf("Health checker started with interval %v", interval)

	// Initial round so status is known before the first tick.
	hc.checkAllBackends()

	for {
		select {
		case <-ticker.C:
			hc.checkAllBackends()
		case <-ctx.Done():
			hc.logger.Println("Health checker stopped")
			return
		}
	}
}
// checkAllBackends probes every backend concurrently and waits for all probes
// to finish. It works on a snapshot of the list so the lock is not held while
// the probes run.
func (hc *HealthChecker) checkAllBackends() {
	hc.mu.RLock()
	snapshot := append([]*Backend(nil), hc.backends...)
	hc.mu.RUnlock()

	var pending sync.WaitGroup
	pending.Add(len(snapshot))
	for i := range snapshot {
		target := snapshot[i]
		go func() {
			defer pending.Done()
			hc.checkBackend(target)
		}()
	}
	pending.Wait()
}
// checkBackend probes one backend at http://<address><path> with the
// configured method and marks it up or down according to the response code.
//
// When no status codes are configured, 200 OK is the only healthy answer;
// previously an empty list meant no response could ever count as healthy.
func (hc *HealthChecker) checkBackend(backend *Backend) {
	probeURL := fmt.Sprintf("http://%s%s", backend.Address, hc.config.Path)
	req, err := http.NewRequest(hc.config.Method, probeURL, nil)
	if err != nil {
		hc.markBackendDown(backend, fmt.Sprintf("Failed to create request: %v", err))
		return
	}

	resp, err := hc.client.Do(req)
	if err != nil {
		hc.markBackendDown(backend, fmt.Sprintf("Request failed: %v", err))
		return
	}
	defer resp.Body.Close()

	// Compare the response code with the accepted set.
	accepted := hc.config.StatusCodes
	if len(accepted) == 0 {
		accepted = []int{http.StatusOK}
	}
	for _, code := range accepted {
		if resp.StatusCode == code {
			hc.markBackendUp(backend)
			return
		}
	}
	hc.markBackendDown(backend, fmt.Sprintf("Unhealthy status code: %d", resp.StatusCode))
}
// markBackendUp records a successful probe: LastSeen is refreshed on every
// success (not only on a state change, which left it stale for backends that
// stay healthy), and a backend that was not active is switched to "active"
// and logged.
func (hc *HealthChecker) markBackendUp(backend *Backend) {
	backend.mu.Lock()
	defer backend.mu.Unlock()

	backend.LastSeen = time.Now()
	if backend.Status != "active" {
		backend.Status = "active"
		hc.logger.Printf("Backend %s is now healthy", backend.Address)
	}
}
// markBackendDown switches an active backend to "down" and logs the reason.
// Backends in any other state are left untouched.
func (hc *HealthChecker) markBackendDown(backend *Backend, reason string) {
	backend.mu.Lock()
	defer backend.mu.Unlock()

	if backend.Status != "active" {
		return
	}
	backend.Status = "down"
	hc.logger.Printf("Backend %s is now unhealthy: %s", backend.Address, reason)
}
// UpdateBackends swaps in a new list of backends to probe.
func (hc *HealthChecker) UpdateBackends(backends []*Backend) {
	hc.mu.Lock()
	hc.backends = backends
	hc.mu.Unlock()
}
连接池实现 #
// pkg/proxy/connection_pool.go
package proxy
import (
"fmt"
"net"
"sync"
"time"
)
// ConnectionPool keeps one Pool per backend address, created on first use.
type ConnectionPool struct {
config *PoolConfig
// pools maps a backend address to its pool.
pools map[string]*Pool
// mu guards the pools map.
mu sync.RWMutex
}
// Pool holds the connections to a single backend address.
type Pool struct {
address string
config *PoolConfig
// connections is a buffered channel used as the idle-connection queue.
connections chan net.Conn
// active counts connections the pool has dialed and not yet discarded.
active int
// mu guards active.
mu sync.Mutex
}
// NewConnectionPool returns an empty connection pool that applies config to
// every per-address pool it creates later.
func NewConnectionPool(config *PoolConfig) *ConnectionPool {
	cp := new(ConnectionPool)
	cp.config = config
	cp.pools = map[string]*Pool{}
	return cp
}
// Get returns a connection to address, creating the per-address pool on first
// use.
func (cp *ConnectionPool) Get(address string) (net.Conn, error) {
	// Fast path: the pool already exists.
	cp.mu.RLock()
	pool, found := cp.pools[address]
	cp.mu.RUnlock()
	if found {
		return pool.Get()
	}

	// Slow path: look again under the write lock, since another goroutine may
	// have created the pool in the meantime.
	cp.mu.Lock()
	pool, found = cp.pools[address]
	if !found {
		pool = &Pool{
			address:     address,
			config:      cp.config,
			connections: make(chan net.Conn, cp.config.MaxIdle),
		}
		cp.pools[address] = pool
	}
	cp.mu.Unlock()

	return pool.Get()
}
// Get returns a connection to the pool's address, preferring a healthy idle
// connection and dialing a new one otherwise.
//
// Every connection handed out is wrapped in pooledConn so that closing it
// returns it to the pool; previously a reused idle connection was returned
// unwrapped and vanished from the accounting once closed. Idle connections
// that fail the liveness probe are closed and subtracted from the active
// count. It returns an error when MaxActive connections are already accounted
// for or when dialing fails.
func (p *Pool) Get() (net.Conn, error) {
	// Drain idle connections until a usable one is found or the queue is empty.
idle:
	for {
		select {
		case conn := <-p.connections:
			if p.isConnValid(conn) {
				return &pooledConn{Conn: conn, pool: p}, nil
			}
			conn.Close()
			p.mu.Lock()
			p.active--
			p.mu.Unlock()
		default:
			break idle
		}
	}

	// Enforce the limit on accounted connections.
	p.mu.Lock()
	if p.active >= p.config.MaxActive {
		p.mu.Unlock()
		return nil, fmt.Errorf("connection pool exhausted")
	}
	p.active++
	p.mu.Unlock()

	// Dial a new connection; give the slot back on failure.
	conn, err := net.DialTimeout("tcp", p.address, p.config.ConnectTimeout)
	if err != nil {
		p.mu.Lock()
		p.active--
		p.mu.Unlock()
		return nil, err
	}

	return &pooledConn{
		Conn: conn,
		pool: p,
	}, nil
}
// Put hands conn back to the idle queue. When the queue is full the
// connection is closed instead and removed from the active count.
func (p *Pool) Put(conn net.Conn) {
	select {
	case p.connections <- conn:
		// Queued for reuse.
		return
	default:
	}

	// No room left in the idle queue: drop the connection.
	conn.Close()
	p.mu.Lock()
	p.active--
	p.mu.Unlock()
}
// isConnValid probes an idle connection with a one-millisecond read.
//
// Only a read timeout counts as healthy: it means the peer is silent and the
// connection is still open. EOF or any other error means the connection is
// dead. A successful read is also treated as unusable, because the probe has
// consumed a byte the next user of the connection would never see.
func (p *Pool) isConnValid(conn net.Conn) bool {
	// Short deadline so the probe cannot block.
	if err := conn.SetReadDeadline(time.Now().Add(time.Millisecond)); err != nil {
		return false
	}

	// Try to read a single byte.
	buf := make([]byte, 1)
	_, err := conn.Read(buf)

	// Clear the deadline so the next user is unaffected.
	if resetErr := conn.SetReadDeadline(time.Time{}); resetErr != nil {
		return false
	}

	netErr, ok := err.(net.Error)
	return ok && netErr.Timeout()
}
// pooledConn wraps a connection so that closing it returns the underlying
// connection to its pool instead of tearing it down.
type pooledConn struct {
	net.Conn
	pool *Pool
	// once makes Close idempotent.
	once sync.Once
}

// Close returns the underlying connection to the pool and always reports nil.
//
// Only the first call has an effect: a second Close would otherwise queue the
// same connection twice and let two callers share it.
func (pc *pooledConn) Close() error {
	pc.once.Do(func() {
		pc.pool.Put(pc.Conn)
	})
	return nil
}
访问控制实现 #
// pkg/proxy/access_control.go
package proxy
import (
"net"
"net/http"
"strings"
"sync"
"time"
)
// AccessController enforces IP allow/deny lists and per-client rate limits.
type AccessController struct {
	config *AccessConfig
	// allowIPs and denyIPs hold entries matched by exact string comparison.
	allowIPs map[string]bool
	denyIPs  map[string]bool
	// allowNets and denyNets hold entries given in CIDR notation.
	allowNets   []*net.IPNet
	denyNets    []*net.IPNet
	rateLimiter *RateLimiter
	mu          sync.RWMutex
}

// NewAccessController builds an access controller from config.
//
// List entries written in CIDR notation (for example "192.168.1.0/24") are
// matched as networks; every other entry is matched as an exact string, as
// before. A rate limiter is created only when RateLimit is positive.
func NewAccessController(config *AccessConfig) *AccessController {
	ac := &AccessController{
		config:   config,
		allowIPs: make(map[string]bool),
		denyIPs:  make(map[string]bool),
	}

	// Allow list.
	ac.allowNets = splitACLEntries(config.AllowIPs, ac.allowIPs)

	// Deny list.
	ac.denyNets = splitACLEntries(config.DenyIPs, ac.denyIPs)

	// Rate limiter.
	if config.RateLimit > 0 {
		ac.rateLimiter = NewRateLimiter(config.RateLimit)
	}

	return ac
}

// splitACLEntries stores plain entries in exact and returns the parsed
// networks for entries that are valid CIDR blocks.
func splitACLEntries(entries []string, exact map[string]bool) []*net.IPNet {
	var networks []*net.IPNet
	for _, entry := range entries {
		if _, network, err := net.ParseCIDR(entry); err == nil {
			networks = append(networks, network)
			continue
		}
		exact[entry] = true
	}
	return networks
}

// ipInNetworks reports whether ip belongs to any of networks; a nil ip never
// matches.
func ipInNetworks(networks []*net.IPNet, ip net.IP) bool {
	if ip == nil {
		return false
	}
	for _, network := range networks {
		if network.Contains(ip) {
			return true
		}
	}
	return false
}

// Allow reports whether the request may proceed. The deny list is checked
// first, then the allow list (only when it has entries), then the rate limit.
func (ac *AccessController) Allow(r *http.Request) bool {
	clientIP := ac.getClientIP(r)
	parsedIP := net.ParseIP(clientIP)

	// Deny list.
	if ac.denyIPs[clientIP] || ipInNetworks(ac.denyNets, parsedIP) {
		return false
	}

	// Allow list: enforced only when at least one entry is configured.
	if len(ac.allowIPs) > 0 || len(ac.allowNets) > 0 {
		if !ac.allowIPs[clientIP] && !ipInNetworks(ac.allowNets, parsedIP) {
			return false
		}
	}

	// Rate limit.
	if ac.rateLimiter != nil && !ac.rateLimiter.Allow(clientIP) {
		return false
	}

	return true
}
// getClientIP returns the address of the peer that opened the connection.
//
// The result feeds the allow/deny lists and the rate limiter, so it is taken
// from RemoteAddr only. X-Forwarded-For and X-Real-IP are set by the client
// and were previously trusted, which let any caller pick the IP it was judged
// by.
// NOTE(review): if the proxy is deployed behind a trusted load balancer,
// header-based resolution must be reintroduced for that peer only.
func (ac *AccessController) getClientIP(r *http.Request) string {
	host, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		// RemoteAddr without a port: use it as is rather than an empty string.
		return strings.TrimSpace(r.RemoteAddr)
	}
	return host
}
// RateLimiter is a per-client token bucket: each client may burst up to rate
// requests and regains rate tokens per second.
type RateLimiter struct {
	rate    int
	clients map[string]*ClientLimiter
	mu      sync.RWMutex
}

// ClientLimiter is the bucket state of one client.
type ClientLimiter struct {
	// tokens is the number of requests the client may still make right now.
	tokens int
	// lastSeen is the instant up to which refill has been credited; cleanup
	// also uses it to expire idle clients.
	lastSeen time.Time
}

// NewRateLimiter returns a limiter allowing rate requests per second per
// client and starts the background goroutine that expires idle clients.
// That goroutine runs for the lifetime of the process.
func NewRateLimiter(rate int) *RateLimiter {
	rl := &RateLimiter{
		rate:    rate,
		clients: make(map[string]*ClientLimiter),
	}

	// Background expiry of idle clients.
	go rl.cleanup()

	return rl
}

// Allow reports whether clientIP may make a request now, consuming one token
// when it may.
//
// Refill is computed from the fractional time elapsed since the last credit,
// and the credit timestamp only advances by the time actually converted into
// tokens. Previously the elapsed time was truncated to whole seconds while the
// timestamp was reset on every call, so a client sending more than one request
// per second never regained tokens once its burst was spent.
func (rl *RateLimiter) Allow(clientIP string) bool {
	rl.mu.Lock()
	defer rl.mu.Unlock()

	now := time.Now()
	client, exists := rl.clients[clientIP]
	if !exists {
		// First request from this client: start with a full bucket minus one.
		rl.clients[clientIP] = &ClientLimiter{
			tokens:   rl.rate - 1,
			lastSeen: now,
		}
		return true
	}

	// Token bucket refill.
	elapsed := now.Sub(client.lastSeen)
	if refill := int(elapsed.Seconds() * float64(rl.rate)); refill > 0 {
		client.tokens += refill
		if client.tokens >= rl.rate {
			// Bucket is full; any surplus time is discarded.
			client.tokens = rl.rate
			client.lastSeen = now
		} else {
			// Keep the fractional remainder for the next call.
			client.lastSeen = client.lastSeen.Add(time.Duration(refill) * time.Second / time.Duration(rl.rate))
		}
	}

	if client.tokens > 0 {
		client.tokens--
		return true
	}
	return false
}

// cleanup removes clients that have not been credited for more than ten
// minutes. It wakes every five minutes and never returns.
func (rl *RateLimiter) cleanup() {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		rl.mu.Lock()
		now := time.Now()
		for ip, client := range rl.clients {
			if now.Sub(client.lastSeen) > 10*time.Minute {
				delete(rl.clients, ip)
			}
		}
		rl.mu.Unlock()
	}
}
主程序实现 #
// cmd/proxy/main.go
package main
import (
"context"
"encoding/json"
"log"
"os"
"os/signal"
"syscall"
"time"
"your-project/pkg/proxy"
)
// main loads the configuration (path from the first argument, default
// "config.json"), starts the configured proxy and runs until SIGINT/SIGTERM or
// until the proxy stops on its own.
//
// Proxy types that are declared but not implemented are rejected up front;
// previously they left the server value nil and crashed on Start. On a signal
// main waits for the proxy to finish shutting down instead of exiting at once.
func main() {
	// Config file.
	configFile := "config.json"
	if len(os.Args) > 1 {
		configFile = os.Args[1]
	}
	config, err := loadConfig(configFile)
	if err != nil {
		log.Fatalf("Failed to load config: %v", err)
	}

	// Logger.
	logger := log.New(os.Stdout, "[PROXY] ", log.LstdFlags)

	// Proxy server.
	var proxyServer interface {
		Start(context.Context) error
	}
	switch config.Type {
	case proxy.ProxyHTTP:
		proxyServer, err = proxy.NewHTTPProxy(config, logger)
	case proxy.ProxyHTTPS, proxy.ProxySOCKS5:
		log.Fatalf("Proxy type %s is not implemented yet", config.Type)
	default:
		log.Fatalf("Unsupported proxy type: %s", config.Type)
	}
	if err != nil {
		log.Fatalf("Failed to create proxy server: %v", err)
	}

	// Context cancelled on shutdown.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Run the proxy; done carries the result of Start.
	done := make(chan error, 1)
	go func() {
		done <- proxyServer.Start(ctx)
	}()
	logger.Printf("Proxy server started on %s", config.ListenAddr)

	// Wait for a signal or for the proxy to stop by itself.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-sigChan:
		logger.Println("Shutting down proxy server...")
	case err := <-done:
		if err != nil {
			logger.Printf("Proxy server error: %v", err)
			cancel()
			os.Exit(1)
		}
		return
	}

	// Ask the proxy to stop and give it a bounded time to do so.
	cancel()
	select {
	case err := <-done:
		if err != nil {
			logger.Printf("Proxy server error: %v", err)
		}
	case <-time.After(15 * time.Second):
		logger.Println("Timed out waiting for proxy server to stop")
	}
}
// durationFields lists the JSON keys whose values are time.Duration fields in
// the proxy configuration.
var durationFields = map[string]bool{
	"timeout":         true,
	"interval":        true,
	"idle_timeout":    true,
	"connect_timeout": true,
	"read_timeout":    true,
	"write_timeout":   true,
	"response_time":   true,
}

// loadConfig reads the JSON configuration in filename.
//
// Duration fields may be written either as integer nanoseconds or as strings
// understood by time.ParseDuration ("30s", "5m"). encoding/json alone rejects
// the string form, so the document is decoded generically, duration strings
// are converted to nanoseconds, and the result is decoded into ProxyConfig.
// It returns an error when the file cannot be read, is not valid JSON, or
// contains an unparsable duration.
func loadConfig(filename string) (*proxy.ProxyConfig, error) {
	file, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	var raw interface{}
	decoder := json.NewDecoder(file)
	// Keep numbers verbatim so large integers survive the round trip.
	decoder.UseNumber()
	if err := decoder.Decode(&raw); err != nil {
		return nil, err
	}
	if err := normalizeDurations(raw); err != nil {
		return nil, err
	}

	normalized, err := json.Marshal(raw)
	if err != nil {
		return nil, err
	}
	var config proxy.ProxyConfig
	if err := json.Unmarshal(normalized, &config); err != nil {
		return nil, err
	}
	return &config, nil
}

// normalizeDurations walks a decoded JSON document and replaces string values
// stored under a duration key with their nanosecond count.
func normalizeDurations(node interface{}) error {
	switch value := node.(type) {
	case map[string]interface{}:
		for key, child := range value {
			if text, isString := child.(string); isString && durationFields[key] {
				parsed, err := time.ParseDuration(text)
				if err != nil {
					return err
				}
				value[key] = int64(parsed)
				continue
			}
			if err := normalizeDurations(child); err != nil {
				return err
			}
		}
	case []interface{}:
		for _, child := range value {
			if err := normalizeDurations(child); err != nil {
				return err
			}
		}
	}
	return nil
}
配置文件示例 #
{
"type": "http",
"listen_addr": ":8080",
"backends": [
{
"id": "backend1",
"address": "127.0.0.1:8081",
"weight": 1,
"status": "active"
},
{
"id": "backend2",
"address": "127.0.0.1:8082",
"weight": 2,
"status": "active"
}
],
"load_balancer": "weighted",
"health_check": {
"enabled": true,
"interval": "30s",
"timeout": "5s",
"path": "/health",
"method": "GET",
"status_codes": [200],
"max_retries": 3
},
"connection_pool": {
"max_idle": 10,
"max_active": 100,
"idle_timeout": "300s",
"connect_timeout": "10s",
"read_timeout": "30s",
"write_timeout": "30s"
},
"access_control": {
"enabled": true,
"allow_ips": ["127.0.0.1", "192.168.1.0/24"],
"deny_ips": [],
"rate_limit": 100
},
"timeout": "30s",
"max_connections": 1000,
"buffer_size": 4096
}
功能扩展 #
SSL/TLS 支持 #
// StartTLS serves the proxy over TLS using certFile and keyFile until ctx is
// cancelled or the listener fails.
//
// It can be used instead of Start: when no server has been created yet it
// builds one with the same handlers (and starts the health checker), rather
// than dereferencing a nil server. A listen/serve failure is returned to the
// caller, and shutdown is bounded by config.Timeout (10s when unset).
func (p *HTTPProxy) StartTLS(ctx context.Context, certFile, keyFile string) error {
	if p.server == nil {
		if p.healthCheck != nil {
			go p.healthCheck.Start(ctx)
		}
		mux := http.NewServeMux()
		mux.HandleFunc("/", p.handleRequest)
		mux.HandleFunc("/stats", p.handleStats)
		mux.HandleFunc("/health", p.handleHealth)
		p.server = &http.Server{
			Addr:      p.config.ListenAddr,
			Handler:   mux,
			ConnState: p.onConnStateChange,
		}
	}

	// TLS settings: refuse protocol versions older than TLS 1.2.
	p.server.TLSConfig = &tls.Config{
		MinVersion: tls.VersionTLS12,
	}

	// Buffered so the serving goroutine can always deliver its result and exit.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- p.server.ListenAndServeTLS(certFile, keyFile)
	}()

	select {
	case err := <-serveErr:
		if err != nil && err != http.ErrServerClosed {
			p.logger.Printf("HTTPS proxy error: %v", err)
			return err
		}
		return nil
	case <-ctx.Done():
	}

	grace := p.config.Timeout
	if grace <= 0 {
		grace = 10 * time.Second
	}
	shutdownCtx, cancel := context.WithTimeout(context.Background(), grace)
	defer cancel()
	return p.server.Shutdown(shutdownCtx)
}
监控和日志 #
// 添加Prometheus监控
import "github.com/prometheus/client_golang/prometheus"
// Prometheus collectors for the proxy.
// NOTE(review): nothing in this snippet registers the collectors or records
// observations — confirm registration happens elsewhere before relying on them.
var (
// requestsTotal counts proxy requests, labelled by backend and status.
requestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "proxy_requests_total",
Help: "Total number of proxy requests",
},
[]string{"backend", "status"},
)
// requestDuration is a histogram of request durations in seconds, labelled by backend.
requestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "proxy_request_duration_seconds",
Help: "Request duration in seconds",
},
[]string{"backend"},
)
)
通过本节的学习,我们实现了一个网络代理服务器的核心骨架,包括 HTTP 代理与 CONNECT 隧道(用于转发 HTTPS 流量)、负载均衡、健康检查、连接池管理和访问控制等功能;独立的 HTTPS 与 SOCKS5 代理类型在主程序中仅预留了扩展入口,尚未实现。这个系统展示了如何运用 Go 语言的网络编程能力构建高性能的网络基础设施组件。