4.2.2 UDP 编程基础

4.2.2 UDP 编程基础 #

UDP(User Datagram Protocol)是一种无连接的、不可靠的传输层协议。与 TCP 不同,UDP 提供了简单、高效的数据传输机制,适用于对实时性要求高、能容忍少量数据丢失的应用场景。本节将详细介绍如何在 Go 语言中进行 UDP 编程。

UDP 协议特性 #

无连接传输 #

UDP 是无连接协议,不需要建立连接就可以发送数据:

客户端                    服务器
   |                        |
   |====== 数据包 1 ======>|  (直接发送)
   |====== 数据包 2 ======>|  (直接发送)
   |<===== 数据包 3 =======|  (直接发送)
   |                        |

主要特点 #

  • 无连接 - 发送数据前不需要建立连接
  • 不可靠 - 不保证数据包的到达和顺序
  • 高效 - 协议开销小,传输速度快
  • 支持广播和组播 - 可以一对多通信
  • 数据包边界保持 - 每次发送的数据作为独立的数据包

UDP 客户端编程 #

基础 UDP 客户端 #

package main

import (
    "fmt"
    "net"
    "time"
)

func basicUDPClient() {
    // 解析服务器地址
    serverAddr, err := net.ResolveUDPAddr("udp", "localhost:8080")
    if err != nil {
        fmt.Printf("解析地址失败: %v\n", err)
        return
    }

    // 创建 UDP 连接
    conn, err := net.DialUDP("udp", nil, serverAddr)
    if err != nil {
        fmt.Printf("连接失败: %v\n", err)
        return
    }
    defer conn.Close()

    fmt.Printf("连接到服务器: %s\n", serverAddr)

    // 发送数据
    message := "Hello, UDP Server!"
    _, err = conn.Write([]byte(message))
    if err != nil {
        fmt.Printf("发送数据失败: %v\n", err)
        return
    }

    // 接收响应
    buffer := make([]byte, 1024)
    n, err := conn.Read(buffer)
    if err != nil {
        fmt.Printf("接收数据失败: %v\n", err)
        return
    }

    fmt.Printf("收到响应: %s\n", string(buffer[:n]))
}

无连接 UDP 客户端 #

func connectionlessUDPClient() {
    // 创建 UDP 套接字
    conn, err := net.ListenUDP("udp", nil)
    if err != nil {
        fmt.Printf("创建套接字失败: %v\n", err)
        return
    }
    defer conn.Close()

    // 解析服务器地址
    serverAddr, err := net.ResolveUDPAddr("udp", "localhost:8080")
    if err != nil {
        fmt.Printf("解析地址失败: %v\n", err)
        return
    }

    // 发送数据到指定地址
    message := "Hello from connectionless client!"
    _, err = conn.WriteToUDP([]byte(message), serverAddr)
    if err != nil {
        fmt.Printf("发送数据失败: %v\n", err)
        return
    }

    // 接收响应
    buffer := make([]byte, 1024)
    n, addr, err := conn.ReadFromUDP(buffer)
    if err != nil {
        fmt.Printf("接收数据失败: %v\n", err)
        return
    }

    fmt.Printf("从 %s 收到响应: %s\n", addr, string(buffer[:n]))
}

高级 UDP 客户端 #

import (
    "context"
    "fmt"
    "net"
    "sync"
    "time"
)

type UDPClient struct {
    conn       *net.UDPConn
    serverAddr *net.UDPAddr
    timeout    time.Duration
    mutex      sync.RWMutex
}

func NewUDPClient(serverAddress string, timeout time.Duration) (*UDPClient, error) {
    serverAddr, err := net.ResolveUDPAddr("udp", serverAddress)
    if err != nil {
        return nil, err
    }

    conn, err := net.DialUDP("udp", nil, serverAddr)
    if err != nil {
        return nil, err
    }

    return &UDPClient{
        conn:       conn,
        serverAddr: serverAddr,
        timeout:    timeout,
    }, nil
}

func (c *UDPClient) SendWithResponse(data []byte) ([]byte, error) {
    c.mutex.Lock()
    defer c.mutex.Unlock()

    // 设置写超时
    err := c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
    if err != nil {
        return nil, err
    }

    // 发送数据
    _, err = c.conn.Write(data)
    if err != nil {
        return nil, err
    }

    // 设置读超时
    err = c.conn.SetReadDeadline(time.Now().Add(c.timeout))
    if err != nil {
        return nil, err
    }

    // 接收响应
    buffer := make([]byte, 1024)
    n, err := c.conn.Read(buffer)
    if err != nil {
        return nil, err
    }

    return buffer[:n], nil
}

