4.3.2 UDP 广播与组播

4.3.2 UDP 广播与组播 #

UDP 广播和组播是实现一对多通信的重要技术。广播允许向网络中的所有主机发送数据,而组播则允许向特定的主机组发送数据。本节将深入探讨这两种技术的原理、实现和应用场景。

广播技术 #

广播基础概念 #

广播是指向网络中所有主机发送数据包的通信方式。根据广播范围,可以分为:

  • 有限广播 - 255.255.255.255,仅在本地网络传播
  • 定向广播(直接广播)- 如 192.168.1.0/24 网络的 192.168.1.255,向指定网络的所有主机广播
  • 子网广播 - 向特定子网的所有主机广播

基础广播实现 #

package main

import (
    "fmt"
    "net"
    "time"
)

// BroadcastSender sends UDP datagrams to a single broadcast destination.
type BroadcastSender struct {
    conn *net.UDPConn // connected UDP socket that SendMessage writes to
    addr *net.UDPAddr // resolved broadcast destination the socket was dialed to
}

// NewBroadcastSender returns a sender whose socket is dialed to the limited
// broadcast address 255.255.255.255 on the given port.
//
// Any error from resolving the address or dialing the socket is returned
// unchanged, together with a nil sender.
func NewBroadcastSender(port int) (*BroadcastSender, error) {
    target := fmt.Sprintf("255.255.255.255:%d", port)

    broadcastAddr, err := net.ResolveUDPAddr("udp", target)
    if err != nil {
        return nil, err
    }

    // A nil local address lets the system choose the source address and port.
    udpConn, err := net.DialUDP("udp", nil, broadcastAddr)
    if err != nil {
        return nil, err
    }

    sender := &BroadcastSender{conn: udpConn, addr: broadcastAddr}
    return sender, nil
}

// SendMessage writes message as one datagram on the sender's connection and
// returns the write error, if any.
func (bs *BroadcastSender) SendMessage(message string) error {
    payload := []byte(message)
    if _, err := bs.conn.Write(payload); err != nil {
        return err
    }
    return nil
}

// Close closes the sender's UDP connection and reports the close error.
func (bs *BroadcastSender) Close() error {
    err := bs.conn.Close()
    return err
}

// BroadcastReceiver receives UDP datagrams on a local port.
type BroadcastReceiver struct {
    conn *net.UDPConn // listening UDP socket read by Listen
    port int          // local port the receiver was created with
}

// NewBroadcastReceiver opens a UDP listener on the given port without
// specifying a host (address ":<port>").
//
// Any error from resolving the address or opening the socket is returned
// unchanged, together with a nil receiver.
func NewBroadcastReceiver(port int) (*BroadcastReceiver, error) {
    listenOn := fmt.Sprintf(":%d", port)

    localAddr, err := net.ResolveUDPAddr("udp", listenOn)
    if err != nil {
        return nil, err
    }

    udpConn, err := net.ListenUDP("udp", localAddr)
    if err != nil {
        return nil, err
    }

    receiver := &BroadcastReceiver{conn: udpConn, port: port}
    return receiver, nil
}

// Listen reads datagrams in a loop and invokes handler in a new goroutine for
// each one, passing the payload as a string and the sender's address.
//
// At most 1024 bytes are read per datagram. Listen only returns when a read
// fails, and then returns that read error.
func (br *BroadcastReceiver) Listen(handler func(message string, from *net.UDPAddr)) error {
    buf := make([]byte, 1024)

    for {
        size, sender, err := br.conn.ReadFromUDP(buf)
        if err != nil {
            return err
        }

        // The string conversion copies the bytes, so the shared buffer can be
        // reused by the next read while the handler is still running.
        go handler(string(buf[:size]), sender)
    }
}

// Close closes the receiver's UDP socket and reports the close error.
func (br *BroadcastReceiver) Close() error {
    err := br.conn.Close()
    return err
}

// demonstrateBroadcast starts a receiver on port 8080 in a background
// goroutine, then sends five broadcast messages to that port, two seconds
// apart, printing the outcome of every step.
func demonstrateBroadcast() {
    // Receiver side: keeps printing incoming messages until Listen returns.
    go func() {
        rx, err := NewBroadcastReceiver(8080)
        if err != nil {
            fmt.Printf("创建广播接收器失败: %v\n", err)
            return
        }
        defer rx.Close()

        fmt.Println("广播接收器启动,监听端口 8080")

        printMessage := func(message string, from *net.UDPAddr) {
            fmt.Printf("收到广播消息 [%s]: %s\n", from, message)
        }
        rx.Listen(printMessage)
    }()

    // Presumably gives the receiver goroutine time to bind before sending.
    time.Sleep(1 * time.Second)

    // Sender side.
    tx, err := NewBroadcastSender(8080)
    if err != nil {
        fmt.Printf("创建广播发送器失败: %v\n", err)
        return
    }
    defer tx.Close()

    for seq := 1; seq <= 5; seq++ {
        text := fmt.Sprintf("广播消息 #%d", seq)
        if sendErr := tx.SendMessage(text); sendErr != nil {
            fmt.Printf("发送广播消息失败: %v\n", sendErr)
        } else {
            fmt.Printf("发送广播消息: %s\n", text)
        }
        time.Sleep(2 * time.Second)
    }
}

高级广播实现 #

import (
    "context"
    "sync"
    "time"
)

// AdvancedBroadcaster queues broadcast messages and sends them in periodic,
// priority-ordered batches from a background goroutine.
type AdvancedBroadcaster struct {
    conn        *net.UDPConn // unconnected UDP socket used for all sends
    networks    []string // default destination addresses, used when a message names none
    interval    time.Duration // period of the batch-flush ticker
    ctx         context.Context // cancelled by Close to stop the processor goroutine
    cancel      context.CancelFunc // cancels ctx
    wg          sync.WaitGroup // tracks the processor goroutine so Close can wait for it
    messageQueue chan BroadcastMessage // buffered queue feeding the processor goroutine
}

