3.8.4 实时聊天系统

3.8.4 实时聊天系统 #

实时聊天系统是 WebSocket 技术的经典应用场景,它综合运用了前面学习的所有知识点。本节将通过构建一个完整的实时聊天系统,展示如何将 WebSocket 服务端、客户端、消息路由、用户管理等组件整合在一起。

系统架构设计 #

整体架构 #

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Web Client    │    │   Go Client     │    │  Mobile Client  │
│   (Browser)     │    │   (Terminal)    │    │    (App)        │
└─────────┬───────┘    └─────────┬───────┘    └─────────┬───────┘
          │                      │                      │
          └──────────────────────┼──────────────────────┘
                                 │
                    ┌─────────────┴─────────────┐
                    │     WebSocket Server      │
                    │                           │
                    │  ┌─────────────────────┐  │
                    │  │    Message Hub      │  │
                    │  │                     │  │
                    │  │ ┌─────┐ ┌─────────┐ │  │
                    │  │ │Users│ │  Rooms  │ │  │
                    │  │ └─────┘ └─────────┘ │  │
                    │  └─────────────────────┘  │
                    └─────────────┬─────────────┘
                                  │
                    ┌─────────────┴─────────────┐
                    │      Data Layer           │
                    │                           │
                    │ ┌─────────┐ ┌───────────┐ │
                    │ │Database │ │   Redis   │ │
                    │ │(Messages│ │ (Sessions)│ │
                    │ │ History)│ │           │ │
                    │ └─────────┘ └───────────┘ │
                    └───────────────────────────┘

核心组件 #

  1. 用户管理:用户注册、登录、在线状态
  2. 房间管理:房间创建、加入、离开、权限控制
  3. 消息系统:实时消息、历史消息、私聊、群聊
  4. 通知系统:用户上线下线、系统通知
  5. 文件传输:图片、文件分享功能

数据模型设计 #

用户模型 #

// internal/models/user.go
package models

import (
    "time"
    "golang.org/x/crypto/bcrypt"
)

// 用户状态
type UserStatus int

const (
    UserStatusOffline UserStatus = iota
    UserStatusOnline
    UserStatusAway
    UserStatusBusy
)

func (s UserStatus) String() string {
    switch s {
    case UserStatusOffline:
        return "offline"
    case UserStatusOnline:
        return "online"
    case UserStatusAway:
        return "away"
    case UserStatusBusy:
        return "busy"
    default:
        return "unknown"
    }
}

// 用户模型
type User struct {
    ID          int64     `json:"id" db:"id"`
    Username    string    `json:"username" db:"username"`
    Email       string    `json:"email" db:"email"`
    Password    string    `json:"-" db:"password_hash"`
    Nickname    string    `json:"nickname" db:"nickname"`
    Avatar      string    `json:"avatar" db:"avatar"`
    Status      UserStatus `json:"status" db:"status"`
    LastSeen    time.Time `json:"last_seen" db:"last_seen"`
    CreatedAt   time.Time `json:"created_at" db:"created_at"`
    UpdatedAt   time.Time `json:"updated_at" db:"updated_at"`
}

// 设置密码
func (u *User) SetPassword(password string) error {
    hash, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
    if err != nil {
        return err
    }
    u.Password = string(hash)
    return nil
}

// 验证密码
func (u *User) CheckPassword(password string) bool {
    err := bcrypt.CompareHashAndPassword([]byte(u.Password), []byte(password))
    return err == nil
}

// 用户简要信息
type UserInfo struct {
    ID       int64      `json:"id"`
    Username string     `json:"username"`
    Nickname string     `json:"nickname"`
    Avatar   string     `json:"avatar"`
    Status   UserStatus `json:"status"`
}

// 转换为用户信息
func (u *User) ToUserInfo() *UserInfo {
    return &UserInfo{
        ID:       u.ID,
        Username: u.Username,
        Nickname: u.Nickname,
        Avatar:   u.Avatar,
        Status:   u.Status,
    }
}

房间模型 #

// internal/models/room.go
package models

import (
    "time"
)

// 房间类型
type RoomType int

const (
    RoomTypePublic RoomType = iota
    RoomTypePrivate
    RoomTypeDirect
)

func (t RoomType) String() string {
    switch t {
    case RoomTypePublic:
        return "public"
    case RoomTypePrivate:
        return "private"
    case RoomTypeDirect:
        return "direct"
    default:
        return "unknown"
    }
}

