4.2.2 UDP 编程基础 #
UDP(User Datagram Protocol)是一种无连接的、不可靠的传输层协议。与 TCP 不同,UDP 提供了简单、高效的数据传输机制,适用于对实时性要求高、能容忍少量数据丢失的应用场景。本节将详细介绍如何在 Go 语言中进行 UDP 编程。
UDP 协议特性 #
无连接传输 #
UDP 是无连接协议,不需要建立连接就可以发送数据:
客户端 服务器
| |
|====== 数据包 1 ======>| (直接发送)
|====== 数据包 2 ======>| (直接发送)
|<===== 数据包 3 =======| (直接发送)
| |
主要特点 #
- 无连接 - 发送数据前不需要建立连接
- 不可靠 - 不保证数据包的到达和顺序
- 高效 - 协议开销小,传输速度快
- 支持广播和组播 - 可以一对多通信
- 数据包边界保持 - 每次发送的数据作为独立的数据包
UDP 客户端编程 #
基础 UDP 客户端 #
package main
import (
"fmt"
"net"
"time"
)
func basicUDPClient() {
// 解析服务器地址
serverAddr, err := net.ResolveUDPAddr("udp", "localhost:8080")
if err != nil {
fmt.Printf("解析地址失败: %v\n", err)
return
}
// 创建 UDP 连接
conn, err := net.DialUDP("udp", nil, serverAddr)
if err != nil {
fmt.Printf("连接失败: %v\n", err)
return
}
defer conn.Close()
fmt.Printf("连接到服务器: %s\n", serverAddr)
// 发送数据
message := "Hello, UDP Server!"
_, err = conn.Write([]byte(message))
if err != nil {
fmt.Printf("发送数据失败: %v\n", err)
return
}
// 接收响应
buffer := make([]byte, 1024)
n, err := conn.Read(buffer)
if err != nil {
fmt.Printf("接收数据失败: %v\n", err)
return
}
fmt.Printf("收到响应: %s\n", string(buffer[:n]))
}
无连接 UDP 客户端 #
func connectionlessUDPClient() {
// 创建 UDP 套接字
conn, err := net.ListenUDP("udp", nil)
if err != nil {
fmt.Printf("创建套接字失败: %v\n", err)
return
}
defer conn.Close()
// 解析服务器地址
serverAddr, err := net.ResolveUDPAddr("udp", "localhost:8080")
if err != nil {
fmt.Printf("解析地址失败: %v\n", err)
return
}
// 发送数据到指定地址
message := "Hello from connectionless client!"
_, err = conn.WriteToUDP([]byte(message), serverAddr)
if err != nil {
fmt.Printf("发送数据失败: %v\n", err)
return
}
// 接收响应
buffer := make([]byte, 1024)
n, addr, err := conn.ReadFromUDP(buffer)
if err != nil {
fmt.Printf("接收数据失败: %v\n", err)
return
}
fmt.Printf("从 %s 收到响应: %s\n", addr, string(buffer[:n]))
}
高级 UDP 客户端 #
import (
"context"
"fmt"
"net"
"sync"
"time"
)
type UDPClient struct {
conn *net.UDPConn
serverAddr *net.UDPAddr
timeout time.Duration
mutex sync.RWMutex
}
func NewUDPClient(serverAddress string, timeout time.Duration) (*UDPClient, error) {
serverAddr, err := net.ResolveUDPAddr("udp", serverAddress)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, serverAddr)
if err != nil {
return nil, err
}
return &UDPClient{
conn: conn,
serverAddr: serverAddr,
timeout: timeout,
}, nil
}
func (c *UDPClient) SendWithResponse(data []byte) ([]byte, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
// 设置写超时
err := c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
if err != nil {
return nil, err
}
// 发送数据
_, err = c.conn.Write(data)
if err != nil {
return nil, err
}
// 设置读超时
err = c.conn.SetReadDeadline(time.Now().Add(c.timeout))
if err != nil {
return nil, err
}
// 接收响应
buffer := make([]byte, 1024)
n, err := c.conn.Read(buffer)
if err != nil {
return nil, err
}
return buffer[:n], nil
}
func (c *UDPClient) SendAsync(data []byte) error {
c.mutex.RLock()
defer c.mutex.RUnlock()
err := c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
if err != nil {
return err
}
_, err = c.conn.Write(data)
return err
}
func (c *UDPClient) Close() error {
return c.conn.Close()
}
func demonstrateAdvancedUDPClient() {
client, err := NewUDPClient("localhost:8080", 5*time.Second)
if err != nil {
fmt.Printf("创建客户端失败: %v\n", err)
return
}
defer client.Close()
// 发送并等待响应
response, err := client.SendWithResponse([]byte("Hello, Advanced UDP!"))
if err != nil {
fmt.Printf("发送失败: %v\n", err)
return
}
fmt.Printf("收到响应: %s\n", string(response))
// 异步发送
err = client.SendAsync([]byte("Async message"))
if err != nil {
fmt.Printf("异步发送失败: %v\n", err)
return
}
fmt.Println("异步消息发送成功")
}
UDP 服务器编程 #
基础 UDP 服务器 #
func basicUDPServer() {
// 监听 UDP 端口
addr, err := net.ResolveUDPAddr("udp", ":8080")
if err != nil {
fmt.Printf("解析地址失败: %v\n", err)
return
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
fmt.Printf("监听失败: %v\n", err)
return
}
defer conn.Close()
fmt.Println("UDP 服务器启动,监听端口 8080")
buffer := make([]byte, 1024)
for {
// 接收数据
n, clientAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
fmt.Printf("接收数据失败: %v\n", err)
continue
}
message := string(buffer[:n])
fmt.Printf("收到来自 %s 的消息: %s\n", clientAddr, message)
// 发送响应
response := fmt.Sprintf("Echo: %s", message)
_, err = conn.WriteToUDP([]byte(response), clientAddr)
if err != nil {
fmt.Printf("发送响应失败: %v\n", err)
}
}
}
并发 UDP 服务器 #
type UDPServer struct {
conn *net.UDPConn
address string
handlers map[string]func([]byte, *net.UDPAddr) []byte
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewUDPServer(address string) *UDPServer {
ctx, cancel := context.WithCancel(context.Background())
return &UDPServer{
address: address,
handlers: make(map[string]func([]byte, *net.UDPAddr) []byte),
ctx: ctx,
cancel: cancel,
}
}
func (s *UDPServer) RegisterHandler(command string, handler func([]byte, *net.UDPAddr) []byte) {
s.handlers[command] = handler
}
func (s *UDPServer) Start() error {
addr, err := net.ResolveUDPAddr("udp", s.address)
if err != nil {
return err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return err
}
s.conn = conn
fmt.Printf("UDP 服务器启动: %s\n", s.address)
// 启动多个工作协程处理请求
for i := 0; i < 10; i++ {
s.wg.Add(1)
go s.worker(i)
}
return nil
}
func (s *UDPServer) worker(id int) {
defer s.wg.Done()
buffer := make([]byte, 1024)
for {
select {
case <-s.ctx.Done():
return
default:
// 设置读取超时
s.conn.SetReadDeadline(time.Now().Add(1 * time.Second))
n, clientAddr, err := s.conn.ReadFromUDP(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
continue // 超时,继续循环
}
fmt.Printf("Worker %d 读取错误: %v\n", id, err)
continue
}
// 处理请求
go s.handleRequest(buffer[:n], clientAddr, id)
}
}
}
func (s *UDPServer) handleRequest(data []byte, clientAddr *net.UDPAddr, workerID int) {
fmt.Printf("Worker %d 处理来自 %s 的请求: %s\n", workerID, clientAddr, string(data))
// 解析命令
message := string(data)
var response []byte
if len(message) > 0 {
parts := strings.SplitN(message, " ", 2)
command := parts[0]
if handler, exists := s.handlers[command]; exists {
response = handler(data, clientAddr)
} else {
response = []byte(fmt.Sprintf("Unknown command: %s", command))
}
} else {
response = []byte("Empty message")
}
// 发送响应
_, err := s.conn.WriteToUDP(response, clientAddr)
if err != nil {
fmt.Printf("发送响应失败: %v\n", err)
}
}
func (s *UDPServer) Stop() error {
s.cancel()
if s.conn != nil {
s.conn.Close()
}
s.wg.Wait()
return nil
}
func demonstrateConcurrentUDPServer() {
server := NewUDPServer(":8080")
// 注册处理器
server.RegisterHandler("PING", func(data []byte, addr *net.UDPAddr) []byte {
return []byte("PONG")
})
server.RegisterHandler("TIME", func(data []byte, addr *net.UDPAddr) []byte {
return []byte(time.Now().Format("2006-01-02 15:04:05"))
})
server.RegisterHandler("ECHO", func(data []byte, addr *net.UDPAddr) []byte {
return []byte(fmt.Sprintf("Echo: %s", string(data)))
})
err := server.Start()
if err != nil {
fmt.Printf("启动服务器失败: %v\n", err)
return
}
// 运行服务器
fmt.Println("服务器运行中,按 Ctrl+C 停止")
time.Sleep(30 * time.Second)
server.Stop()
}
UDP 广播和组播 #
UDP 广播 #
func udpBroadcast() {
// 创建广播地址
broadcastAddr, err := net.ResolveUDPAddr("udp", "255.255.255.255:8080")
if err != nil {
fmt.Printf("解析广播地址失败: %v\n", err)
return
}
// 创建 UDP 连接
conn, err := net.DialUDP("udp", nil, broadcastAddr)
if err != nil {
fmt.Printf("创建连接失败: %v\n", err)
return
}
defer conn.Close()
// 发送广播消息
message := "Broadcast message from Go!"
_, err = conn.Write([]byte(message))
if err != nil {
fmt.Printf("发送广播失败: %v\n", err)
return
}
fmt.Println("广播消息发送成功")
}
func udpBroadcastServer() {
// 监听广播
addr, err := net.ResolveUDPAddr("udp", ":8080")
if err != nil {
fmt.Printf("解析地址失败: %v\n", err)
return
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
fmt.Printf("监听失败: %v\n", err)
return
}
defer conn.Close()
fmt.Println("广播服务器启动,监听端口 8080")
buffer := make([]byte, 1024)
for {
n, clientAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
fmt.Printf("接收广播失败: %v\n", err)
continue
}
message := string(buffer[:n])
fmt.Printf("收到广播消息 [%s]: %s\n", clientAddr, message)
}
}
UDP 组播 #
import (
"golang.org/x/net/ipv4"
)
func udpMulticast() {
// 组播地址
multicastAddr, err := net.ResolveUDPAddr("udp", "224.0.0.1:8080")
if err != nil {
fmt.Printf("解析组播地址失败: %v\n", err)
return
}
// 创建连接
conn, err := net.DialUDP("udp", nil, multicastAddr)
if err != nil {
fmt.Printf("创建连接失败: %v\n", err)
return
}
defer conn.Close()
// 发送组播消息
message := "Multicast message from Go!"
_, err = conn.Write([]byte(message))
if err != nil {
fmt.Printf("发送组播失败: %v\n", err)
return
}
fmt.Println("组播消息发送成功")
}
func udpMulticastServer() {
// 监听组播
addr, err := net.ResolveUDPAddr("udp", "224.0.0.1:8080")
if err != nil {
fmt.Printf("解析地址失败: %v\n", err)
return
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
fmt.Printf("监听失败: %v\n", err)
return
}
defer conn.Close()
// 加入组播组
pc := ipv4.NewPacketConn(conn)
defer pc.Close()
// 获取网络接口
ifi, err := net.InterfaceByName("eth0") // 根据实际情况修改
if err != nil {
fmt.Printf("获取网络接口失败: %v\n", err)
return
}
err = pc.JoinGroup(ifi, &net.UDPAddr{IP: net.IPv4(224, 0, 0, 1)})
if err != nil {
fmt.Printf("加入组播组失败: %v\n", err)
return
}
fmt.Println("组播服务器启动,已加入组播组")
buffer := make([]byte, 1024)
for {
n, clientAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
fmt.Printf("接收组播失败: %v\n", err)
continue
}
message := string(buffer[:n])
fmt.Printf("收到组播消息 [%s]: %s\n", clientAddr, message)
}
}
可靠 UDP 实现 #
带确认的 UDP #
type ReliableUDP struct {
conn *net.UDPConn
timeout time.Duration
maxRetries int
seqNum uint32
mutex sync.Mutex
}
type UDPPacket struct {
SeqNum uint32
Type uint8 // 0: 数据, 1: 确认
Data []byte
}
func NewReliableUDP(localAddr string) (*ReliableUDP, error) {
addr, err := net.ResolveUDPAddr("udp", localAddr)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
return &ReliableUDP{
conn: conn,
timeout: 3 * time.Second,
maxRetries: 3,
seqNum: 0,
}, nil
}
func (r *ReliableUDP) SendReliable(data []byte, remoteAddr *net.UDPAddr) error {
r.mutex.Lock()
r.seqNum++
seqNum := r.seqNum
r.mutex.Unlock()
packet := UDPPacket{
SeqNum: seqNum,
Type: 0, // 数据包
Data: data,
}
packetData := r.serializePacket(packet)
for attempt := 0; attempt < r.maxRetries; attempt++ {
// 发送数据包
_, err := r.conn.WriteToUDP(packetData, remoteAddr)
if err != nil {
return err
}
// 等待确认
r.conn.SetReadDeadline(time.Now().Add(r.timeout))
buffer := make([]byte, 1024)
n, addr, err := r.conn.ReadFromUDP(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
fmt.Printf("超时,重试 %d/%d\n", attempt+1, r.maxRetries)
continue
}
return err
}
// 检查是否是期望的确认包
ackPacket := r.deserializePacket(buffer[:n])
if ackPacket.Type == 1 && ackPacket.SeqNum == seqNum && addr.String() == remoteAddr.String() {
return nil // 收到确认
}
}
return fmt.Errorf("发送失败,已达到最大重试次数")
}
func (r *ReliableUDP) ReceiveReliable() ([]byte, *net.UDPAddr, error) {
buffer := make([]byte, 1024)
n, addr, err := r.conn.ReadFromUDP(buffer)
if err != nil {
return nil, nil, err
}
packet := r.deserializePacket(buffer[:n])
if packet.Type == 0 { // 数据包
// 发送确认
ackPacket := UDPPacket{
SeqNum: packet.SeqNum,
Type: 1, // 确认包
Data: nil,
}
ackData := r.serializePacket(ackPacket)
r.conn.WriteToUDP(ackData, addr)
return packet.Data, addr, nil
}
return nil, nil, fmt.Errorf("收到非数据包")
}
func (r *ReliableUDP) serializePacket(packet UDPPacket) []byte {
// 简单的序列化:4字节序列号 + 1字节类型 + 数据
result := make([]byte, 5+len(packet.Data))
result[0] = byte(packet.SeqNum >> 24)
result[1] = byte(packet.SeqNum >> 16)
result[2] = byte(packet.SeqNum >> 8)
result[3] = byte(packet.SeqNum)
result[4] = packet.Type
copy(result[5:], packet.Data)
return result
}
func (r *ReliableUDP) deserializePacket(data []byte) UDPPacket {
if len(data) < 5 {
return UDPPacket{}
}
seqNum := uint32(data[0])<<24 | uint32(data[1])<<16 | uint32(data[2])<<8 | uint32(data[3])
packetType := data[4]
packetData := data[5:]
return UDPPacket{
SeqNum: seqNum,
Type: packetType,
Data: packetData,
}
}
func (r *ReliableUDP) Close() error {
return r.conn.Close()
}
func demonstrateReliableUDP() {
// 服务器端
go func() {
server, err := NewReliableUDP(":8080")
if err != nil {
fmt.Printf("创建服务器失败: %v\n", err)
return
}
defer server.Close()
fmt.Println("可靠 UDP 服务器启动")
for {
data, addr, err := server.ReceiveReliable()
if err != nil {
fmt.Printf("接收失败: %v\n", err)
continue
}
fmt.Printf("收到可靠消息 [%s]: %s\n", addr, string(data))
}
}()
time.Sleep(1 * time.Second)
// 客户端
client, err := NewReliableUDP(":0")
if err != nil {
fmt.Printf("创建客户端失败: %v\n", err)
return
}
defer client.Close()
serverAddr, _ := net.ResolveUDPAddr("udp", "localhost:8080")
err = client.SendReliable([]byte("Hello, Reliable UDP!"), serverAddr)
if err != nil {
fmt.Printf("发送失败: %v\n", err)
} else {
fmt.Println("可靠消息发送成功")
}
time.Sleep(2 * time.Second)
}
UDP 性能优化 #
批量发送优化 #
type UDPBatcher struct {
conn *net.UDPConn
buffer [][]byte
addresses []*net.UDPAddr
batchSize int
mutex sync.Mutex
}
func NewUDPBatcher(conn *net.UDPConn, batchSize int) *UDPBatcher {
return &UDPBatcher{
conn: conn,
buffer: make([][]byte, 0, batchSize),
addresses: make([]*net.UDPAddr, 0, batchSize),
batchSize: batchSize,
}
}
func (b *UDPBatcher) Add(data []byte, addr *net.UDPAddr) error {
b.mutex.Lock()
defer b.mutex.Unlock()
b.buffer = append(b.buffer, data)
b.addresses = append(b.addresses, addr)
if len(b.buffer) >= b.batchSize {
return b.flush()
}
return nil
}
func (b *UDPBatcher) flush() error {
for i, data := range b.buffer {
_, err := b.conn.WriteToUDP(data, b.addresses[i])
if err != nil {
return err
}
}
// 清空缓冲区
b.buffer = b.buffer[:0]
b.addresses = b.addresses[:0]
return nil
}
func (b *UDPBatcher) Flush() error {
b.mutex.Lock()
defer b.mutex.Unlock()
return b.flush()
}
零拷贝优化 #
func udpZeroCopy(conn *net.UDPConn, data []byte, addr *net.UDPAddr) error {
// 使用 WriteMsgUDP 进行更高效的发送
if udpConn, ok := conn.(*net.UDPConn); ok {
_, _, err := udpConn.WriteMsgUDP(data, nil, addr)
return err
}
return fmt.Errorf("不支持 WriteMsgUDP")
}
错误处理和监控 #
UDP 错误处理 #
func handleUDPErrors(err error) {
if err == nil {
return
}
if netErr, ok := err.(net.Error); ok {
if netErr.Timeout() {
fmt.Println("UDP 操作超时")
} else if netErr.Temporary() {
fmt.Println("UDP 临时错误,可以重试")
} else {
fmt.Printf("UDP 网络错误: %v\n", err)
}
} else {
fmt.Printf("UDP 其他错误: %v\n", err)
}
}
UDP 连接监控 #
type UDPMonitor struct {
packetsSent int64
packetsReceived int64
bytesSent int64
bytesReceived int64
errors int64
mutex sync.RWMutex
}
func (m *UDPMonitor) RecordSent(bytes int) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.packetsSent++
m.bytesSent += int64(bytes)
}
func (m *UDPMonitor) RecordReceived(bytes int) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.packetsReceived++
m.bytesReceived += int64(bytes)
}
func (m *UDPMonitor) RecordError() {
m.mutex.Lock()
defer m.mutex.Unlock()
m.errors++
}
func (m *UDPMonitor) GetStats() (int64, int64, int64, int64, int64) {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.packetsSent, m.packetsReceived, m.bytesSent, m.bytesReceived, m.errors
}
func (m *UDPMonitor) PrintStats() {
sent, recv, bytesSent, bytesRecv, errors := m.GetStats()
fmt.Printf("UDP 统计: 发送包=%d, 接收包=%d, 发送字节=%d, 接收字节=%d, 错误=%d\n",
sent, recv, bytesSent, bytesRecv, errors)
}
实际应用示例 #
UDP 心跳检测 #
type UDPHeartbeat struct {
conn *net.UDPConn
peers map[string]time.Time
interval time.Duration
timeout time.Duration
mutex sync.RWMutex
stopChan chan struct{}
}
func NewUDPHeartbeat(localAddr string, interval, timeout time.Duration) (*UDPHeartbeat, error) {
addr, err := net.ResolveUDPAddr("udp", localAddr)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
return &UDPHeartbeat{
conn: conn,
peers: make(map[string]time.Time),
interval: interval,
timeout: timeout,
stopChan: make(chan struct{}),
}, nil
}
func (h *UDPHeartbeat) Start() {
go h.sendHeartbeats()
go h.receiveHeartbeats()
go h.checkTimeouts()
}
func (h *UDPHeartbeat) sendHeartbeats() {
ticker := time.NewTicker(h.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.mutex.RLock()
for peerAddr := range h.peers {
addr, _ := net.ResolveUDPAddr("udp", peerAddr)
h.conn.WriteToUDP([]byte("HEARTBEAT"), addr)
}
h.mutex.RUnlock()
case <-h.stopChan:
return
}
}
}
func (h *UDPHeartbeat) receiveHeartbeats() {
buffer := make([]byte, 1024)
for {
select {
case <-h.stopChan:
return
default:
h.conn.SetReadDeadline(time.Now().Add(1 * time.Second))
n, addr, err := h.conn.ReadFromUDP(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
continue
}
fmt.Printf("接收心跳失败: %v\n", err)
continue
}
message := string(buffer[:n])
if message == "HEARTBEAT" {
h.mutex.Lock()
h.peers[addr.String()] = time.Now()
h.mutex.Unlock()
}
}
}
}
func (h *UDPHeartbeat) checkTimeouts() {
ticker := time.NewTicker(h.timeout / 2)
defer ticker.Stop()
for {
select {
case <-ticker.C:
now := time.Now()
h.mutex.Lock()
for peerAddr, lastSeen := range h.peers {
if now.Sub(lastSeen) > h.timeout {
fmt.Printf("节点超时: %s\n", peerAddr)
delete(h.peers, peerAddr)
}
}
h.mutex.Unlock()
case <-h.stopChan:
return
}
}
}
func (h *UDPHeartbeat) AddPeer(peerAddr string) {
h.mutex.Lock()
defer h.mutex.Unlock()
h.peers[peerAddr] = time.Now()
}
func (h *UDPHeartbeat) Stop() {
close(h.stopChan)
h.conn.Close()
}
小结 #
本节详细介绍了 Go 语言中的 UDP 编程基础,包括:
- UDP 协议特性 - 无连接、不可靠但高效的传输特点
- UDP 客户端 - 基础客户端、无连接客户端、高级客户端实现
- UDP 服务器 - 基础服务器、并发服务器、命令处理
- 广播和组播 - UDP 的一对多通信能力
- 可靠 UDP - 在 UDP 基础上实现可靠传输机制
- 性能优化 - 批量处理、零拷贝等优化技术
- 实际应用 - 心跳检测等实用场景
掌握这些 UDP 编程技术后,你就能够根据应用需求选择合适的传输协议,构建高效的网络应用程序。在下一节中,我们将学习基于 net/http 包的 HTTP 服务器开发。