// BroadcastMessage is one queued broadcast request.
type BroadcastMessage struct {
    Content   string // payload sent as the datagram body
    Networks  []string // destination addresses; when empty, the broadcaster's default networks are used
    Priority  int      // message priority; higher values are sent first within a batch
    Timestamp time.Time // set to the current time when the message is queued
}

// NewAdvancedBroadcaster opens a UDP socket on a system-chosen local port
// (":0") for sending and starts the background goroutine that flushes queued
// messages every 100ms.
//
// networks is the default list of destination addresses. The port parameter
// is not used by this constructor. Errors from resolving the local address or
// opening the socket are returned unchanged.
func NewAdvancedBroadcaster(port int, networks []string) (*AdvancedBroadcaster, error) {
    sendFrom, err := net.ResolveUDPAddr("udp", ":0")
    if err != nil {
        return nil, err
    }

    udpConn, err := net.ListenUDP("udp", sendFrom)
    if err != nil {
        return nil, err
    }

    runCtx, stop := context.WithCancel(context.Background())

    broadcaster := &AdvancedBroadcaster{
        conn:         udpConn,
        networks:     networks,
        interval:     100 * time.Millisecond,
        ctx:          runCtx,
        cancel:       stop,
        messageQueue: make(chan BroadcastMessage, 100),
    }

    // Register the processor with the WaitGroup before starting it so that
    // Close can wait for it to exit.
    broadcaster.wg.Add(1)
    go broadcaster.messageProcessor()

    return broadcaster, nil
}

// messageProcessor collects queued messages into a batch and hands the batch
// to processPendingMessages on every tick of the flush interval.
//
// It returns when the broadcaster's context is cancelled; messages still in
// the batch at that moment are not sent.
func (ab *AdvancedBroadcaster) messageProcessor() {
    defer ab.wg.Done()

    flush := time.NewTicker(ab.interval)
    defer flush.Stop()

    var batch []BroadcastMessage

    for {
        select {
        case <-ab.ctx.Done():
            return

        case incoming := <-ab.messageQueue:
            batch = append(batch, incoming)

        case <-flush.C:
            if len(batch) == 0 {
                continue
            }
            ab.processPendingMessages(batch)
            // Keep the backing array for the next batch; the call above has
            // already finished with it.
            batch = batch[:0]
        }
    }
}

// processPendingMessages reorders messages in place so that higher Priority
// values come first, then sends each message in that order.
func (ab *AdvancedBroadcaster) processPendingMessages(messages []BroadcastMessage) {
    // Exchange sort, descending by Priority: after pass i, position i holds
    // the highest priority among positions i..end.
    for i := range messages {
        for j := i + 1; j < len(messages); j++ {
            if messages[j].Priority > messages[i].Priority {
                messages[i], messages[j] = messages[j], messages[i]
            }
        }
    }

    for idx := range messages {
        ab.sendMessage(messages[idx])
    }
}

// sendMessage writes msg.Content to every destination of the message.
//
// Destinations are msg.Networks, or the broadcaster's default networks when
// the message names none. A destination that fails to resolve or send is
// reported on stdout and skipped; the remaining destinations are still tried.
func (ab *AdvancedBroadcaster) sendMessage(msg BroadcastMessage) {
    targets := ab.networks
    if len(msg.Networks) != 0 {
        targets = msg.Networks
    }

    for _, target := range targets {
        dest, err := net.ResolveUDPAddr("udp", target)
        if err != nil {
            fmt.Printf("解析广播地址失败 %s: %v\n", target, err)
            continue
        }

        if _, err = ab.conn.WriteToUDP([]byte(msg.Content), dest); err != nil {
            fmt.Printf("发送广播消息失败 %s: %v\n", target, err)
            continue
        }
        fmt.Printf("广播消息到 %s: %s\n", target, msg.Content)
    }
}

// Broadcast queues content for delivery to the broadcaster's default networks
// with the given priority.
//
// The call never blocks: if the queue is full it returns an error instead.
func (ab *AdvancedBroadcaster) Broadcast(content string, priority int) error {
    var queued BroadcastMessage
    queued.Content = content
    queued.Priority = priority
    queued.Timestamp = time.Now()

    select {
    case ab.messageQueue <- queued:
    default:
        return fmt.Errorf("消息队列已满")
    }
    return nil
}

// BroadcastToNetworks queues content for delivery to the given destination
// addresses with the given priority.
//
// The call never blocks: if the queue is full it returns an error instead.
func (ab *AdvancedBroadcaster) BroadcastToNetworks(content string, networks []string, priority int) error {
    var queued BroadcastMessage
    queued.Content = content
    queued.Networks = networks
    queued.Priority = priority
    queued.Timestamp = time.Now()

    select {
    case ab.messageQueue <- queued:
    default:
        return fmt.Errorf("消息队列已满")
    }
    return nil
}

// Close stops the processor goroutine, waits for it to exit, and then closes
// the UDP socket, returning the socket's close error.
func (ab *AdvancedBroadcaster) Close() error {
    // Cancel first so the Wait below can complete.
    ab.cancel()
    ab.wg.Wait()

    closeErr := ab.conn.Close()
    return closeErr
}

组播技术 #

组播基础概念 #

组播(Multicast)使用 IPv4 的 D 类地址范围(224.0.0.0 到 239.255.255.255)来标识组播组。主机可以随时加入或离开组播组,只有已加入某个组的主机才会接收发往该组的数据。

基础组播实现 #

import (
    "golang.org/x/net/ipv4"
)

// MulticastSender sends UDP datagrams to a single multicast group address.
type MulticastSender struct {
    conn  *net.UDPConn // UDP socket dialed to the group address
    group *net.UDPAddr // resolved group address (IP and port)
}

// NewMulticastSender returns a sender whose socket is dialed to groupAddr,
// given in "host:port" form.
//
// Any error from resolving the address or dialing the socket is returned
// unchanged, together with a nil sender.
func NewMulticastSender(groupAddr string) (*MulticastSender, error) {
    groupUDP, err := net.ResolveUDPAddr("udp", groupAddr)
    if err != nil {
        return nil, err
    }

    udpConn, err := net.DialUDP("udp", nil, groupUDP)
    if err != nil {
        return nil, err
    }

    sender := &MulticastSender{conn: udpConn, group: groupUDP}
    return sender, nil
}