// 房间模型
type Room struct {
    ID          int64     `json:"id" db:"id"`
    Name        string    `json:"name" db:"name"`
    Description string    `json:"description" db:"description"`
    Type        RoomType  `json:"type" db:"type"`
    OwnerID     int64     `json:"owner_id" db:"owner_id"`
    MaxMembers  int       `json:"max_members" db:"max_members"`
    IsActive    bool      `json:"is_active" db:"is_active"`
    CreatedAt   time.Time `json:"created_at" db:"created_at"`
    UpdatedAt   time.Time `json:"updated_at" db:"updated_at"`
}

// 房间成员
type RoomMember struct {
    ID       int64     `json:"id" db:"id"`
    RoomID   int64     `json:"room_id" db:"room_id"`
    UserID   int64     `json:"user_id" db:"user_id"`
    Role     string    `json:"role" db:"role"` // owner, admin, member
    JoinedAt time.Time `json:"joined_at" db:"joined_at"`
}

// 房间信息(包含成员数量)
type RoomInfo struct {
    *Room
    MemberCount int `json:"member_count"`
    IsJoined    bool `json:"is_joined"`
}

消息模型 #

// internal/models/message.go
package models

import (
    "encoding/json"
    "time"
)

// 消息类型
type MessageType string

const (
    MessageTypeText     MessageType = "text"
    MessageTypeImage    MessageType = "image"
    MessageTypeFile     MessageType = "file"
    MessageTypeSystem   MessageType = "system"
    MessageTypeNotice   MessageType = "notice"
)

// 消息模型
type Message struct {
    ID        int64       `json:"id" db:"id"`
    Type      MessageType `json:"type" db:"type"`
    FromID    int64       `json:"from_id" db:"from_id"`
    ToID      *int64      `json:"to_id,omitempty" db:"to_id"`
    RoomID    *int64      `json:"room_id,omitempty" db:"room_id"`
    Content   string      `json:"content" db:"content"`
    Extra     string      `json:"extra,omitempty" db:"extra"` // JSON 格式的额外数据
    CreatedAt time.Time   `json:"created_at" db:"created_at"`

    // 关联数据(不存储在数据库)
    From *UserInfo `json:"from,omitempty" db:"-"`
    To   *UserInfo `json:"to,omitempty" db:"-"`
}

// 消息额外数据
type MessageExtra struct {
    FileName string `json:"file_name,omitempty"`
    FileSize int64  `json:"file_size,omitempty"`
    FileURL  string `json:"file_url,omitempty"`
    Width    int    `json:"width,omitempty"`
    Height   int    `json:"height,omitempty"`
}

// 设置额外数据
func (m *Message) SetExtra(extra *MessageExtra) error {
    if extra == nil {
        m.Extra = ""
        return nil
    }

    data, err := json.Marshal(extra)
    if err != nil {
        return err
    }

    m.Extra = string(data)
    return nil
}

// 获取额外数据
func (m *Message) GetExtra() (*MessageExtra, error) {
    if m.Extra == "" {
        return nil, nil
    }

    var extra MessageExtra
    err := json.Unmarshal([]byte(m.Extra), &extra)
    if err != nil {
        return nil, err
    }

    return &extra, nil
}

用户管理系统 #

用户服务 #

// internal/service/user.go
package service

import (
    "database/sql"
    "fmt"
    "time"

    "your-project/internal/models"
    "your-project/internal/repository"
)

// 用户服务
type UserService struct {
    userRepo *repository.UserRepository
}

// 创建用户服务
func NewUserService(userRepo *repository.UserRepository) *UserService {
    return &UserService{
        userRepo: userRepo,
    }
}

// 用户注册
func (s *UserService) Register(username, email, password, nickname string) (*models.User, error) {
    // 检查用户名是否已存在
    if exists, err := s.userRepo.ExistsByUsername(username); err != nil {
        return nil, err
    } else if exists {
        return nil, fmt.Errorf("用户名已存在")
    }

    // 检查邮箱是否已存在
    if exists, err := s.userRepo.ExistsByEmail(email); err != nil {
        return nil, err
    } else if exists {
        return nil, fmt.Errorf("邮箱已存在")
    }

    // 创建用户
    user := &models.User{
        Username:  username,
        Email:     email,
        Nickname:  nickname,
        Status:    models.UserStatusOffline,
        CreatedAt: time.Now(),
        UpdatedAt: time.Now(),
    }

    // 设置密码
    if err := user.SetPassword(password); err != nil {
        return nil, fmt.Errorf("密码加密失败: %v", err)
    }

    // 保存用户
    if err := s.userRepo.Create(user); err != nil {
        return nil, fmt.Errorf("创建用户失败: %v", err)
    }

    return user, nil
}