func (c *UDPClient) SendAsync(data []byte) error {
    c.mutex.RLock()
    defer c.mutex.RUnlock()

    err := c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
    if err != nil {
        return err
    }

    _, err = c.conn.Write(data)
    return err
}

func (c *UDPClient) Close() error {
    return c.conn.Close()
}

func demonstrateAdvancedUDPClient() {
    client, err := NewUDPClient("localhost:8080", 5*time.Second)
    if err != nil {
        fmt.Printf("创建客户端失败: %v\n", err)
        return
    }
    defer client.Close()

    // 发送并等待响应
    response, err := client.SendWithResponse([]byte("Hello, Advanced UDP!"))
    if err != nil {
        fmt.Printf("发送失败: %v\n", err)
        return
    }

    fmt.Printf("收到响应: %s\n", string(response))

    // 异步发送
    err = client.SendAsync([]byte("Async message"))
    if err != nil {
        fmt.Printf("异步发送失败: %v\n", err)
        return
    }

    fmt.Println("异步消息发送成功")
}

UDP 服务器编程 #

基础 UDP 服务器 #

func basicUDPServer() {
    // 监听 UDP 端口
    addr, err := net.ResolveUDPAddr("udp", ":8080")
    if err != nil {
        fmt.Printf("解析地址失败: %v\n", err)
        return
    }

    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        fmt.Printf("监听失败: %v\n", err)
        return
    }
    defer conn.Close()

    fmt.Println("UDP 服务器启动,监听端口 8080")

    buffer := make([]byte, 1024)
    for {
        // 接收数据
        n, clientAddr, err := conn.ReadFromUDP(buffer)
        if err != nil {
            fmt.Printf("接收数据失败: %v\n", err)
            continue
        }

        message := string(buffer[:n])
        fmt.Printf("收到来自 %s 的消息: %s\n", clientAddr, message)

        // 发送响应
        response := fmt.Sprintf("Echo: %s", message)
        _, err = conn.WriteToUDP([]byte(response), clientAddr)
        if err != nil {
            fmt.Printf("发送响应失败: %v\n", err)
        }
    }
}

并发 UDP 服务器 #

type UDPServer struct {
    conn     *net.UDPConn
    address  string
    handlers map[string]func([]byte, *net.UDPAddr) []byte
    ctx      context.Context
    cancel   context.CancelFunc
    wg       sync.WaitGroup
}

func NewUDPServer(address string) *UDPServer {
    ctx, cancel := context.WithCancel(context.Background())
    return &UDPServer{
        address:  address,
        handlers: make(map[string]func([]byte, *net.UDPAddr) []byte),
        ctx:      ctx,
        cancel:   cancel,
    }
}

func (s *UDPServer) RegisterHandler(command string, handler func([]byte, *net.UDPAddr) []byte) {
    s.handlers[command] = handler
}

func (s *UDPServer) Start() error {
    addr, err := net.ResolveUDPAddr("udp", s.address)
    if err != nil {
        return err
    }

    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        return err
    }
    s.conn = conn

    fmt.Printf("UDP 服务器启动: %s\n", s.address)

    // 启动多个工作协程处理请求
    for i := 0; i < 10; i++ {
        s.wg.Add(1)
        go s.worker(i)
    }

    return nil
}

func (s *UDPServer) worker(id int) {
    defer s.wg.Done()

    buffer := make([]byte, 1024)

    for {
        select {
        case <-s.ctx.Done():
            return
        default:
            // 设置读取超时
            s.conn.SetReadDeadline(time.Now().Add(1 * time.Second))

            n, clientAddr, err := s.conn.ReadFromUDP(buffer)
            if err != nil {
                if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
                    continue // 超时,继续循环
                }
                fmt.Printf("Worker %d 读取错误: %v\n", id, err)
                continue
            }

            // 处理请求
            go s.handleRequest(buffer[:n], clientAddr, id)
        }
    }
}