// SendMessage writes message as one datagram on the sender's connection and
// returns the write error, if any.
func (ms *MulticastSender) SendMessage(message string) error {
    payload := []byte(message)
    if _, err := ms.conn.Write(payload); err != nil {
        return err
    }
    return nil
}

// Close closes the sender's UDP connection and reports the close error.
func (ms *MulticastSender) Close() error {
    err := ms.conn.Close()
    return err
}

// MulticastReceiver receives datagrams sent to a multicast group.
type MulticastReceiver struct {
    conn      *net.UDPConn // UDP socket bound to the group address; read by Listen
    group     *net.UDPAddr // resolved group address (IP and port)
    packetConn *ipv4.PacketConn // IPv4 wrapper around conn used to join and leave the group
}

// NewMulticastReceiver binds a UDP socket to groupAddr and joins the
// multicast group on the named interface.
//
// When interfaceName is empty a nil interface is passed to JoinGroup. If the
// interface lookup or the join fails, the socket is closed before the error
// is returned.
func NewMulticastReceiver(groupAddr, interfaceName string) (*MulticastReceiver, error) {
    groupUDP, err := net.ResolveUDPAddr("udp", groupAddr)
    if err != nil {
        return nil, err
    }

    udpConn, err := net.ListenUDP("udp", groupUDP)
    if err != nil {
        return nil, err
    }

    // Wrap the socket so that IPv4 group membership can be managed on it.
    pc := ipv4.NewPacketConn(udpConn)

    var iface *net.Interface
    if interfaceName != "" {
        if iface, err = net.InterfaceByName(interfaceName); err != nil {
            udpConn.Close()
            return nil, err
        }
    }

    // Only the group IP matters for membership; the port is left unset.
    if err = pc.JoinGroup(iface, &net.UDPAddr{IP: groupUDP.IP}); err != nil {
        udpConn.Close()
        return nil, err
    }

    receiver := &MulticastReceiver{
        conn:       udpConn,
        group:      groupUDP,
        packetConn: pc,
    }
    return receiver, nil
}

// Listen reads datagrams in a loop and invokes handler in a new goroutine for
// each one, passing the payload as a string and the sender's address.
//
// At most 1024 bytes are read per datagram. Listen only returns when a read
// fails, and then returns that read error.
func (mr *MulticastReceiver) Listen(handler func(message string, from net.Addr)) error {
    buf := make([]byte, 1024)

    for {
        size, sender, err := mr.conn.ReadFromUDP(buf)
        if err != nil {
            return err
        }

        // The string conversion copies the bytes, so the shared buffer can be
        // reused by the next read while the handler is still running.
        go handler(string(buf[:size]), sender)
    }
}

// Close leaves the multicast group and closes the socket.
//
// The leave is best-effort: it is issued with a nil interface and its result
// is discarded. Only the socket's close error is returned.
func (mr *MulticastReceiver) Close() error {
    groupIP := &net.UDPAddr{IP: mr.group.IP}
    mr.packetConn.LeaveGroup(nil, groupIP)

    closeErr := mr.conn.Close()
    return closeErr
}

// demonstrateMulticast starts a receiver for group 224.1.1.1:8080 in a
// background goroutine, then sends five messages to that group, two seconds
// apart, printing the outcome of every step.
func demonstrateMulticast() {
    groupAddr := "224.1.1.1:8080"

    // Receiver side: keeps printing incoming messages until Listen returns.
    go func() {
        rx, err := NewMulticastReceiver(groupAddr, "")
        if err != nil {
            fmt.Printf("创建组播接收器失败: %v\n", err)
            return
        }
        defer rx.Close()

        fmt.Printf("组播接收器启动,监听组 %s\n", groupAddr)

        printMessage := func(message string, from net.Addr) {
            fmt.Printf("收到组播消息 [%s]: %s\n", from, message)
        }
        rx.Listen(printMessage)
    }()

    // Presumably gives the receiver goroutine time to join before sending.
    time.Sleep(1 * time.Second)

    // Sender side.
    tx, err := NewMulticastSender(groupAddr)
    if err != nil {
        fmt.Printf("创建组播发送器失败: %v\n", err)
        return
    }
    defer tx.Close()

    for seq := 1; seq <= 5; seq++ {
        text := fmt.Sprintf("组播消息 #%d", seq)
        if sendErr := tx.SendMessage(text); sendErr != nil {
            fmt.Printf("发送组播消息失败: %v\n", sendErr)
        } else {
            fmt.Printf("发送组播消息: %s\n", text)
        }
        time.Sleep(2 * time.Second)
    }
}

高级组播管理 #

// MulticastManager owns a set of multicast groups keyed by their address
// string and runs one sender goroutine per group.
type MulticastManager struct {
    groups      map[string]*MulticastGroup // groups keyed by the address string passed to CreateGroup
    interfaces  []*net.Interface // network interfaces captured when the manager was created
    mutex       sync.RWMutex // guards groups
    ctx         context.Context // cancelled by Close to stop the per-group goroutines
    cancel      context.CancelFunc // cancels ctx
}

// MulticastGroup holds the socket, membership table and outgoing queue of one
// multicast group.
type MulticastGroup struct {
    Address     *net.UDPAddr // resolved group address (IP and port)
    Members     map[string]*MulticastMember // members keyed by member ID
    Conn        *net.UDPConn // UDP socket bound to Address
    PacketConn  *ipv4.PacketConn // IPv4 wrapper around Conn for membership and TTL control
    MessageChan chan MulticastMessage // buffered queue of messages waiting to be sent
    mutex       sync.RWMutex // guards Members
}

// MulticastMember records one member of a multicast group.
type MulticastMember struct {
    ID        string // caller-supplied member identifier
    Interface *net.Interface // interface the member joined on; nil when no interface name was given
    JoinTime  time.Time // time the member was added
    LastSeen  time.Time // initialised to the join time
}