// 用户登录
func (s *UserService) Login(username, password string) (*models.User, error) {
    // 查找用户
    user, err := s.userRepo.GetByUsername(username)
    if err != nil {
        if err == sql.ErrNoRows {
            return nil, fmt.Errorf("用户不存在")
        }
        return nil, err
    }

    // 验证密码
    if !user.CheckPassword(password) {
        return nil, fmt.Errorf("密码错误")
    }

    // 更新最后登录时间和状态
    user.Status = models.UserStatusOnline
    user.LastSeen = time.Now()
    user.UpdatedAt = time.Now()

    if err := s.userRepo.Update(user); err != nil {
        return nil, fmt.Errorf("更新用户状态失败: %v", err)
    }

    return user, nil
}

// 用户登出
func (s *UserService) Logout(userID int64) error {
    user, err := s.userRepo.GetByID(userID)
    if err != nil {
        return err
    }

    user.Status = models.UserStatusOffline
    user.LastSeen = time.Now()
    user.UpdatedAt = time.Now()

    return s.userRepo.Update(user)
}

// 更新用户状态
func (s *UserService) UpdateStatus(userID int64, status models.UserStatus) error {
    user, err := s.userRepo.GetByID(userID)
    if err != nil {
        return err
    }

    user.Status = status
    user.LastSeen = time.Now()
    user.UpdatedAt = time.Now()

    return s.userRepo.Update(user)
}

// 搜索用户
func (s *UserService) SearchUsers(query string, limit int) ([]*models.UserInfo, error) {
    users, err := s.userRepo.Search(query, limit)
    if err != nil {
        return nil, err
    }

    userInfos := make([]*models.UserInfo, len(users))
    for i, user := range users {
        userInfos[i] = user.ToUserInfo()
    }

    return userInfos, nil
}

// 获取在线用户
func (s *UserService) GetOnlineUsers() ([]*models.UserInfo, error) {
    users, err := s.userRepo.GetByStatus(models.UserStatusOnline)
    if err != nil {
        return nil, err
    }

    userInfos := make([]*models.UserInfo, len(users))
    for i, user := range users {
        userInfos[i] = user.ToUserInfo()
    }

    return userInfos, nil
}

认证中间件 #

// internal/middleware/auth.go
package middleware

import (
    "context"
    "fmt"
    "net/http"
    "strings"
    "time"

    "github.com/golang-jwt/jwt/v4"
    "your-project/internal/models"
)

// JWT 声明
type Claims struct {
    UserID   int64  `json:"user_id"`
    Username string `json:"username"`
    jwt.RegisteredClaims
}

// JWT 管理器
type JWTManager struct {
    secretKey []byte
    issuer    string
    duration  time.Duration
}

// 创建 JWT 管理器
func NewJWTManager(secretKey, issuer string, duration time.Duration) *JWTManager {
    return &JWTManager{
        secretKey: []byte(secretKey),
        issuer:    issuer,
        duration:  duration,
    }
}

// 生成 JWT 令牌
func (m *JWTManager) Generate(user *models.User) (string, error) {
    claims := &Claims{
        UserID:   user.ID,
        Username: user.Username,
        RegisteredClaims: jwt.RegisteredClaims{
            ExpiresAt: jwt.NewNumericDate(time.Now().Add(m.duration)),
            IssuedAt:  jwt.NewNumericDate(time.Now()),
            Issuer:    m.issuer,
        },
    }

    token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
    return token.SignedString(m.secretKey)
}

// 验证 JWT 令牌
func (m *JWTManager) Verify(tokenString string) (*Claims, error) {
    token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
        if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
            return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
        }
        return m.secretKey, nil
    })

    if err != nil {
        return nil, err
    }

    if claims, ok := token.Claims.(*Claims); ok && token.Valid {
        return claims, nil
    }

    return nil, fmt.Errorf("invalid token")
}

// HTTP 认证中间件
func (m *JWTManager) HTTPMiddleware(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        authHeader := r.Header.Get("Authorization")
        if authHeader == "" {
            http.Error(w, "Missing authorization header", http.StatusUnauthorized)
            return
        }

        tokenString := strings.TrimPrefix(authHeader, "Bearer ")
        if tokenString == authHeader {
            http.Error(w, "Invalid authorization header format", http.StatusUnauthorized)
            return
        }

        claims, err := m.Verify(tokenString)
        if err != nil {
            http.Error(w, "Invalid token", http.StatusUnauthorized)
            return
        }

        // 将用户信息添加到上下文
        ctx := context.WithValue(r.Context(), "user_id", claims.UserID)
        ctx = context.WithValue(ctx, "username", claims.Username)

        next.ServeHTTP(w, r.WithContext(ctx))
    }
}