func (s *UDPServer) handleRequest(data []byte, clientAddr *net.UDPAddr, workerID int) {
    fmt.Printf("Worker %d 处理来自 %s 的请求: %s\n", workerID, clientAddr, string(data))

    // 解析命令
    message := string(data)
    var response []byte

    if len(message) > 0 {
        parts := strings.SplitN(message, " ", 2)
        command := parts[0]

        if handler, exists := s.handlers[command]; exists {
            response = handler(data, clientAddr)
        } else {
            response = []byte(fmt.Sprintf("Unknown command: %s", command))
        }
    } else {
        response = []byte("Empty message")
    }

    // 发送响应
    _, err := s.conn.WriteToUDP(response, clientAddr)
    if err != nil {
        fmt.Printf("发送响应失败: %v\n", err)
    }
}

func (s *UDPServer) Stop() error {
    s.cancel()

    if s.conn != nil {
        s.conn.Close()
    }

    s.wg.Wait()
    return nil
}

func demonstrateConcurrentUDPServer() {
    server := NewUDPServer(":8080")

    // 注册处理器
    server.RegisterHandler("PING", func(data []byte, addr *net.UDPAddr) []byte {
        return []byte("PONG")
    })

    server.RegisterHandler("TIME", func(data []byte, addr *net.UDPAddr) []byte {
        return []byte(time.Now().Format("2006-01-02 15:04:05"))
    })

    server.RegisterHandler("ECHO", func(data []byte, addr *net.UDPAddr) []byte {
        return []byte(fmt.Sprintf("Echo: %s", string(data)))
    })

    err := server.Start()
    if err != nil {
        fmt.Printf("启动服务器失败: %v\n", err)
        return
    }

    // 运行服务器
    fmt.Println("服务器运行中,按 Ctrl+C 停止")
    time.Sleep(30 * time.Second)

    server.Stop()
}

UDP 广播和组播 #

UDP 广播 #

func udpBroadcast() {
    // 创建广播地址
    broadcastAddr, err := net.ResolveUDPAddr("udp", "255.255.255.255:8080")
    if err != nil {
        fmt.Printf("解析广播地址失败: %v\n", err)
        return
    }

    // 创建 UDP 连接
    conn, err := net.DialUDP("udp", nil, broadcastAddr)
    if err != nil {
        fmt.Printf("创建连接失败: %v\n", err)
        return
    }
    defer conn.Close()

    // 发送广播消息
    message := "Broadcast message from Go!"
    _, err = conn.Write([]byte(message))
    if err != nil {
        fmt.Printf("发送广播失败: %v\n", err)
        return
    }

    fmt.Println("广播消息发送成功")
}

func udpBroadcastServer() {
    // 监听广播
    addr, err := net.ResolveUDPAddr("udp", ":8080")
    if err != nil {
        fmt.Printf("解析地址失败: %v\n", err)
        return
    }

    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        fmt.Printf("监听失败: %v\n", err)
        return
    }
    defer conn.Close()

    fmt.Println("广播服务器启动,监听端口 8080")

    buffer := make([]byte, 1024)
    for {
        n, clientAddr, err := conn.ReadFromUDP(buffer)
        if err != nil {
            fmt.Printf("接收广播失败: %v\n", err)
            continue
        }

        message := string(buffer[:n])
        fmt.Printf("收到广播消息 [%s]: %s\n", clientAddr, message)
    }
}

UDP 组播 #

import (
    "golang.org/x/net/ipv4"
)

func udpMulticast() {
    // 组播地址
    multicastAddr, err := net.ResolveUDPAddr("udp", "224.0.0.1:8080")
    if err != nil {
        fmt.Printf("解析组播地址失败: %v\n", err)
        return
    }

    // 创建连接
    conn, err := net.DialUDP("udp", nil, multicastAddr)
    if err != nil {
        fmt.Printf("创建连接失败: %v\n", err)
        return
    }
    defer conn.Close()

    // 发送组播消息
    message := "Multicast message from Go!"
    _, err = conn.Write([]byte(message))
    if err != nil {
        fmt.Printf("发送组播失败: %v\n", err)
        return
    }

    fmt.Println("组播消息发送成功")
}