// MulticastMessage is one queued outgoing multicast message.
type MulticastMessage struct {
    GroupAddr string // address string of the destination group
    Content   string // payload sent as the datagram body
    TTL       int // time-to-live to apply before sending; values <= 0 leave the socket setting unchanged
    Timestamp time.Time // set to the current time when the message is queued
}

// NewMulticastManager creates a manager with no groups and a snapshot of the
// host's network interfaces.
//
// It returns the error from net.Interfaces unchanged if the interfaces cannot
// be enumerated.
func NewMulticastManager() (*MulticastManager, error) {
    ifaces, err := net.Interfaces()
    if err != nil {
        return nil, err
    }

    // net.Interfaces returns []net.Interface, while the manager stores
    // []*net.Interface, so take the address of each element of the snapshot.
    interfaces := make([]*net.Interface, 0, len(ifaces))
    for i := range ifaces {
        interfaces = append(interfaces, &ifaces[i])
    }

    ctx, cancel := context.WithCancel(context.Background())

    return &MulticastManager{
        groups:     make(map[string]*MulticastGroup),
        interfaces: interfaces,
        ctx:        ctx,
        cancel:     cancel,
    }, nil
}

// CreateGroup binds a UDP socket to groupAddr, registers a new group under
// that address string, and starts the goroutine that sends the group's queued
// messages.
//
// It returns an error if a group with the same address string already exists,
// or if resolving the address or opening the socket fails.
func (mm *MulticastManager) CreateGroup(groupAddr string) error {
    mm.mutex.Lock()
    defer mm.mutex.Unlock()

    if _, duplicate := mm.groups[groupAddr]; duplicate {
        return fmt.Errorf("组播组已存在: %s", groupAddr)
    }

    udpAddr, err := net.ResolveUDPAddr("udp", groupAddr)
    if err != nil {
        return err
    }

    udpConn, err := net.ListenUDP("udp", udpAddr)
    if err != nil {
        return err
    }

    created := &MulticastGroup{
        Address:     udpAddr,
        Members:     make(map[string]*MulticastMember),
        Conn:        udpConn,
        PacketConn:  ipv4.NewPacketConn(udpConn),
        MessageChan: make(chan MulticastMessage, 100),
    }
    mm.groups[groupAddr] = created

    // One sender goroutine per group; it stops when the manager is closed.
    go mm.handleGroupMessages(created)

    fmt.Printf("创建组播组: %s\n", groupAddr)
    return nil
}

// JoinGroup records memberID as a member of the group at groupAddr, joining
// the multicast group on the named interface when that is still necessary.
//
// All members of a group share one socket. Joining the same group on the same
// interface twice on one socket is rejected by the operating system, so the
// kernel-level join is issued only for the first member on a given interface;
// later members on that interface are just recorded.
//
// When interfaceName is empty a nil interface is used. It returns an error if
// the group does not exist, the interface cannot be found, or the join fails.
func (mm *MulticastManager) JoinGroup(groupAddr, memberID, interfaceName string) error {
    mm.mutex.RLock()
    group, exists := mm.groups[groupAddr]
    mm.mutex.RUnlock()

    if !exists {
        return fmt.Errorf("组播组不存在: %s", groupAddr)
    }

    var ifi *net.Interface
    if interfaceName != "" {
        var err error
        ifi, err = net.InterfaceByName(interfaceName)
        if err != nil {
            return err
        }
    }

    // Interfaces are compared by index because every lookup returns a fresh
    // *net.Interface; a nil interface is treated as index 0.
    ifIndex := func(i *net.Interface) int {
        if i == nil {
            return 0
        }
        return i.Index
    }

    // Hold the lock across the check and the join so that two concurrent
    // callers cannot both decide they are the first member on an interface.
    group.mutex.Lock()

    alreadyJoined := false
    for _, member := range group.Members {
        if ifIndex(member.Interface) == ifIndex(ifi) {
            alreadyJoined = true
            break
        }
    }

    if !alreadyJoined {
        if err := group.PacketConn.JoinGroup(ifi, &net.UDPAddr{IP: group.Address.IP}); err != nil {
            group.mutex.Unlock()
            return err
        }
    }

    now := time.Now()
    group.Members[memberID] = &MulticastMember{
        ID:        memberID,
        Interface: ifi,
        JoinTime:  now,
        LastSeen:  now,
    }
    group.mutex.Unlock()

    fmt.Printf("成员 %s 加入组播组 %s\n", memberID, groupAddr)
    return nil
}

// LeaveGroup removes memberID from the group at groupAddr.
//
// All members of a group share one socket, so the kernel-level membership is
// dropped only when no remaining member uses the same interface as the
// departing one. Removing an unknown member is a no-op.
//
// It returns an error if the group does not exist or if leaving the multicast
// group fails; in the latter case the member has still been removed.
func (mm *MulticastManager) LeaveGroup(groupAddr, memberID string) error {
    mm.mutex.RLock()
    group, exists := mm.groups[groupAddr]
    mm.mutex.RUnlock()

    if !exists {
        return fmt.Errorf("组播组不存在: %s", groupAddr)
    }

    // Interfaces are compared by index; a nil interface is treated as index 0.
    ifIndex := func(i *net.Interface) int {
        if i == nil {
            return 0
        }
        return i.Index
    }

    group.mutex.Lock()
    defer group.mutex.Unlock()

    member, found := group.Members[memberID]
    if !found {
        return nil
    }

    delete(group.Members, memberID)
    fmt.Printf("成员 %s 离开组播组 %s\n", memberID, groupAddr)

    // Keep the membership while another member still relies on it.
    for _, other := range group.Members {
        if ifIndex(other.Interface) == ifIndex(member.Interface) {
            return nil
        }
    }

    return group.PacketConn.LeaveGroup(member.Interface, &net.UDPAddr{IP: group.Address.IP})
}