// WebSocket 认证
func (m *JWTManager) AuthenticateWebSocket(r *http.Request) (*Claims, error) {
    // 从查询参数获取令牌
    token := r.URL.Query().Get("token")
    if token == "" {
        // 从 Header 获取令牌
        authHeader := r.Header.Get("Authorization")
        if authHeader != "" {
            token = strings.TrimPrefix(authHeader, "Bearer ")
        }
    }

    if token == "" {
        return nil, fmt.Errorf("missing token")
    }

    return m.Verify(token)
}

房间管理系统 #

房间服务 #

// internal/service/room.go
package service

import (
    "fmt"
    "time"

    "your-project/internal/models"
    "your-project/internal/repository"
)

// 房间服务
type RoomService struct {
    roomRepo       *repository.RoomRepository
    roomMemberRepo *repository.RoomMemberRepository
}

// 创建房间服务
func NewRoomService(roomRepo *repository.RoomRepository, roomMemberRepo *repository.RoomMemberRepository) *RoomService {
    return &RoomService{
        roomRepo:       roomRepo,
        roomMemberRepo: roomMemberRepo,
    }
}

// 创建房间
func (s *RoomService) CreateRoom(ownerID int64, name, description string, roomType models.RoomType, maxMembers int) (*models.Room, error) {
    // 检查房间名是否已存在(对于公共房间)
    if roomType == models.RoomTypePublic {
        if exists, err := s.roomRepo.ExistsByName(name); err != nil {
            return nil, err
        } else if exists {
            return nil, fmt.Errorf("房间名已存在")
        }
    }

    // 创建房间
    room := &models.Room{
        Name:        name,
        Description: description,
        Type:        roomType,
        OwnerID:     ownerID,
        MaxMembers:  maxMembers,
        IsActive:    true,
        CreatedAt:   time.Now(),
        UpdatedAt:   time.Now(),
    }

    if err := s.roomRepo.Create(room); err != nil {
        return nil, fmt.Errorf("创建房间失败: %v", err)
    }

    // 将创建者添加为房间成员
    member := &models.RoomMember{
        RoomID:   room.ID,
        UserID:   ownerID,
        Role:     "owner",
        JoinedAt: time.Now(),
    }

    if err := s.roomMemberRepo.Create(member); err != nil {
        return nil, fmt.Errorf("添加房间成员失败: %v", err)
    }

    return room, nil
}

// 加入房间
func (s *RoomService) JoinRoom(userID, roomID int64) error {
    // 检查房间是否存在
    room, err := s.roomRepo.GetByID(roomID)
    if err != nil {
        return fmt.Errorf("房间不存在")
    }

    if !room.IsActive {
        return fmt.Errorf("房间已禁用")
    }

    // 检查是否已经是成员
    if exists, err := s.roomMemberRepo.IsMember(roomID, userID); err != nil {
        return err
    } else if exists {
        return fmt.Errorf("已经是房间成员")
    }

    // 检查房间人数限制
    if room.MaxMembers > 0 {
        memberCount, err := s.roomMemberRepo.GetMemberCount(roomID)
        if err != nil {
            return err
        }
        if memberCount >= room.MaxMembers {
            return fmt.Errorf("房间已满")
        }
    }

    // 添加成员
    member := &models.RoomMember{
        RoomID:   roomID,
        UserID:   userID,
        Role:     "member",
        JoinedAt: time.Now(),
    }

    return s.roomMemberRepo.Create(member)
}