func udpMulticastServer() {
    // 监听组播
    addr, err := net.ResolveUDPAddr("udp", "224.0.0.1:8080")
    if err != nil {
        fmt.Printf("解析地址失败: %v\n", err)
        return
    }

    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        fmt.Printf("监听失败: %v\n", err)
        return
    }
    defer conn.Close()

    // 加入组播组
    pc := ipv4.NewPacketConn(conn)
    defer pc.Close()

    // 获取网络接口
    ifi, err := net.InterfaceByName("eth0") // 根据实际情况修改
    if err != nil {
        fmt.Printf("获取网络接口失败: %v\n", err)
        return
    }

    err = pc.JoinGroup(ifi, &net.UDPAddr{IP: net.IPv4(224, 0, 0, 1)})
    if err != nil {
        fmt.Printf("加入组播组失败: %v\n", err)
        return
    }

    fmt.Println("组播服务器启动,已加入组播组")

    buffer := make([]byte, 1024)
    for {
        n, clientAddr, err := conn.ReadFromUDP(buffer)
        if err != nil {
            fmt.Printf("接收组播失败: %v\n", err)
            continue
        }

        message := string(buffer[:n])
        fmt.Printf("收到组播消息 [%s]: %s\n", clientAddr, message)
    }
}

可靠 UDP 实现 #

带确认的 UDP #

type ReliableUDP struct {
    conn       *net.UDPConn
    timeout    time.Duration
    maxRetries int
    seqNum     uint32
    mutex      sync.Mutex
}

type UDPPacket struct {
    SeqNum uint32
    Type   uint8 // 0: 数据, 1: 确认
    Data   []byte
}

func NewReliableUDP(localAddr string) (*ReliableUDP, error) {
    addr, err := net.ResolveUDPAddr("udp", localAddr)
    if err != nil {
        return nil, err
    }

    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        return nil, err
    }

    return &ReliableUDP{
        conn:       conn,
        timeout:    3 * time.Second,
        maxRetries: 3,
        seqNum:     0,
    }, nil
}

func (r *ReliableUDP) SendReliable(data []byte, remoteAddr *net.UDPAddr) error {
    r.mutex.Lock()
    r.seqNum++
    seqNum := r.seqNum
    r.mutex.Unlock()

    packet := UDPPacket{
        SeqNum: seqNum,
        Type:   0, // 数据包
        Data:   data,
    }

    packetData := r.serializePacket(packet)

    for attempt := 0; attempt < r.maxRetries; attempt++ {
        // 发送数据包
        _, err := r.conn.WriteToUDP(packetData, remoteAddr)
        if err != nil {
            return err
        }

        // 等待确认
        r.conn.SetReadDeadline(time.Now().Add(r.timeout))

        buffer := make([]byte, 1024)
        n, addr, err := r.conn.ReadFromUDP(buffer)
        if err != nil {
            if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
                fmt.Printf("超时,重试 %d/%d\n", attempt+1, r.maxRetries)
                continue
            }
            return err
        }

        // 检查是否是期望的确认包
        ackPacket := r.deserializePacket(buffer[:n])
        if ackPacket.Type == 1 && ackPacket.SeqNum == seqNum && addr.String() == remoteAddr.String() {
            return nil // 收到确认
        }
    }

    return fmt.Errorf("发送失败,已达到最大重试次数")
}

func (r *ReliableUDP) ReceiveReliable() ([]byte, *net.UDPAddr, error) {
    buffer := make([]byte, 1024)
    n, addr, err := r.conn.ReadFromUDP(buffer)
    if err != nil {
        return nil, nil, err
    }

    packet := r.deserializePacket(buffer[:n])

    if packet.Type == 0 { // 数据包
        // 发送确认
        ackPacket := UDPPacket{
            SeqNum: packet.SeqNum,
            Type:   1, // 确认包
            Data:   nil,
        }

        ackData := r.serializePacket(ackPacket)
        r.conn.WriteToUDP(ackData, addr)

        return packet.Data, addr, nil
    }

    return nil, nil, fmt.Errorf("收到非数据包")
}

func (r *ReliableUDP) serializePacket(packet UDPPacket) []byte {
    // 简单的序列化:4字节序列号 + 1字节类型 + 数据
    result := make([]byte, 5+len(packet.Data))
    result[0] = byte(packet.SeqNum >> 24)
    result[1] = byte(packet.SeqNum >> 16)
    result[2] = byte(packet.SeqNum >> 8)
    result[3] = byte(packet.SeqNum)
    result[4] = packet.Type
    copy(result[5:], packet.Data)
    return result
}