// SendMessage queues content for delivery to the group at groupAddr with the
// given TTL.
//
// The call never blocks. It returns an error if the group does not exist or
// if the group's queue is full.
func (mm *MulticastManager) SendMessage(groupAddr, content string, ttl int) error {
    mm.mutex.RLock()
    target, found := mm.groups[groupAddr]
    mm.mutex.RUnlock()

    if !found {
        return fmt.Errorf("组播组不存在: %s", groupAddr)
    }

    var queued MulticastMessage
    queued.GroupAddr = groupAddr
    queued.Content = content
    queued.TTL = ttl
    queued.Timestamp = time.Now()

    select {
    case target.MessageChan <- queued:
    default:
        return fmt.Errorf("消息队列已满")
    }
    return nil
}

// handleGroupMessages sends every message queued on group.MessageChan until
// the manager's context is cancelled or the channel is closed.
func (mm *MulticastManager) handleGroupMessages(group *MulticastGroup) {
    for {
        select {
        case <-mm.ctx.Done():
            return
        case msg, ok := <-group.MessageChan:
            // A closed channel yields zero-value messages forever; stop
            // instead of sending empty datagrams during shutdown.
            if !ok {
                return
            }
            mm.sendMulticastMessage(group, msg)
        }
    }
}

// sendMulticastMessage applies the message's TTL (when positive) and writes
// the message content to the group's address, printing the outcome.
//
// The TTL is applied with SetMulticastTTL because SetTTL only affects unicast
// packets. If the TTL cannot be applied the message is not sent, so it cannot
// travel further than the caller asked for.
func (mm *MulticastManager) sendMulticastMessage(group *MulticastGroup, msg MulticastMessage) {
    if msg.TTL > 0 {
        if err := group.PacketConn.SetMulticastTTL(msg.TTL); err != nil {
            fmt.Printf("设置组播 TTL 失败: %v\n", err)
            return
        }
    }

    _, err := group.Conn.WriteToUDP([]byte(msg.Content), group.Address)
    if err != nil {
        fmt.Printf("发送组播消息失败: %v\n", err)
        return
    }
    fmt.Printf("发送组播消息到 %s: %s\n", group.Address, msg.Content)
}

// GetGroupStats returns a snapshot of the group at groupAddr, or nil if no
// such group exists.
//
// The map holds "group_address" (string), "member_count" (int), "members"
// ([]string of member IDs, in no particular order) and "message_queue" (int,
// the number of messages currently queued).
func (mm *MulticastManager) GetGroupStats(groupAddr string) map[string]interface{} {
    mm.mutex.RLock()
    target, found := mm.groups[groupAddr]
    mm.mutex.RUnlock()

    if !found {
        return nil
    }

    target.mutex.RLock()
    defer target.mutex.RUnlock()

    memberIDs := make([]string, 0, len(target.Members))
    for id := range target.Members {
        memberIDs = append(memberIDs, id)
    }

    return map[string]interface{}{
        "group_address": groupAddr,
        "member_count":  len(target.Members),
        "members":       memberIDs,
        "message_queue": len(target.MessageChan),
    }
}

// Close stops all per-group goroutines, removes every member from every
// group, closes the group sockets and forgets the groups. It always returns
// nil.
//
// The per-group message channels are deliberately left open. SendMessage may
// still hold a reference to a group and send on its channel, and sending on a
// closed channel panics. The sender goroutines stop on context cancellation,
// so nothing needs the close signal.
func (mm *MulticastManager) Close() error {
    mm.cancel()

    mm.mutex.Lock()
    defer mm.mutex.Unlock()

    for groupAddr, group := range mm.groups {
        // Best-effort leave for every member; closing the socket below ends
        // any remaining membership.
        group.mutex.Lock()
        for memberID, member := range group.Members {
            group.PacketConn.LeaveGroup(member.Interface, &net.UDPAddr{IP: group.Address.IP})
            delete(group.Members, memberID)
        }
        group.mutex.Unlock()

        group.Conn.Close()
        delete(mm.groups, groupAddr)
    }

    return nil
}

服务发现系统 #

基于广播的服务发现 #

// ServiceDiscovery keeps a table of known services and exchanges announce,
// query and response messages with peers over UDP broadcast.
type ServiceDiscovery struct {
    services    map[string]*ServiceInfo // known services keyed by service name
    broadcaster *AdvancedBroadcaster // sends outgoing discovery messages
    receiver    *BroadcastReceiver // receives incoming discovery messages
    port        int // UDP port used for both sending and receiving
    mutex       sync.RWMutex // guards services
    ctx         context.Context // cancelled by Close to stop the cleanup goroutine
    cancel      context.CancelFunc // cancels ctx
}

// ServiceInfo describes one service as exchanged in discovery messages.
type ServiceInfo struct {
    Name        string            `json:"name"` // service name; also the key in the service table
    Address     string            `json:"address"` // address the service is reachable at
    Port        int               `json:"port"` // port the service is reachable at
    Metadata    map[string]string `json:"metadata"` // caller-supplied key/value pairs
    LastSeen    time.Time         `json:"last_seen"` // time the entry was registered or last received
    TTL         time.Duration     `json:"ttl"` // an entry older than this since LastSeen is treated as expired
}

// ServiceMessage is the JSON envelope exchanged between discovery peers.
type ServiceMessage struct {
    Type    string       `json:"type"` // "announce", "query", "response"
    Service *ServiceInfo `json:"service,omitempty"` // service carried by announce and response messages
    Query   string       `json:"query,omitempty"` // service name being queried; empty matches every service
}

// NewServiceDiscovery creates a discovery instance that broadcasts to
// 255.255.255.255 on the given port and listens on the same port.
//
// It starts two background goroutines: one that handles incoming messages and
// one that removes expired services. If the receiver cannot be created, the
// already-created broadcaster is closed before the error is returned.
func NewServiceDiscovery(port int) (*ServiceDiscovery, error) {
    runCtx, stop := context.WithCancel(context.Background())

    targets := []string{fmt.Sprintf("255.255.255.255:%d", port)}
    tx, err := NewAdvancedBroadcaster(port, targets)
    if err != nil {
        stop()
        return nil, err
    }

    rx, err := NewBroadcastReceiver(port)
    if err != nil {
        tx.Close()
        stop()
        return nil, err
    }

    discovery := &ServiceDiscovery{
        services:    make(map[string]*ServiceInfo),
        broadcaster: tx,
        receiver:    rx,
        port:        port,
        ctx:         runCtx,
        cancel:      stop,
    }

    // Incoming-message handling and periodic expiry run in the background.
    go discovery.receiveMessages()
    go discovery.cleanupExpiredServices()

    return discovery, nil
}