// 离开房间
func (s *RoomService) LeaveRoom(userID, roomID int64) error {
    // 检查是否是房间成员
    if exists, err := s.roomMemberRepo.IsMember(roomID, userID); err != nil {
        return err
    } else if !exists {
        return fmt.Errorf("不是房间成员")
    }

    // 检查是否是房间所有者
    room, err := s.roomRepo.GetByID(roomID)
    if err != nil {
        return err
    }

    if room.OwnerID == userID {
        // 如果是所有者,需要转移所有权或删除房间
        memberCount, err := s.roomMemberRepo.GetMemberCount(roomID)
        if err != nil {
            return err
        }

        if memberCount > 1 {
            // 转移所有权给第一个管理员或成员
            members, err := s.roomMemberRepo.GetByRoomID(roomID)
            if err != nil {
                return err
            }

            var newOwner *models.RoomMember
            for _, member := range members {
                if member.UserID != userID {
                    if member.Role == "admin" {
                        newOwner = member
                        break
                    } else if newOwner == nil {
                        newOwner = member
                    }
                }
            }

            if newOwner != nil {
                room.OwnerID = newOwner.UserID
                if err := s.roomRepo.Update(room); err != nil {
                    return err
                }

                newOwner.Role = "owner"
                if err := s.roomMemberRepo.Update(newOwner); err != nil {
                    return err
                }
            }
        } else {
            // 删除房间
            room.IsActive = false
            if err := s.roomRepo.Update(room); err != nil {
                return err
            }
        }
    }

    // 移除成员
    return s.roomMemberRepo.Delete(roomID, userID)
}

// 获取用户的房间列表
func (s *RoomService) GetUserRooms(userID int64) ([]*models.RoomInfo, error) {
    rooms, err := s.roomRepo.GetByUserID(userID)
    if err != nil {
        return nil, err
    }

    roomInfos := make([]*models.RoomInfo, len(rooms))
    for i, room := range rooms {
        memberCount, err := s.roomMemberRepo.GetMemberCount(room.ID)
        if err != nil {
            memberCount = 0
        }

        roomInfos[i] = &models.RoomInfo{
            Room:        room,
            MemberCount: memberCount,
            IsJoined:    true,
        }
    }

    return roomInfos, nil
}

// 搜索公共房间
func (s *RoomService) SearchPublicRooms(query string, userID int64, limit int) ([]*models.RoomInfo, error) {
    rooms, err := s.roomRepo.SearchPublic(query, limit)
    if err != nil {
        return nil, err
    }

    roomInfos := make([]*models.RoomInfo, len(rooms))
    for i, room := range rooms {
        memberCount, err := s.roomMemberRepo.GetMemberCount(room.ID)
        if err != nil {
            memberCount = 0
        }

        isJoined, err := s.roomMemberRepo.IsMember(room.ID, userID)
        if err != nil {
            isJoined = false
        }

        roomInfos[i] = &models.RoomInfo{
            Room:        room,
            MemberCount: memberCount,
            IsJoined:    isJoined,
        }
    }

    return roomInfos, nil
}

// 获取房间成员
func (s *RoomService) GetRoomMembers(roomID int64) ([]*models.UserInfo, error) {
    return s.roomMemberRepo.GetMembersInfo(roomID)
}

消息系统 #

消息服务 #

// internal/service/message.go
package service

import (
    "fmt"
    "time"

    "your-project/internal/models"
    "your-project/internal/repository"
)

// 消息服务
type MessageService struct {
    messageRepo *repository.MessageRepository
    userRepo    *repository.UserRepository
}

// 创建消息服务
func NewMessageService(messageRepo *repository.MessageRepository, userRepo *repository.UserRepository) *MessageService {
    return &MessageService{
        messageRepo: messageRepo,
        userRepo:    userRepo,
    }
}

// 发送房间消息
func (s *MessageService) SendRoomMessage(fromID, roomID int64, msgType models.MessageType, content string, extra *models.MessageExtra) (*models.Message, error) {
    message := &models.Message{
        Type:      msgType,
        FromID:    fromID,
        RoomID:    &roomID,
        Content:   content,
        CreatedAt: time.Now(),
    }

    if extra != nil {
        if err := message.SetExtra(extra); err != nil {
            return nil, fmt.Errorf("设置消息额外数据失败: %v", err)
        }
    }

    if err := s.messageRepo.Create(message); err != nil {
        return nil, fmt.Errorf("保存消息失败: %v", err)
    }

    // 加载发送者信息
    if err := s.loadMessageUserInfo(message); err != nil {
        return nil, err
    }

    return message, nil
}

// 发送私人消息
func (s *MessageService) SendPrivateMessage(fromID, toID int64, msgType models.MessageType, content string, extra *models.MessageExtra) (*models.Message, error) {
    message := &models.Message{
        Type:      msgType,
        FromID:    fromID,
        ToID:      &toID,
        Content:   content,
        CreatedAt: time.Now(),
    }

    if extra != nil {
        if err := message.SetExtra(extra); err != nil {
            return nil, fmt.Errorf("设置消息额外数据失败: %v", err)
        }
    }

    if err := s.messageRepo.Create(message); err != nil {
        return nil, fmt.Errorf("保存消息失败: %v", err)
    }

    // 加载用户信息
    if err := s.loadMessageUserInfo(message); err != nil {
        return nil, err
    }

    return message, nil
}