func (r *ReliableUDP) deserializePacket(data []byte) UDPPacket {
    if len(data) < 5 {
        return UDPPacket{}
    }

    seqNum := uint32(data[0])<<24 | uint32(data[1])<<16 | uint32(data[2])<<8 | uint32(data[3])
    packetType := data[4]
    packetData := data[5:]

    return UDPPacket{
        SeqNum: seqNum,
        Type:   packetType,
        Data:   packetData,
    }
}

func (r *ReliableUDP) Close() error {
    return r.conn.Close()
}

func demonstrateReliableUDP() {
    // 服务器端
    go func() {
        server, err := NewReliableUDP(":8080")
        if err != nil {
            fmt.Printf("创建服务器失败: %v\n", err)
            return
        }
        defer server.Close()

        fmt.Println("可靠 UDP 服务器启动")

        for {
            data, addr, err := server.ReceiveReliable()
            if err != nil {
                fmt.Printf("接收失败: %v\n", err)
                continue
            }

            fmt.Printf("收到可靠消息 [%s]: %s\n", addr, string(data))
        }
    }()

    time.Sleep(1 * time.Second)

    // 客户端
    client, err := NewReliableUDP(":0")
    if err != nil {
        fmt.Printf("创建客户端失败: %v\n", err)
        return
    }
    defer client.Close()

    serverAddr, _ := net.ResolveUDPAddr("udp", "localhost:8080")

    err = client.SendReliable([]byte("Hello, Reliable UDP!"), serverAddr)
    if err != nil {
        fmt.Printf("发送失败: %v\n", err)
    } else {
        fmt.Println("可靠消息发送成功")
    }

    time.Sleep(2 * time.Second)
}

UDP 性能优化 #

批量发送优化 #

type UDPBatcher struct {
    conn      *net.UDPConn
    buffer    [][]byte
    addresses []*net.UDPAddr
    batchSize int
    mutex     sync.Mutex
}

func NewUDPBatcher(conn *net.UDPConn, batchSize int) *UDPBatcher {
    return &UDPBatcher{
        conn:      conn,
        buffer:    make([][]byte, 0, batchSize),
        addresses: make([]*net.UDPAddr, 0, batchSize),
        batchSize: batchSize,
    }
}

func (b *UDPBatcher) Add(data []byte, addr *net.UDPAddr) error {
    b.mutex.Lock()
    defer b.mutex.Unlock()

    b.buffer = append(b.buffer, data)
    b.addresses = append(b.addresses, addr)

    if len(b.buffer) >= b.batchSize {
        return b.flush()
    }

    return nil
}

func (b *UDPBatcher) flush() error {
    for i, data := range b.buffer {
        _, err := b.conn.WriteToUDP(data, b.addresses[i])
        if err != nil {
            return err
        }
    }

    // 清空缓冲区
    b.buffer = b.buffer[:0]
    b.addresses = b.addresses[:0]

    return nil
}

func (b *UDPBatcher) Flush() error {
    b.mutex.Lock()
    defer b.mutex.Unlock()
    return b.flush()
}

零拷贝优化 #

func udpZeroCopy(conn *net.UDPConn, data []byte, addr *net.UDPAddr) error {
    // 使用 WriteMsgUDP 进行更高效的发送
    if udpConn, ok := conn.(*net.UDPConn); ok {
        _, _, err := udpConn.WriteMsgUDP(data, nil, addr)
        return err
    }
    return fmt.Errorf("不支持 WriteMsgUDP")
}

错误处理和监控 #

UDP 错误处理 #

func handleUDPErrors(err error) {
    if err == nil {
        return
    }

    if netErr, ok := err.(net.Error); ok {
        if netErr.Timeout() {
            fmt.Println("UDP 操作超时")
        } else if netErr.Temporary() {
            fmt.Println("UDP 临时错误,可以重试")
        } else {
            fmt.Printf("UDP 网络错误: %v\n", err)
        }
    } else {
        fmt.Printf("UDP 其他错误: %v\n", err)
    }
}

UDP 连接监控 #

type UDPMonitor struct {
    packetsSent     int64
    packetsReceived int64
    bytesSent       int64
    bytesReceived   int64
    errors          int64
    mutex           sync.RWMutex
}

func (m *UDPMonitor) RecordSent(bytes int) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    m.packetsSent++
    m.bytesSent += int64(bytes)
}