// RegisterService stores the service in the local table under name, replacing
// any existing entry, and then broadcasts an announcement for it.
//
// The returned error is the announcement's error; the local entry is stored
// regardless of whether the announcement succeeds.
func (sd *ServiceDiscovery) RegisterService(name, address string, port int, metadata map[string]string, ttl time.Duration) error {
    entry := new(ServiceInfo)
    entry.Name = name
    entry.Address = address
    entry.Port = port
    entry.Metadata = metadata
    entry.LastSeen = time.Now()
    entry.TTL = ttl

    sd.mutex.Lock()
    sd.services[name] = entry
    sd.mutex.Unlock()

    return sd.announceService(entry)
}

// announceService broadcasts an "announce" message for service with
// priority 1. It returns the JSON encoding error or the broadcaster's
// queueing error.
func (sd *ServiceDiscovery) announceService(service *ServiceInfo) error {
    payload, err := json.Marshal(ServiceMessage{Type: "announce", Service: service})
    if err != nil {
        return err
    }

    return sd.broadcaster.Broadcast(string(payload), 1)
}

// QueryService broadcasts a query for serviceName with priority 2, waits one
// second for peers to answer, and then returns the matching, unexpired
// entries from the local table.
//
// An empty serviceName matches every service. The returned pointers are the
// table's own entries, not copies.
func (sd *ServiceDiscovery) QueryService(serviceName string) ([]*ServiceInfo, error) {
    payload, err := json.Marshal(ServiceMessage{Type: "query", Query: serviceName})
    if err != nil {
        return nil, err
    }

    if err = sd.broadcaster.Broadcast(string(payload), 2); err != nil {
        return nil, err
    }

    // Fixed wait for responses to arrive and be merged into the table.
    time.Sleep(1 * time.Second)

    sd.mutex.RLock()
    defer sd.mutex.RUnlock()

    var matches []*ServiceInfo
    for name, candidate := range sd.services {
        if serviceName != "" && name != serviceName {
            continue
        }
        if time.Since(candidate.LastSeen) > candidate.TTL {
            continue
        }
        matches = append(matches, candidate)
    }

    return matches, nil
}

// receiveMessages runs the receiver's listen loop, decoding every datagram as
// a ServiceMessage and dispatching it by type. Datagrams that are not valid
// JSON, or whose type is not recognised, are ignored.
func (sd *ServiceDiscovery) receiveMessages() {
    dispatch := func(message string, from *net.UDPAddr) {
        var decoded ServiceMessage
        if json.Unmarshal([]byte(message), &decoded) != nil {
            return
        }

        if decoded.Type == "announce" {
            sd.handleServiceAnnounce(decoded.Service)
        } else if decoded.Type == "query" {
            sd.handleServiceQuery(decoded.Query, from)
        } else if decoded.Type == "response" {
            sd.handleServiceResponse(decoded.Service)
        }
    }

    sd.receiver.Listen(dispatch)
}

// handleServiceAnnounce stores service in the local table under its name,
// stamping LastSeen with the current time, and prints the discovery. A nil
// service is ignored.
func (sd *ServiceDiscovery) handleServiceAnnounce(service *ServiceInfo) {
    if service != nil {
        sd.mutex.Lock()
        defer sd.mutex.Unlock()

        // The receive time replaces whatever timestamp the message carried.
        service.LastSeen = time.Now()
        sd.services[service.Name] = service

        fmt.Printf("发现服务: %s @ %s:%d\n", service.Name, service.Address, service.Port)
    }
}

// handleServiceQuery answers a query by broadcasting a "response" message,
// with priority 0, for every unexpired service in the local table whose name
// matches query. An empty query matches every service.
//
// The from address is accepted for the handler signature but not used;
// responses are broadcast rather than sent back to the requester. A service
// that cannot be encoded or queued is reported on stdout and skipped, so that
// an empty or partial response is never sent.
func (sd *ServiceDiscovery) handleServiceQuery(query string, from *net.UDPAddr) {
    sd.mutex.RLock()
    defer sd.mutex.RUnlock()

    for name, service := range sd.services {
        if query != "" && name != query {
            continue
        }
        if time.Since(service.LastSeen) > service.TTL {
            continue
        }

        data, err := json.Marshal(ServiceMessage{Type: "response", Service: service})
        if err != nil {
            fmt.Printf("序列化服务响应失败 %s: %v\n", name, err)
            continue
        }

        if err := sd.broadcaster.Broadcast(string(data), 0); err != nil {
            fmt.Printf("发送服务响应失败 %s: %v\n", name, err)
        }
    }
}

// handleServiceResponse processes a "response" message. A response is handled
// exactly like an announcement: it is passed on to handleServiceAnnounce.
func (sd *ServiceDiscovery) handleServiceResponse(service *ServiceInfo) {
    sd.handleServiceAnnounce(service)
}

// cleanupExpiredServices removes, every 30 seconds, each service whose
// LastSeen is older than its TTL, printing a line per removed service. It
// returns when the discovery context is cancelled.
func (sd *ServiceDiscovery) cleanupExpiredServices() {
    sweep := time.NewTicker(30 * time.Second)
    defer sweep.Stop()

    for {
        select {
        case <-sd.ctx.Done():
            return

        case <-sweep.C:
            sd.mutex.Lock()
            cutoffBase := time.Now()
            for name, entry := range sd.services {
                if age := cutoffBase.Sub(entry.LastSeen); age > entry.TTL {
                    delete(sd.services, name)
                    fmt.Printf("服务过期: %s\n", name)
                }
            }
            sd.mutex.Unlock()
        }
    }
}