// 获取房间历史消息
func (s *MessageService) GetRoomHistory(roomID int64, limit, offset int) ([]*models.Message, error) {
    messages, err := s.messageRepo.GetRoomHistory(roomID, limit, offset)
    if err != nil {
        return nil, err
    }

    // 加载用户信息
    for _, message := range messages {
        if err := s.loadMessageUserInfo(message); err != nil {
            continue // 忽略加载失败的消息
        }
    }

    return messages, nil
}

// 获取私聊历史消息
func (s *MessageService) GetPrivateHistory(user1ID, user2ID int64, limit, offset int) ([]*models.Message, error) {
    messages, err := s.messageRepo.GetPrivateHistory(user1ID, user2ID, limit, offset)
    if err != nil {
        return nil, err
    }

    // 加载用户信息
    for _, message := range messages {
        if err := s.loadMessageUserInfo(message); err != nil {
            continue // 忽略加载失败的消息
        }
    }

    return messages, nil
}

// 加载消息的用户信息
func (s *MessageService) loadMessageUserInfo(message *models.Message) error {
    // 加载发送者信息
    if message.FromID > 0 {
        fromUser, err := s.userRepo.GetByID(message.FromID)
        if err == nil {
            message.From = fromUser.ToUserInfo()
        }
    }

    // 加载接收者信息(私聊消息)
    if message.ToID != nil && *message.ToID > 0 {
        toUser, err := s.userRepo.GetByID(*message.ToID)
        if err == nil {
            message.To = toUser.ToUserInfo()
        }
    }

    return nil
}

WebSocket 消息处理 #

聊天消息处理器 #

// internal/handler/chat.go
package handler

import (
    "encoding/json"
    "fmt"
    "log"
    "strconv"
    "time"

    "your-project/internal/models"
    "your-project/internal/service"
    "your-project/internal/websocket"
)

// 聊天消息处理器
type ChatHandler struct {
    userService    *service.UserService
    roomService    *service.RoomService
    messageService *service.MessageService
    hub            *websocket.Hub
}

// 创建聊天处理器
func NewChatHandler(
    userService *service.UserService,
    roomService *service.RoomService,
    messageService *service.MessageService,
    hub *websocket.Hub,
) *ChatHandler {
    return &ChatHandler{
        userService:    userService,
        roomService:    roomService,
        messageService: messageService,
        hub:            hub,
    }
}

// WebSocket 消息
type WSMessage struct {
    Type      string      `json:"type"`
    Data      interface{} `json:"data"`
    Timestamp time.Time   `json:"timestamp"`
}

// 处理 WebSocket 消息
func (h *ChatHandler) HandleMessage(client *websocket.Client, msgType string, data []byte) error {
    switch msgType {
    case "chat_message":
        return h.handleChatMessage(client, data)
    case "join_room":
        return h.handleJoinRoom(client, data)
    case "leave_room":
        return h.handleLeaveRoom(client, data)
    case "private_message":
        return h.handlePrivateMessage(client, data)
    case "get_room_history":
        return h.handleGetRoomHistory(client, data)
    case "get_private_history":
        return h.handleGetPrivateHistory(client, data)
    case "get_online_users":
        return h.handleGetOnlineUsers(client, data)
    case "get_user_rooms":
        return h.handleGetUserRooms(client, data)
    case "search_rooms":
        return h.handleSearchRooms(client, data)
    case "create_room":
        return h.handleCreateRoom(client, data)
    default:
        return fmt.Errorf("unknown message type: %s", msgType)
    }
}

// 处理聊天消息
func (h *ChatHandler) handleChatMessage(client *websocket.Client, data []byte) error {
    var req struct {
        RoomID  int64  `json:"room_id"`
        Content string `json:"content"`
    }

    if err := json.Unmarshal(data, &req); err != nil {
        return err
    }

    if req.Content == "" {
        return fmt.Errorf("消息内容不能为空")
    }

    // 检查是否是房间成员
    // 这里简化处理,实际应该检查权限

    // 保存消息
    message, err := h.messageService.SendRoomMessage(
        client.UserID,
        req.RoomID,
        models.MessageTypeText,
        req.Content,
        nil,
    )
    if err != nil {
        return err
    }

    // 广播消息到房间
    wsMsg := &WSMessage{
        Type:      "chat_message",
        Data:      message,
        Timestamp: time.Now(),
    }

    msgData, err := json.Marshal(wsMsg)
    if err != nil {
        return err
    }

    h.hub.BroadcastToRoom(strconv.FormatInt(req.RoomID, 10), msgData)
    return nil
}

