4.3.2 UDP 广播与组播 #
UDP 广播和组播是实现一对多通信的重要技术。广播允许向网络中的所有主机发送数据,而组播则允许向特定的主机组发送数据。本节将深入探讨这两种技术的原理、实现和应用场景。
广播技术 #
广播基础概念 #
广播是指向网络中所有主机发送数据包的通信方式。根据广播范围,可以分为:
- 有限广播 - 255.255.255.255,仅在本地网络传播
- 直接广播 - 如 192.168.1.255,向特定网络的所有主机广播
- 子网广播 - 向特定子网的所有主机广播
基础广播实现 #
package main
import (
"fmt"
"net"
"time"
)
type BroadcastSender struct {
conn *net.UDPConn
addr *net.UDPAddr
}
func NewBroadcastSender(port int) (*BroadcastSender, error) {
// 创建广播地址
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("255.255.255.255:%d", port))
if err != nil {
return nil, err
}
// 创建 UDP 连接
conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
return nil, err
}
return &BroadcastSender{
conn: conn,
addr: addr,
}, nil
}
func (bs *BroadcastSender) SendMessage(message string) error {
_, err := bs.conn.Write([]byte(message))
return err
}
func (bs *BroadcastSender) Close() error {
return bs.conn.Close()
}
// 广播接收器
type BroadcastReceiver struct {
conn *net.UDPConn
port int
}
func NewBroadcastReceiver(port int) (*BroadcastReceiver, error) {
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", port))
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
return &BroadcastReceiver{
conn: conn,
port: port,
}, nil
}
func (br *BroadcastReceiver) Listen(handler func(message string, from *net.UDPAddr)) error {
buffer := make([]byte, 1024)
for {
n, addr, err := br.conn.ReadFromUDP(buffer)
if err != nil {
return err
}
message := string(buffer[:n])
go handler(message, addr)
}
}
func (br *BroadcastReceiver) Close() error {
return br.conn.Close()
}
func demonstrateBroadcast() {
// 启动接收器
go func() {
receiver, err := NewBroadcastReceiver(8080)
if err != nil {
fmt.Printf("创建广播接收器失败: %v\n", err)
return
}
defer receiver.Close()
fmt.Println("广播接收器启动,监听端口 8080")
receiver.Listen(func(message string, from *net.UDPAddr) {
fmt.Printf("收到广播消息 [%s]: %s\n", from, message)
})
}()
time.Sleep(1 * time.Second)
// 发送广播消息
sender, err := NewBroadcastSender(8080)
if err != nil {
fmt.Printf("创建广播发送器失败: %v\n", err)
return
}
defer sender.Close()
for i := 0; i < 5; i++ {
message := fmt.Sprintf("广播消息 #%d", i+1)
err := sender.SendMessage(message)
if err != nil {
fmt.Printf("发送广播消息失败: %v\n", err)
} else {
fmt.Printf("发送广播消息: %s\n", message)
}
time.Sleep(2 * time.Second)
}
}
高级广播实现 #
import (
"context"
"sync"
"time"
)
type AdvancedBroadcaster struct {
conn *net.UDPConn
networks []string
interval time.Duration
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
messageQueue chan BroadcastMessage
}
type BroadcastMessage struct {
Content string
Networks []string // 指定广播网络,空则广播到所有网络
Priority int // 消息优先级
Timestamp time.Time
}
func NewAdvancedBroadcaster(port int, networks []string) (*AdvancedBroadcaster, error) {
// 创建本地 UDP 连接用于发送
localAddr, err := net.ResolveUDPAddr("udp", ":0")
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", localAddr)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
ab := &AdvancedBroadcaster{
conn: conn,
networks: networks,
interval: 100 * time.Millisecond,
ctx: ctx,
cancel: cancel,
messageQueue: make(chan BroadcastMessage, 100),
}
// 启动消息处理协程
ab.wg.Add(1)
go ab.messageProcessor()
return ab, nil
}
func (ab *AdvancedBroadcaster) messageProcessor() {
defer ab.wg.Done()
ticker := time.NewTicker(ab.interval)
defer ticker.Stop()
var pendingMessages []BroadcastMessage
for {
select {
case <-ab.ctx.Done():
return
case msg := <-ab.messageQueue:
pendingMessages = append(pendingMessages, msg)
case <-ticker.C:
if len(pendingMessages) > 0 {
ab.processPendingMessages(pendingMessages)
pendingMessages = pendingMessages[:0]
}
}
}
}
func (ab *AdvancedBroadcaster) processPendingMessages(messages []BroadcastMessage) {
// 按优先级排序
for i := 0; i < len(messages)-1; i++ {
for j := i + 1; j < len(messages); j++ {
if messages[i].Priority < messages[j].Priority {
messages[i], messages[j] = messages[j], messages[i]
}
}
}
// 发送消息
for _, msg := range messages {
ab.sendMessage(msg)
}
}
func (ab *AdvancedBroadcaster) sendMessage(msg BroadcastMessage) {
networks := msg.Networks
if len(networks) == 0 {
networks = ab.networks
}
for _, network := range networks {
addr, err := net.ResolveUDPAddr("udp", network)
if err != nil {
fmt.Printf("解析广播地址失败 %s: %v\n", network, err)
continue
}
_, err = ab.conn.WriteToUDP([]byte(msg.Content), addr)
if err != nil {
fmt.Printf("发送广播消息失败 %s: %v\n", network, err)
} else {
fmt.Printf("广播消息到 %s: %s\n", network, msg.Content)
}
}
}
func (ab *AdvancedBroadcaster) Broadcast(content string, priority int) error {
msg := BroadcastMessage{
Content: content,
Priority: priority,
Timestamp: time.Now(),
}
select {
case ab.messageQueue <- msg:
return nil
default:
return fmt.Errorf("消息队列已满")
}
}
func (ab *AdvancedBroadcaster) BroadcastToNetworks(content string, networks []string, priority int) error {
msg := BroadcastMessage{
Content: content,
Networks: networks,
Priority: priority,
Timestamp: time.Now(),
}
select {
case ab.messageQueue <- msg:
return nil
default:
return fmt.Errorf("消息队列已满")
}
}
func (ab *AdvancedBroadcaster) Close() error {
ab.cancel()
ab.wg.Wait()
return ab.conn.Close()
}
组播技术 #
组播基础概念 #
组播(Multicast)使用特殊的 IP 地址范围(224.0.0.0 到 239.255.255.255)来标识组播组。主机可以加入或离开组播组,只有组成员才能接收组播数据。
基础组播实现 #
import (
"golang.org/x/net/ipv4"
)
type MulticastSender struct {
conn *net.UDPConn
group *net.UDPAddr
}
func NewMulticastSender(groupAddr string) (*MulticastSender, error) {
group, err := net.ResolveUDPAddr("udp", groupAddr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, group)
if err != nil {
return nil, err
}
return &MulticastSender{
conn: conn,
group: group,
}, nil
}
func (ms *MulticastSender) SendMessage(message string) error {
_, err := ms.conn.Write([]byte(message))
return err
}
func (ms *MulticastSender) Close() error {
return ms.conn.Close()
}
// 组播接收器
type MulticastReceiver struct {
conn *net.UDPConn
group *net.UDPAddr
packetConn *ipv4.PacketConn
}
func NewMulticastReceiver(groupAddr, interfaceName string) (*MulticastReceiver, error) {
group, err := net.ResolveUDPAddr("udp", groupAddr)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", group)
if err != nil {
return nil, err
}
// 创建 IPv4 包连接
packetConn := ipv4.NewPacketConn(conn)
// 获取网络接口
var ifi *net.Interface
if interfaceName != "" {
ifi, err = net.InterfaceByName(interfaceName)
if err != nil {
conn.Close()
return nil, err
}
}
// 加入组播组
err = packetConn.JoinGroup(ifi, &net.UDPAddr{IP: group.IP})
if err != nil {
conn.Close()
return nil, err
}
return &MulticastReceiver{
conn: conn,
group: group,
packetConn: packetConn,
}, nil
}
func (mr *MulticastReceiver) Listen(handler func(message string, from net.Addr)) error {
buffer := make([]byte, 1024)
for {
n, addr, err := mr.conn.ReadFromUDP(buffer)
if err != nil {
return err
}
message := string(buffer[:n])
go handler(message, addr)
}
}
func (mr *MulticastReceiver) Close() error {
// 离开组播组
mr.packetConn.LeaveGroup(nil, &net.UDPAddr{IP: mr.group.IP})
return mr.conn.Close()
}
func demonstrateMulticast() {
groupAddr := "224.1.1.1:8080"
// 启动接收器
go func() {
receiver, err := NewMulticastReceiver(groupAddr, "")
if err != nil {
fmt.Printf("创建组播接收器失败: %v\n", err)
return
}
defer receiver.Close()
fmt.Printf("组播接收器启动,监听组 %s\n", groupAddr)
receiver.Listen(func(message string, from net.Addr) {
fmt.Printf("收到组播消息 [%s]: %s\n", from, message)
})
}()
time.Sleep(1 * time.Second)
// 发送组播消息
sender, err := NewMulticastSender(groupAddr)
if err != nil {
fmt.Printf("创建组播发送器失败: %v\n", err)
return
}
defer sender.Close()
for i := 0; i < 5; i++ {
message := fmt.Sprintf("组播消息 #%d", i+1)
err := sender.SendMessage(message)
if err != nil {
fmt.Printf("发送组播消息失败: %v\n", err)
} else {
fmt.Printf("发送组播消息: %s\n", message)
}
time.Sleep(2 * time.Second)
}
}
高级组播管理 #
type MulticastManager struct {
groups map[string]*MulticastGroup
interfaces []*net.Interface
mutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
type MulticastGroup struct {
Address *net.UDPAddr
Members map[string]*MulticastMember
Conn *net.UDPConn
PacketConn *ipv4.PacketConn
MessageChan chan MulticastMessage
mutex sync.RWMutex
}
type MulticastMember struct {
ID string
Interface *net.Interface
JoinTime time.Time
LastSeen time.Time
}
type MulticastMessage struct {
GroupAddr string
Content string
TTL int
Timestamp time.Time
}
func NewMulticastManager() (*MulticastManager, error) {
// 获取所有网络接口
interfaces, err := net.Interfaces()
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
return &MulticastManager{
groups: make(map[string]*MulticastGroup),
interfaces: interfaces,
ctx: ctx,
cancel: cancel,
}, nil
}
func (mm *MulticastManager) CreateGroup(groupAddr string) error {
mm.mutex.Lock()
defer mm.mutex.Unlock()
if _, exists := mm.groups[groupAddr]; exists {
return fmt.Errorf("组播组已存在: %s", groupAddr)
}
addr, err := net.ResolveUDPAddr("udp", groupAddr)
if err != nil {
return err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return err
}
packetConn := ipv4.NewPacketConn(conn)
group := &MulticastGroup{
Address: addr,
Members: make(map[string]*MulticastMember),
Conn: conn,
PacketConn: packetConn,
MessageChan: make(chan MulticastMessage, 100),
}
mm.groups[groupAddr] = group
// 启动消息处理协程
go mm.handleGroupMessages(group)
fmt.Printf("创建组播组: %s\n", groupAddr)
return nil
}
func (mm *MulticastManager) JoinGroup(groupAddr, memberID, interfaceName string) error {
mm.mutex.RLock()
group, exists := mm.groups[groupAddr]
mm.mutex.RUnlock()
if !exists {
return fmt.Errorf("组播组不存在: %s", groupAddr)
}
var ifi *net.Interface
var err error
if interfaceName != "" {
ifi, err = net.InterfaceByName(interfaceName)
if err != nil {
return err
}
}
// 加入组播组
err = group.PacketConn.JoinGroup(ifi, &net.UDPAddr{IP: group.Address.IP})
if err != nil {
return err
}
group.mutex.Lock()
group.Members[memberID] = &MulticastMember{
ID: memberID,
Interface: ifi,
JoinTime: time.Now(),
LastSeen: time.Now(),
}
group.mutex.Unlock()
fmt.Printf("成员 %s 加入组播组 %s\n", memberID, groupAddr)
return nil
}
func (mm *MulticastManager) LeaveGroup(groupAddr, memberID string) error {
mm.mutex.RLock()
group, exists := mm.groups[groupAddr]
mm.mutex.RUnlock()
if !exists {
return fmt.Errorf("组播组不存在: %s", groupAddr)
}
group.mutex.Lock()
member, exists := group.Members[memberID]
if exists {
// 离开组播组
group.PacketConn.LeaveGroup(member.Interface, &net.UDPAddr{IP: group.Address.IP})
delete(group.Members, memberID)
fmt.Printf("成员 %s 离开组播组 %s\n", memberID, groupAddr)
}
group.mutex.Unlock()
return nil
}
func (mm *MulticastManager) SendMessage(groupAddr, content string, ttl int) error {
mm.mutex.RLock()
group, exists := mm.groups[groupAddr]
mm.mutex.RUnlock()
if !exists {
return fmt.Errorf("组播组不存在: %s", groupAddr)
}
msg := MulticastMessage{
GroupAddr: groupAddr,
Content: content,
TTL: ttl,
Timestamp: time.Now(),
}
select {
case group.MessageChan <- msg:
return nil
default:
return fmt.Errorf("消息队列已满")
}
}
func (mm *MulticastManager) handleGroupMessages(group *MulticastGroup) {
for {
select {
case <-mm.ctx.Done():
return
case msg := <-group.MessageChan:
mm.sendMulticastMessage(group, msg)
}
}
}
func (mm *MulticastManager) sendMulticastMessage(group *MulticastGroup, msg MulticastMessage) {
// 设置 TTL
if msg.TTL > 0 {
group.PacketConn.SetTTL(msg.TTL)
}
// 发送消息
_, err := group.Conn.WriteToUDP([]byte(msg.Content), group.Address)
if err != nil {
fmt.Printf("发送组播消息失败: %v\n", err)
} else {
fmt.Printf("发送组播消息到 %s: %s\n", group.Address, msg.Content)
}
}
func (mm *MulticastManager) GetGroupStats(groupAddr string) map[string]interface{} {
mm.mutex.RLock()
group, exists := mm.groups[groupAddr]
mm.mutex.RUnlock()
if !exists {
return nil
}
group.mutex.RLock()
defer group.mutex.RUnlock()
stats := map[string]interface{}{
"group_address": groupAddr,
"member_count": len(group.Members),
"members": make([]string, 0, len(group.Members)),
"message_queue": len(group.MessageChan),
}
for memberID := range group.Members {
stats["members"] = append(stats["members"].([]string), memberID)
}
return stats
}
func (mm *MulticastManager) Close() error {
mm.cancel()
mm.mutex.Lock()
defer mm.mutex.Unlock()
for groupAddr, group := range mm.groups {
// 所有成员离开组播组
group.mutex.Lock()
for memberID, member := range group.Members {
group.PacketConn.LeaveGroup(member.Interface, &net.UDPAddr{IP: group.Address.IP})
delete(group.Members, memberID)
}
group.mutex.Unlock()
// 关闭连接
group.Conn.Close()
close(group.MessageChan)
delete(mm.groups, groupAddr)
}
return nil
}
服务发现系统 #
基于广播的服务发现 #
type ServiceDiscovery struct {
services map[string]*ServiceInfo
broadcaster *AdvancedBroadcaster
receiver *BroadcastReceiver
port int
mutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
type ServiceInfo struct {
Name string `json:"name"`
Address string `json:"address"`
Port int `json:"port"`
Metadata map[string]string `json:"metadata"`
LastSeen time.Time `json:"last_seen"`
TTL time.Duration `json:"ttl"`
}
type ServiceMessage struct {
Type string `json:"type"` // "announce", "query", "response"
Service *ServiceInfo `json:"service,omitempty"`
Query string `json:"query,omitempty"`
}
func NewServiceDiscovery(port int) (*ServiceDiscovery, error) {
ctx, cancel := context.WithCancel(context.Background())
broadcaster, err := NewAdvancedBroadcaster(port, []string{fmt.Sprintf("255.255.255.255:%d", port)})
if err != nil {
cancel()
return nil, err
}
receiver, err := NewBroadcastReceiver(port)
if err != nil {
broadcaster.Close()
cancel()
return nil, err
}
sd := &ServiceDiscovery{
services: make(map[string]*ServiceInfo),
broadcaster: broadcaster,
receiver: receiver,
port: port,
ctx: ctx,
cancel: cancel,
}
// 启动接收协程
go sd.receiveMessages()
// 启动清理协程
go sd.cleanupExpiredServices()
return sd, nil
}
func (sd *ServiceDiscovery) RegisterService(name, address string, port int, metadata map[string]string, ttl time.Duration) error {
service := &ServiceInfo{
Name: name,
Address: address,
Port: port,
Metadata: metadata,
LastSeen: time.Now(),
TTL: ttl,
}
sd.mutex.Lock()
sd.services[name] = service
sd.mutex.Unlock()
// 广播服务注册
return sd.announceService(service)
}
func (sd *ServiceDiscovery) announceService(service *ServiceInfo) error {
msg := ServiceMessage{
Type: "announce",
Service: service,
}
data, err := json.Marshal(msg)
if err != nil {
return err
}
return sd.broadcaster.Broadcast(string(data), 1)
}
func (sd *ServiceDiscovery) QueryService(serviceName string) ([]*ServiceInfo, error) {
// 发送查询请求
msg := ServiceMessage{
Type: "query",
Query: serviceName,
}
data, err := json.Marshal(msg)
if err != nil {
return nil, err
}
err = sd.broadcaster.Broadcast(string(data), 2)
if err != nil {
return nil, err
}
// 等待响应
time.Sleep(1 * time.Second)
// 返回本地缓存的服务
sd.mutex.RLock()
defer sd.mutex.RUnlock()
var services []*ServiceInfo
for name, service := range sd.services {
if serviceName == "" || name == serviceName {
if time.Since(service.LastSeen) <= service.TTL {
services = append(services, service)
}
}
}
return services, nil
}
func (sd *ServiceDiscovery) receiveMessages() {
sd.receiver.Listen(func(message string, from *net.UDPAddr) {
var msg ServiceMessage
if err := json.Unmarshal([]byte(message), &msg); err != nil {
return
}
switch msg.Type {
case "announce":
sd.handleServiceAnnounce(msg.Service)
case "query":
sd.handleServiceQuery(msg.Query, from)
case "response":
sd.handleServiceResponse(msg.Service)
}
})
}
func (sd *ServiceDiscovery) handleServiceAnnounce(service *ServiceInfo) {
if service == nil {
return
}
sd.mutex.Lock()
defer sd.mutex.Unlock()
service.LastSeen = time.Now()
sd.services[service.Name] = service
fmt.Printf("发现服务: %s @ %s:%d\n", service.Name, service.Address, service.Port)
}
func (sd *ServiceDiscovery) handleServiceQuery(query string, from *net.UDPAddr) {
sd.mutex.RLock()
defer sd.mutex.RUnlock()
for name, service := range sd.services {
if query == "" || name == query {
if time.Since(service.LastSeen) <= service.TTL {
// 发送响应
msg := ServiceMessage{
Type: "response",
Service: service,
}
data, _ := json.Marshal(msg)
sd.broadcaster.Broadcast(string(data), 0)
}
}
}
}
func (sd *ServiceDiscovery) handleServiceResponse(service *ServiceInfo) {
sd.handleServiceAnnounce(service)
}
func (sd *ServiceDiscovery) cleanupExpiredServices() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-sd.ctx.Done():
return
case <-ticker.C:
sd.mutex.Lock()
now := time.Now()
for name, service := range sd.services {
if now.Sub(service.LastSeen) > service.TTL {
delete(sd.services, name)
fmt.Printf("服务过期: %s\n", name)
}
}
sd.mutex.Unlock()
}
}
}
func (sd *ServiceDiscovery) GetAllServices() map[string]*ServiceInfo {
sd.mutex.RLock()
defer sd.mutex.RUnlock()
services := make(map[string]*ServiceInfo)
now := time.Now()
for name, service := range sd.services {
if now.Sub(service.LastSeen) <= service.TTL {
services[name] = service
}
}
return services
}
func (sd *ServiceDiscovery) Close() error {
sd.cancel()
sd.broadcaster.Close()
return sd.receiver.Close()
}
实时数据分发系统 #
基于组播的数据分发 #
type DataDistributor struct {
manager *MulticastManager
publishers map[string]*Publisher
subscribers map[string]*Subscriber
mutex sync.RWMutex
}
type Publisher struct {
ID string
GroupAddr string
DataChan chan []byte
ctx context.Context
cancel context.CancelFunc
}
type Subscriber struct {
ID string
GroupAddr string
Handler func(data []byte)
ctx context.Context
cancel context.CancelFunc
}
func NewDataDistributor() (*DataDistributor, error) {
manager, err := NewMulticastManager()
if err != nil {
return nil, err
}
return &DataDistributor{
manager: manager,
publishers: make(map[string]*Publisher),
subscribers: make(map[string]*Subscriber),
}, nil
}
func (dd *DataDistributor) CreateTopic(topicName string) error {
groupAddr := fmt.Sprintf("224.1.1.%d:8080", hash(topicName)%254+1)
return dd.manager.CreateGroup(groupAddr)
}
func (dd *DataDistributor) CreatePublisher(publisherID, topicName string) error {
groupAddr := fmt.Sprintf("224.1.1.%d:8080", hash(topicName)%254+1)
ctx, cancel := context.WithCancel(context.Background())
publisher := &Publisher{
ID: publisherID,
GroupAddr: groupAddr,
DataChan: make(chan []byte, 100),
ctx: ctx,
cancel: cancel,
}
dd.mutex.Lock()
dd.publishers[publisherID] = publisher
dd.mutex.Unlock()
// 启动发布协程
go dd.publishLoop(publisher)
return nil
}
func (dd *DataDistributor) CreateSubscriber(subscriberID, topicName string, handler func(data []byte)) error {
groupAddr := fmt.Sprintf("224.1.1.%d:8080", hash(topicName)%254+1)
ctx, cancel := context.WithCancel(context.Background())
subscriber := &Subscriber{
ID: subscriberID,
GroupAddr: groupAddr,
Handler: handler,
ctx: ctx,
cancel: cancel,
}
dd.mutex.Lock()
dd.subscribers[subscriberID] = subscriber
dd.mutex.Unlock()
// 加入组播组
err := dd.manager.JoinGroup(groupAddr, subscriberID, "")
if err != nil {
return err
}
// 启动订阅协程
go dd.subscribeLoop(subscriber)
return nil
}
func (dd *DataDistributor) Publish(publisherID string, data []byte) error {
dd.mutex.RLock()
publisher, exists := dd.publishers[publisherID]
dd.mutex.RUnlock()
if !exists {
return fmt.Errorf("发布者不存在: %s", publisherID)
}
select {
case publisher.DataChan <- data:
return nil
default:
return fmt.Errorf("发布队列已满")
}
}
func (dd *DataDistributor) publishLoop(publisher *Publisher) {
for {
select {
case <-publisher.ctx.Done():
return
case data := <-publisher.DataChan:
err := dd.manager.SendMessage(publisher.GroupAddr, string(data), 32)
if err != nil {
fmt.Printf("发布数据失败: %v\n", err)
}
}
}
}
func (dd *DataDistributor) subscribeLoop(subscriber *Subscriber) {
// 这里需要实现组播数据接收逻辑
// 由于组播接收比较复杂,这里简化处理
fmt.Printf("订阅者 %s 开始监听组播组 %s\n", subscriber.ID, subscriber.GroupAddr)
}
func hash(s string) int {
h := 0
for _, c := range s {
h = 31*h + int(c)
}
if h < 0 {
h = -h
}
return h
}
func (dd *DataDistributor) Close() error {
// 关闭所有发布者
dd.mutex.Lock()
for _, publisher := range dd.publishers {
publisher.cancel()
close(publisher.DataChan)
}
// 关闭所有订阅者
for _, subscriber := range dd.subscribers {
subscriber.cancel()
dd.manager.LeaveGroup(subscriber.GroupAddr, subscriber.ID)
}
dd.mutex.Unlock()
return dd.manager.Close()
}
性能监控和调试 #
网络流量监控 #
type NetworkMonitor struct {
interfaces map[string]*InterfaceStats
mutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
type InterfaceStats struct {
Name string
PacketsSent int64
PacketsReceived int64
BytesSent int64
BytesReceived int64
Errors int64
LastUpdate time.Time
}
func NewNetworkMonitor() *NetworkMonitor {
ctx, cancel := context.WithCancel(context.Background())
nm := &NetworkMonitor{
interfaces: make(map[string]*InterfaceStats),
ctx: ctx,
cancel: cancel,
}
go nm.monitorLoop()
return nm
}
func (nm *NetworkMonitor) monitorLoop() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-nm.ctx.Done():
return
case <-ticker.C:
nm.updateStats()
}
}
}
func (nm *NetworkMonitor) updateStats() {
interfaces, err := net.Interfaces()
if err != nil {
return
}
nm.mutex.Lock()
defer nm.mutex.Unlock()
for _, iface := range interfaces {
if iface.Flags&net.FlagUp == 0 {
continue
}
stats, exists := nm.interfaces[iface.Name]
if !exists {
stats = &InterfaceStats{
Name: iface.Name,
}
nm.interfaces[iface.Name] = stats
}
// 这里应该读取系统的网络统计信息
// 由于 Go 标准库没有直接提供,这里模拟数据
stats.LastUpdate = time.Now()
}
}
func (nm *NetworkMonitor) GetStats() map[string]*InterfaceStats {
nm.mutex.RLock()
defer nm.mutex.RUnlock()
stats := make(map[string]*InterfaceStats)
for name, stat := range nm.interfaces {
statCopy := *stat
stats[name] = &statCopy
}
return stats
}
func (nm *NetworkMonitor) Close() {
nm.cancel()
}
小结 #
本节详细介绍了 UDP 广播与组播技术,包括:
- 广播技术 - 基础广播实现和高级广播管理
- 组播技术 - 组播基础概念和高级组播管理
- 服务发现 - 基于广播的服务发现系统
- 数据分发 - 基于组播的实时数据分发系统
- 性能监控 - 网络流量监控和调试工具
掌握这些技术后,你就能够实现高效的一对多网络通信,构建分布式系统中的服务发现、数据分发等关键组件。在下一节中,我们将学习网络协议设计的相关知识。