// GetAllServices returns a new map of every unexpired service keyed by name.
// The values are the table's own entries, not copies.
func (sd *ServiceDiscovery) GetAllServices() map[string]*ServiceInfo {
    sd.mutex.RLock()
    defer sd.mutex.RUnlock()

    alive := make(map[string]*ServiceInfo)
    checkedAt := time.Now()
    for name, entry := range sd.services {
        if checkedAt.Sub(entry.LastSeen) > entry.TTL {
            continue
        }
        alive[name] = entry
    }

    return alive
}

// Close cancels the discovery context, closes the broadcaster (its error is
// discarded) and closes the receiver, returning the receiver's close error.
func (sd *ServiceDiscovery) Close() error {
    sd.cancel()
    sd.broadcaster.Close()

    closeErr := sd.receiver.Close()
    return closeErr
}

实时数据分发系统 #

基于组播的数据分发 #

// DataDistributor maps topics onto multicast groups and tracks the publishers
// and subscribers created for them.
type DataDistributor struct {
    manager     *MulticastManager // owns the multicast groups behind the topics
    publishers  map[string]*Publisher // publishers keyed by publisher ID
    subscribers map[string]*Subscriber // subscribers keyed by subscriber ID
    mutex       sync.RWMutex // guards publishers and subscribers
}

// Publisher is one data source bound to a topic's multicast group.
type Publisher struct {
    ID       string // caller-supplied publisher identifier
    GroupAddr string // address string of the topic's multicast group
    DataChan chan []byte // buffered queue of payloads waiting to be published
    ctx      context.Context // cancelled to stop the publish goroutine
    cancel   context.CancelFunc // cancels ctx
}

// Subscriber is one data consumer bound to a topic's multicast group.
type Subscriber struct {
    ID       string // caller-supplied subscriber identifier
    GroupAddr string // address string of the topic's multicast group
    Handler  func(data []byte) // callback supplied by the caller for received data
    ctx      context.Context // per-subscriber context created with the subscriber
    cancel   context.CancelFunc // cancels ctx
}

// NewDataDistributor creates a distributor backed by a fresh MulticastManager
// and with no publishers or subscribers. It returns the manager's creation
// error unchanged.
func NewDataDistributor() (*DataDistributor, error) {
    mgr, err := NewMulticastManager()
    if err != nil {
        return nil, err
    }

    distributor := &DataDistributor{
        manager:     mgr,
        publishers:  map[string]*Publisher{},
        subscribers: map[string]*Subscriber{},
    }
    return distributor, nil
}

// CreateTopic creates the multicast group for topicName.
//
// The group address is 224.1.1.N:8080, where N is derived from the topic
// name's hash and lies in 1..254 for a non-negative hash. Distinct topic
// names can therefore map to the same group.
func (dd *DataDistributor) CreateTopic(topicName string) error {
    lastOctet := hash(topicName)%254 + 1
    groupAddr := fmt.Sprintf("224.1.1.%d:8080", lastOctet)

    return dd.manager.CreateGroup(groupAddr)
}

// CreatePublisher registers a publisher for topicName under publisherID,
// replacing any existing entry with that ID, and starts its publish
// goroutine. It always returns nil.
func (dd *DataDistributor) CreatePublisher(publisherID, topicName string) error {
    lastOctet := hash(topicName)%254 + 1
    groupAddr := fmt.Sprintf("224.1.1.%d:8080", lastOctet)

    runCtx, stop := context.WithCancel(context.Background())

    created := new(Publisher)
    created.ID = publisherID
    created.GroupAddr = groupAddr
    created.DataChan = make(chan []byte, 100)
    created.ctx = runCtx
    created.cancel = stop

    dd.mutex.Lock()
    dd.publishers[publisherID] = created
    dd.mutex.Unlock()

    // Forwards queued payloads to the multicast manager in the background.
    go dd.publishLoop(created)

    return nil
}

// CreateSubscriber joins the multicast group of topicName on behalf of
// subscriberID, registers the subscriber (replacing any existing entry with
// that ID) and starts its subscribe goroutine.
//
// The subscriber is registered only after the join succeeds. If the join
// fails, the subscriber's context is cancelled and the error is returned, so
// that no half-initialised subscriber is left in the table.
func (dd *DataDistributor) CreateSubscriber(subscriberID, topicName string, handler func(data []byte)) error {
    groupAddr := fmt.Sprintf("224.1.1.%d:8080", hash(topicName)%254+1)

    ctx, cancel := context.WithCancel(context.Background())

    subscriber := &Subscriber{
        ID:        subscriberID,
        GroupAddr: groupAddr,
        Handler:   handler,
        ctx:       ctx,
        cancel:    cancel,
    }

    if err := dd.manager.JoinGroup(groupAddr, subscriberID, ""); err != nil {
        // Release the context; nothing has been registered yet.
        cancel()
        return err
    }

    dd.mutex.Lock()
    dd.subscribers[subscriberID] = subscriber
    dd.mutex.Unlock()

    go dd.subscribeLoop(subscriber)

    return nil
}

// Publish queues data on the publisher registered under publisherID.
//
// The call never blocks. It returns an error if the publisher does not exist
// or if its queue is full.
func (dd *DataDistributor) Publish(publisherID string, data []byte) error {
    dd.mutex.RLock()
    source, found := dd.publishers[publisherID]
    dd.mutex.RUnlock()

    if !found {
        return fmt.Errorf("发布者不存在: %s", publisherID)
    }

    select {
    case source.DataChan <- data:
    default:
        return fmt.Errorf("发布队列已满")
    }
    return nil
}

// publishLoop forwards every payload queued on the publisher's DataChan to
// the multicast manager with a TTL of 32, printing any send failure. It
// returns when the publisher's context is cancelled or its channel is closed.
func (dd *DataDistributor) publishLoop(publisher *Publisher) {
    for {
        select {
        case <-publisher.ctx.Done():
            return
        case data, ok := <-publisher.DataChan:
            // A closed channel yields nil payloads forever; stop instead of
            // publishing empty messages during shutdown.
            if !ok {
                return
            }
            err := dd.manager.SendMessage(publisher.GroupAddr, string(data), 32)
            if err != nil {
                fmt.Printf("发布数据失败: %v\n", err)
            }
        }
    }
}