// 处理加入房间
func (h *ChatHandler) handleJoinRoom(client *websocket.Client, data []byte) error {
    var req struct {
        RoomID int64 `json:"room_id"`
    }

    if err := json.Unmarshal(data, &req); err != nil {
        return err
    }

    // 加入房间
    if err := h.roomService.JoinRoom(client.UserID, req.RoomID); err != nil {
        return err
    }

    // 将客户端添加到房间
    roomKey := strconv.FormatInt(req.RoomID, 10)
    h.hub.JoinRoom(client, roomKey)

    // 通知房间其他成员
    wsMsg := &WSMessage{
        Type: "user_joined",
        Data: map[string]interface{}{
            "room_id": req.RoomID,
            "user":    client.UserInfo,
        },
        Timestamp: time.Now(),
    }

    msgData, err := json.Marshal(wsMsg)
    if err != nil {
        return err
    }

    h.hub.BroadcastToRoom(roomKey, msgData)

    // 发送确认消息给客户端
    confirmMsg := &WSMessage{
        Type: "room_joined",
        Data: map[string]interface{}{
            "room_id": req.RoomID,
            "message": "成功加入房间",
        },
        Timestamp: time.Now(),
    }

    return client.SendMessage(confirmMsg)
}

// 处理离开房间
func (h *ChatHandler) handleLeaveRoom(client *websocket.Client, data []byte) error {
    var req struct {
        RoomID int64 `json:"room_id"`
    }

    if err := json.Unmarshal(data, &req); err != nil {
        return err
    }

    // 离开房间
    if err := h.roomService.LeaveRoom(client.UserID, req.RoomID); err != nil {
        return err
    }

    // 从房间移除客户端
    roomKey := strconv.FormatInt(req.RoomID, 10)
    h.hub.LeaveRoom(client, roomKey)

    // 通知房间其他成员
    wsMsg := &WSMessage{
        Type: "user_left",
        Data: map[string]interface{}{
            "room_id": req.RoomID,
            "user":    client.UserInfo,
        },
        Timestamp: time.Now(),
    }

    msgData, err := json.Marshal(wsMsg)
    if err != nil {
        return err
    }

    h.hub.BroadcastToRoom(roomKey, msgData)

    // 发送确认消息给客户端
    confirmMsg := &WSMessage{
        Type: "room_left",
        Data: map[string]interface{}{
            "room_id": req.RoomID,
            "message": "已离开房间",
        },
        Timestamp: time.Now(),
    }

    return client.SendMessage(confirmMsg)
}

// 处理私人消息
func (h *ChatHandler) handlePrivateMessage(client *websocket.Client, data []byte) error {
    var req struct {
        ToUserID int64  `json:"to_user_id"`
        Content  string `json:"content"`
    }

    if err := json.Unmarshal(data, &req); err != nil {
        return err
    }

    if req.Content == "" {
        return fmt.Errorf("消息内容不能为空")
    }

    // 保存消息
    message, err := h.messageService.SendPrivateMessage(
        client.UserID,
        req.ToUserID,
        models.MessageTypeText,
        req.Content,
        nil,
    )
    if err != nil {
        return err
    }

    // 发送消息给目标用户
    wsMsg := &WSMessage{
        Type:      "private_message",
        Data:      message,
        Timestamp: time.Now(),
    }

    msgData, err := json.Marshal(wsMsg)
    if err != nil {
        return err
    }

    // 发送给目标用户
    h.hub.SendToUser(req.ToUserID, msgData)

    // 发送确认给发送者
    confirmMsg := &WSMessage{
        Type: "message_sent",
        Data: map[string]interface{}{
            "message_id": message.ID,
            "to_user_id": req.ToUserID,
        },
        Timestamp: time.Now(),
    }

    return client.SendMessage(confirmMsg)
}