func (m *UDPMonitor) RecordReceived(bytes int) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    m.packetsReceived++
    m.bytesReceived += int64(bytes)
}

func (m *UDPMonitor) RecordError() {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    m.errors++
}

func (m *UDPMonitor) GetStats() (int64, int64, int64, int64, int64) {
    m.mutex.RLock()
    defer m.mutex.RUnlock()
    return m.packetsSent, m.packetsReceived, m.bytesSent, m.bytesReceived, m.errors
}

func (m *UDPMonitor) PrintStats() {
    sent, recv, bytesSent, bytesRecv, errors := m.GetStats()
    fmt.Printf("UDP 统计: 发送包=%d, 接收包=%d, 发送字节=%d, 接收字节=%d, 错误=%d\n",
        sent, recv, bytesSent, bytesRecv, errors)
}

实际应用示例 #

UDP 心跳检测 #

type UDPHeartbeat struct {
    conn     *net.UDPConn
    peers    map[string]time.Time
    interval time.Duration
    timeout  time.Duration
    mutex    sync.RWMutex
    stopChan chan struct{}
}

func NewUDPHeartbeat(localAddr string, interval, timeout time.Duration) (*UDPHeartbeat, error) {
    addr, err := net.ResolveUDPAddr("udp", localAddr)
    if err != nil {
        return nil, err
    }

    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        return nil, err
    }

    return &UDPHeartbeat{
        conn:     conn,
        peers:    make(map[string]time.Time),
        interval: interval,
        timeout:  timeout,
        stopChan: make(chan struct{}),
    }, nil
}

func (h *UDPHeartbeat) Start() {
    go h.sendHeartbeats()
    go h.receiveHeartbeats()
    go h.checkTimeouts()
}

func (h *UDPHeartbeat) sendHeartbeats() {
    ticker := time.NewTicker(h.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            h.mutex.RLock()
            for peerAddr := range h.peers {
                addr, _ := net.ResolveUDPAddr("udp", peerAddr)
                h.conn.WriteToUDP([]byte("HEARTBEAT"), addr)
            }
            h.mutex.RUnlock()
        case <-h.stopChan:
            return
        }
    }
}

func (h *UDPHeartbeat) receiveHeartbeats() {
    buffer := make([]byte, 1024)

    for {
        select {
        case <-h.stopChan:
            return
        default:
            h.conn.SetReadDeadline(time.Now().Add(1 * time.Second))
            n, addr, err := h.conn.ReadFromUDP(buffer)
            if err != nil {
                if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
                    continue
                }
                fmt.Printf("接收心跳失败: %v\n", err)
                continue
            }

            message := string(buffer[:n])
            if message == "HEARTBEAT" {
                h.mutex.Lock()
                h.peers[addr.String()] = time.Now()
                h.mutex.Unlock()
            }
        }
    }
}

func (h *UDPHeartbeat) checkTimeouts() {
    ticker := time.NewTicker(h.timeout / 2)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            now := time.Now()
            h.mutex.Lock()
            for peerAddr, lastSeen := range h.peers {
                if now.Sub(lastSeen) > h.timeout {
                    fmt.Printf("节点超时: %s\n", peerAddr)
                    delete(h.peers, peerAddr)
                }
            }
            h.mutex.Unlock()
        case <-h.stopChan:
            return
        }
    }
}

func (h *UDPHeartbeat) AddPeer(peerAddr string) {
    h.mutex.Lock()
    defer h.mutex.Unlock()
    h.peers[peerAddr] = time.Now()
}

func (h *UDPHeartbeat) Stop() {
    close(h.stopChan)
    h.conn.Close()
}

小结 #

本节详细介绍了 Go 语言中的 UDP 编程基础,包括:

  1. UDP 协议特性 - 无连接、不可靠但高效的传输特点
  2. UDP 客户端 - 基础客户端、无连接客户端、高级客户端实现
  3. UDP 服务器 - 基础服务器、并发服务器、命令处理
  4. 广播和组播 - UDP 的一对多通信能力
  5. 可靠 UDP - 在 UDP 基础上实现可靠传输机制
  6. 性能优化 - 批量处理、零拷贝等优化技术
  7. 实际应用 - 心跳检测等实用场景

掌握这些 UDP 编程技术后,你就能够根据应用需求选择合适的传输协议,构建高效的网络应用程序。在下一节中,我们将学习基于 net/http 包的 HTTP 服务器开发。