// subscribeLoop is a placeholder for the subscriber's receive loop. It only
// prints that the subscriber has started listening and then returns; the
// subscriber's Handler is never invoked here.
func (dd *DataDistributor) subscribeLoop(subscriber *Subscriber) {
    // The multicast receive logic still needs to be implemented here.
    // Multicast reception is fairly involved, so this example keeps it simplified.
    fmt.Printf("订阅者 %s 开始监听组播组 %s\n", subscriber.ID, subscriber.GroupAddr)
}

// hash returns a non-negative polynomial hash (base 31) of the runes of s.
//
// The accumulation wraps on integer overflow. A negative result is negated,
// and the one value whose negation is still negative (the minimum int) is
// mapped to 0, so that callers can safely use the result with the % operator.
func hash(s string) int {
    h := 0
    for _, c := range s {
        h = 31*h + int(c)
    }
    if h < 0 {
        h = -h
        // Negating the minimum int overflows back to itself.
        if h < 0 {
            h = 0
        }
    }
    return h
}

// Close stops every publisher, cancels every subscriber and removes it from
// its multicast group, and then closes the multicast manager, returning the
// manager's close error.
//
// Publisher channels are deliberately left open. Publish may still hold a
// reference to a publisher and send on its channel, and sending on a closed
// channel panics. The publish goroutines stop on context cancellation, so
// nothing needs the close signal.
func (dd *DataDistributor) Close() error {
    dd.mutex.Lock()
    for _, publisher := range dd.publishers {
        publisher.cancel()
    }

    for _, subscriber := range dd.subscribers {
        subscriber.cancel()
        // Best-effort: the manager is closed right below, which tears down
        // every group regardless of the outcome here.
        _ = dd.manager.LeaveGroup(subscriber.GroupAddr, subscriber.ID)
    }
    dd.mutex.Unlock()

    return dd.manager.Close()
}

性能监控和调试 #

网络流量监控 #

// NetworkMonitor periodically refreshes a table of per-interface statistics.
type NetworkMonitor struct {
    interfaces map[string]*InterfaceStats // statistics keyed by interface name
    mutex      sync.RWMutex // guards interfaces
    ctx        context.Context // cancelled by Close to stop the monitor goroutine
    cancel     context.CancelFunc // cancels ctx
}

// InterfaceStats holds the counters tracked for one network interface. In the
// code shown here only Name and LastUpdate are ever written; the counters
// stay at zero.
type InterfaceStats struct {
    Name            string // interface name
    PacketsSent     int64 // packets sent
    PacketsReceived int64 // packets received
    BytesSent       int64 // bytes sent
    BytesReceived   int64 // bytes received
    Errors          int64 // error count
    LastUpdate      time.Time // time of the most recent refresh for this interface
}

// NewNetworkMonitor creates a monitor with an empty statistics table and
// starts its background refresh goroutine.
func NewNetworkMonitor() *NetworkMonitor {
    runCtx, stop := context.WithCancel(context.Background())

    monitor := new(NetworkMonitor)
    monitor.interfaces = make(map[string]*InterfaceStats)
    monitor.ctx = runCtx
    monitor.cancel = stop

    go monitor.monitorLoop()
    return monitor
}

// monitorLoop refreshes the statistics every five seconds until the monitor's
// context is cancelled.
func (nm *NetworkMonitor) monitorLoop() {
    refresh := time.NewTicker(5 * time.Second)
    defer refresh.Stop()

    stopped := nm.ctx.Done()
    for {
        select {
        case <-stopped:
            return
        case <-refresh.C:
            nm.updateStats()
        }
    }
}

// updateStats makes sure every interface that is currently up has an entry in
// the table and stamps that entry's LastUpdate with the current time.
//
// If the interfaces cannot be enumerated the table is left untouched. No
// traffic counters are read here.
func (nm *NetworkMonitor) updateStats() {
    ifaces, err := net.Interfaces()
    if err != nil {
        return
    }

    nm.mutex.Lock()
    defer nm.mutex.Unlock()

    for _, current := range ifaces {
        isUp := current.Flags&net.FlagUp != 0
        if !isUp {
            continue
        }

        entry, known := nm.interfaces[current.Name]
        if !known {
            entry = &InterfaceStats{Name: current.Name}
            nm.interfaces[current.Name] = entry
        }

        // The system's network statistics should be read here. The Go
        // standard library does not expose them directly, so this example
        // only records the refresh time.
        entry.LastUpdate = time.Now()
    }
}

// GetStats returns a snapshot of the statistics table. Every value is a copy,
// so callers may modify the result without affecting the monitor.
func (nm *NetworkMonitor) GetStats() map[string]*InterfaceStats {
    nm.mutex.RLock()
    defer nm.mutex.RUnlock()

    snapshot := make(map[string]*InterfaceStats)
    for name, current := range nm.interfaces {
        duplicate := new(InterfaceStats)
        *duplicate = *current
        snapshot[name] = duplicate
    }

    return snapshot
}

// Close cancels the monitor's context, which makes the background refresh
// goroutine return. It does not wait for that goroutine to exit.
func (nm *NetworkMonitor) Close() {
    nm.cancel()
}

小结 #

本节详细介绍了 UDP 广播与组播技术,包括:

  1. 广播技术 - 基础广播实现和高级广播管理
  2. 组播技术 - 组播基础概念和高级组播管理
  3. 服务发现 - 基于广播的服务发现系统
  4. 数据分发 - 基于组播的实时数据分发系统
  5. 性能监控 - 网络流量监控和调试工具

掌握这些技术后,你就能够实现高效的一对多网络通信,构建分布式系统中的服务发现、数据分发等关键组件。在下一节中,我们将学习网络协议设计的相关知识。