// 处理获取房间历史消息
func (h *ChatHandler) handleGetRoomHistory(client *websocket.Client, data []byte) error {
    var req struct {
        RoomID int64 `json:"room_id"`
        Limit  int   `json:"limit"`
        Offset int   `json:"offset"`
    }

    if err := json.Unmarshal(data, &req); err != nil {
        return err
    }

    if req.Limit <= 0 {
        req.Limit = 50
    }

    messages, err := h.messageService.GetRoomHistory(req.RoomID, req.Limit, req.Offset)
    if err != nil {
        return err
    }

    wsMsg := &WSMessage{
        Type: "room_history",
        Data: map[string]interface{}{
            "room_id":  req.RoomID,
            "messages": messages,
        },
        Timestamp: time.Now(),
    }

    return client.SendMessage(wsMsg)
}

// 处理获取私聊历史消息
func (h *ChatHandler) handleGetPrivateHistory(client *websocket.Client, data []byte) error {
    var req struct {
        WithUserID int64 `json:"with_user_id"`
        Limit      int   `json:"limit"`
        Offset     int   `json:"offset"`
    }

    if err := json.Unmarshal(data, &req); err != nil {
        return err
    }

    if req.Limit <= 0 {
        req.Limit = 50
    }

    messages, err := h.messageService.GetPrivateHistory(client.UserID, req.WithUserID, req.Limit, req.Offset)
    if err != nil {
        return err
    }

    wsMsg := &WSMessage{
        Type: "private_history",
        Data: map[string]interface{}{
            "with_user_id": req.WithUserID,
            "messages":     messages,
        },
        Timestamp: time.Now(),
    }

    return client.SendMessage(wsMsg)
}

// 处理获取在线用户
func (h *ChatHandler) handleGetOnlineUsers(client *websocket.Client, data []byte) error {
    users, err := h.userService.GetOnlineUsers()
    if err != nil {
        return err
    }

    wsMsg := &WSMessage{
        Type: "online_users",
        Data: map[string]interface{}{
            "users": users,
            "count": len(users),
        },
        Timestamp: time.Now(),
    }

    return client.SendMessage(wsMsg)
}

// 处理获取用户房间列表
func (h *ChatHandler) handleGetUserRooms(client *websocket.Client, data []byte) error {
    rooms, err := h.roomService.GetUserRooms(client.UserID)
    if err != nil {
        return err
    }

    wsMsg := &WSMessage{
        Type: "user_rooms",
        Data: map[string]interface{}{
            "rooms": rooms,
        },
        Timestamp: time.Now(),
    }

    return client.SendMessage(wsMsg)
}

// 处理搜索房间
func (h *ChatHandler) handleSearchRooms(client *websocket.Client, data []byte) error {
    var req struct {
        Query string `json:"query"`
        Limit int    `json:"limit"`
    }

    if err := json.Unmarshal(data, &req); err != nil {
        return err
    }

    if req.Limit <= 0 {
        req.Limit = 20
    }

    rooms, err := h.roomService.SearchPublicRooms(req.Query, client.UserID, req.Limit)
    if err != nil {
        return err
    }

    wsMsg := &WSMessage{
        Type: "search_rooms_result",
        Data: map[string]interface{}{
            "query": req.Query,
            "rooms": rooms,
        },
        Timestamp: time.Now(),
    }

    return client.SendMessage(wsMsg)
}

// 处理创建房间
func (h *ChatHandler) handleCreateRoom(client *websocket.Client, data []byte) error {
    var req struct {
        Name        string           `json:"name"`
        Description string           `json:"description"`
        Type        models.RoomType  `json:"type"`
        MaxMembers  int              `json:"max_members"`
    }

    if err := json.Unmarshal(data, &req); err != nil {
        return err
    }

    if req.Name == "" {
        return fmt.Errorf("房间名不能为空")
    }

    room, err := h.roomService.CreateRoom(
        client.UserID,
        req.Name,
        req.Description,
        req.Type,
        req.MaxMembers,
    )
    if err != nil {
        return err
    }

    wsMsg := &WSMessage{
        Type: "room_created",
        Data: map[string]interface{}{
            "room": room,
        },
        Timestamp: time.Now(),
    }

    return client.SendMessage(wsMsg)
}

小结 #

本节通过构建一个完整的实时聊天系统,展示了如何综合运用 WebSocket 技术:

  1. 系统架构:分层设计、模块化开发
  2. 数据模型:用户、房间、消息的完整建模
  3. 用户管理:注册登录、状态管理、JWT 认证
  4. 房间管理:房间创建、成员管理、权限控制
  5. 消息系统:实时消息、历史记录、私聊群聊
  6. WebSocket 处理:消息路由、事件处理、状态同步

通过这个实战项目,您可以深入理解 WebSocket 在实际应用中的使用方法,为开发更复杂的实时应用打下坚实基础。