3.8.4 实时聊天系统 #
实时聊天系统是 WebSocket 技术的经典应用场景,它综合运用了前面学习的所有知识点。本节将通过构建一个完整的实时聊天系统,展示如何将 WebSocket 服务端、客户端、消息路由、用户管理等组件整合在一起。
系统架构设计 #
整体架构 #
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Web Client    │    │    Go Client    │    │  Mobile Client  │
│    (Browser)    │    │   (Terminal)    │    │      (App)      │
└─────────┬───────┘    └─────────┬───────┘    └─────────┬───────┘
          │                      │                      │
          └──────────────────────┼──────────────────────┘
                                 │
                   ┌─────────────┴─────────────┐
                   │     WebSocket Server      │
                   │                           │
                   │  ┌─────────────────────┐  │
                   │  │     Message Hub     │  │
                   │  │                     │  │
                   │  │ ┌─────┐ ┌─────────┐ │  │
                   │  │ │Users│ │  Rooms  │ │  │
                   │  │ └─────┘ └─────────┘ │  │
                   │  └─────────────────────┘  │
                   └─────────────┬─────────────┘
                                 │
                   ┌─────────────┴─────────────┐
                   │        Data Layer         │
                   │                           │
                   │ ┌─────────┐ ┌───────────┐ │
                   │ │Database │ │   Redis   │ │
                   │ │(Messages│ │ (Sessions)│ │
                   │ │ History)│ │           │ │
                   │ └─────────┘ └───────────┘ │
                   └───────────────────────────┘
核心组件 #
- 用户管理:用户注册、登录、在线状态
- 房间管理:房间创建、加入、离开、权限控制
- 消息系统:实时消息、历史消息、私聊、群聊
- 通知系统:用户上线下线、系统通知
- 文件传输:图片、文件分享功能
数据模型设计 #
用户模型 #
// internal/models/user.go
package models
import (
"time"
"golang.org/x/crypto/bcrypt"
)
// UserStatus enumerates the presence states a user can be in.
type UserStatus int

// Presence states, numbered from 0 in declaration order.
const (
	UserStatusOffline UserStatus = iota
	UserStatusOnline
	UserStatusAway
	UserStatusBusy
)

// String returns the lowercase name of the status, or "unknown" for any
// value outside the declared constants.
func (s UserStatus) String() string {
	names := [...]string{
		UserStatusOffline: "offline",
		UserStatusOnline:  "online",
		UserStatusAway:    "away",
		UserStatusBusy:    "busy",
	}
	if s < 0 || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}
// User is the full account record.
// Each field carries a json tag (wire name) and a db tag; the db tags are
// presumably consumed by the repository layer — confirm against repository.
type User struct {
ID int64 `json:"id" db:"id"`
Username string `json:"username" db:"username"`
Email string `json:"email" db:"email"`
// Password holds the bcrypt hash written by SetPassword; the "-" json tag
// keeps it out of JSON output.
Password string `json:"-" db:"password_hash"`
Nickname string `json:"nickname" db:"nickname"`
Avatar string `json:"avatar" db:"avatar"`
Status UserStatus `json:"status" db:"status"`
LastSeen time.Time `json:"last_seen" db:"last_seen"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
}
// SetPassword hashes the plaintext password with bcrypt at the default cost
// and stores the hash in u.Password. When hashing fails the error is
// returned and u.Password is left untouched.
func (u *User) SetPassword(password string) error {
	hashed, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
	if err == nil {
		u.Password = string(hashed)
	}
	return err
}
// CheckPassword reports whether the plaintext password matches the bcrypt
// hash stored in u.Password. Any comparison error counts as a mismatch.
func (u *User) CheckPassword(password string) bool {
	return bcrypt.CompareHashAndPassword([]byte(u.Password), []byte(password)) == nil
}
// UserInfo is the reduced view of a User: identity, display fields and
// status only. It has no email, password hash or timestamps.
type UserInfo struct {
ID int64 `json:"id"`
Username string `json:"username"`
Nickname string `json:"nickname"`
Avatar string `json:"avatar"`
Status UserStatus `json:"status"`
}
// ToUserInfo returns a newly allocated UserInfo holding a copy of the
// user's ID, username, nickname, avatar and status.
func (u *User) ToUserInfo() *UserInfo {
	info := new(UserInfo)
	info.ID = u.ID
	info.Username = u.Username
	info.Nickname = u.Nickname
	info.Avatar = u.Avatar
	info.Status = u.Status
	return info
}
房间模型 #
// internal/models/room.go
package models
import (
"time"
)
// RoomType enumerates the kinds of chat room.
type RoomType int

// Room kinds, numbered from 0 in declaration order.
const (
	RoomTypePublic RoomType = iota
	RoomTypePrivate
	RoomTypeDirect
)

// String returns the lowercase name of the room type, or "unknown" for any
// value outside the declared constants.
func (t RoomType) String() string {
	names := [...]string{
		RoomTypePublic:  "public",
		RoomTypePrivate: "private",
		RoomTypeDirect:  "direct",
	}
	if t < 0 || int(t) >= len(names) {
		return "unknown"
	}
	return names[t]
}
// Room is the chat room record.
type Room struct {
ID int64 `json:"id" db:"id"`
Name string `json:"name" db:"name"`
Description string `json:"description" db:"description"`
Type RoomType `json:"type" db:"type"`
OwnerID int64 `json:"owner_id" db:"owner_id"`
// MaxMembers caps membership; RoomService.JoinRoom only enforces it when
// the value is greater than zero.
MaxMembers int `json:"max_members" db:"max_members"`
// IsActive is cleared by RoomService.LeaveRoom when the owner leaves as
// the last member, and checked by JoinRoom before admitting anyone.
IsActive bool `json:"is_active" db:"is_active"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
}
// RoomMember links one user to one room and records the user's role there.
type RoomMember struct {
ID int64 `json:"id" db:"id"`
RoomID int64 `json:"room_id" db:"room_id"`
UserID int64 `json:"user_id" db:"user_id"`
Role string `json:"role" db:"role"` // owner, admin, member
JoinedAt time.Time `json:"joined_at" db:"joined_at"`
}
// RoomInfo is a Room plus two values computed per request: the member
// count and whether the requesting user has joined.
type RoomInfo struct {
// Room is embedded, so its fields are promoted onto RoomInfo.
*Room
MemberCount int `json:"member_count"`
IsJoined bool `json:"is_joined"`
}
消息模型 #
// internal/models/message.go
package models
import (
"encoding/json"
"time"
)
// MessageType is the string discriminator stored in Message.Type.
type MessageType string

// Known message kinds. The string values are what gets serialized.
const (
MessageTypeText MessageType = "text"
MessageTypeImage MessageType = "image"
MessageTypeFile MessageType = "file"
MessageTypeSystem MessageType = "system"
MessageTypeNotice MessageType = "notice"
)
// Message is one chat message. MessageService sets RoomID for room
// messages and ToID for private messages.
type Message struct {
ID int64 `json:"id" db:"id"`
Type MessageType `json:"type" db:"type"`
FromID int64 `json:"from_id" db:"from_id"`
ToID *int64 `json:"to_id,omitempty" db:"to_id"`
RoomID *int64 `json:"room_id,omitempty" db:"room_id"`
Content string `json:"content" db:"content"`
Extra string `json:"extra,omitempty" db:"extra"` // extra data encoded as JSON (see SetExtra / GetExtra)
CreatedAt time.Time `json:"created_at" db:"created_at"`
// Associated data, not stored in the database (db:"-").
From *UserInfo `json:"from,omitempty" db:"-"`
To *UserInfo `json:"to,omitempty" db:"-"`
}
// MessageExtra is the structured form of Message.Extra. Every field is
// omitted from the JSON when it holds its zero value.
// NOTE(review): the fields look like file and image attachment metadata;
// units of FileSize, Width and Height are not stated in this file.
type MessageExtra struct {
FileName string `json:"file_name,omitempty"`
FileSize int64 `json:"file_size,omitempty"`
FileURL string `json:"file_url,omitempty"`
Width int `json:"width,omitempty"`
Height int `json:"height,omitempty"`
}
// SetExtra stores extra on the message as a JSON string. A nil extra
// clears m.Extra. If encoding fails the error is returned and m.Extra is
// left unchanged.
func (m *Message) SetExtra(extra *MessageExtra) error {
	encoded := ""
	if extra != nil {
		raw, err := json.Marshal(extra)
		if err != nil {
			return err
		}
		encoded = string(raw)
	}
	m.Extra = encoded
	return nil
}
// GetExtra decodes m.Extra into a MessageExtra. It returns (nil, nil) when
// m.Extra is empty and (nil, err) when the stored text is not valid JSON
// for that struct.
func (m *Message) GetExtra() (*MessageExtra, error) {
	if len(m.Extra) == 0 {
		return nil, nil
	}
	parsed := new(MessageExtra)
	if err := json.Unmarshal([]byte(m.Extra), parsed); err != nil {
		return nil, err
	}
	return parsed, nil
}
用户管理系统 #
用户服务 #
// internal/service/user.go
package service
import (
	"database/sql"
	"errors"
	"fmt"
	"time"

	"your-project/internal/models"
	"your-project/internal/repository"
)
// UserService implements account operations on top of a UserRepository.
type UserService struct {
	userRepo *repository.UserRepository
}

// NewUserService returns a UserService that reads and writes users through
// userRepo.
func NewUserService(userRepo *repository.UserRepository) *UserService {
	svc := new(UserService)
	svc.userRepo = userRepo
	return svc
}
// Register creates a new account and returns the stored user.
//
// It rejects the request when the username or the email is already taken,
// hashes the password via User.SetPassword, and persists the record with
// status offline. CreatedAt and UpdatedAt are taken from a single clock
// reading so a freshly created record does not look modified. Underlying
// failures are wrapped with %w so callers can inspect them with
// errors.Is / errors.As.
func (s *UserService) Register(username, email, password, nickname string) (*models.User, error) {
	// NOTE(review): the existence checks and the insert are separate
	// repository calls, so two concurrent registrations can both pass the
	// checks; presumably a unique constraint in storage is the real guard —
	// confirm.
	taken, err := s.userRepo.ExistsByUsername(username)
	if err != nil {
		return nil, err
	}
	if taken {
		return nil, fmt.Errorf("用户名已存在")
	}

	taken, err = s.userRepo.ExistsByEmail(email)
	if err != nil {
		return nil, err
	}
	if taken {
		return nil, fmt.Errorf("邮箱已存在")
	}

	now := time.Now()
	user := &models.User{
		Username:  username,
		Email:     email,
		Nickname:  nickname,
		Status:    models.UserStatusOffline,
		CreatedAt: now,
		UpdatedAt: now,
	}

	if err := user.SetPassword(password); err != nil {
		return nil, fmt.Errorf("密码加密失败: %w", err)
	}
	if err := s.userRepo.Create(user); err != nil {
		return nil, fmt.Errorf("创建用户失败: %w", err)
	}
	return user, nil
}
// Login authenticates a user by username and password.
//
// On success it marks the user online, stamps LastSeen and UpdatedAt with
// the same instant, persists the change and returns the user. A missing
// user is detected with errors.Is so that a sql.ErrNoRows wrapped by the
// repository is still recognized; other lookup errors are returned as-is.
//
// NOTE(review): distinct "user not found" and "wrong password" messages let
// a caller probe which usernames exist; kept because callers may depend on
// the texts.
func (s *UserService) Login(username, password string) (*models.User, error) {
	user, err := s.userRepo.GetByUsername(username)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, fmt.Errorf("用户不存在")
		}
		return nil, err
	}

	if !user.CheckPassword(password) {
		return nil, fmt.Errorf("密码错误")
	}

	now := time.Now()
	user.Status = models.UserStatusOnline
	user.LastSeen = now
	user.UpdatedAt = now
	if err := s.userRepo.Update(user); err != nil {
		return nil, fmt.Errorf("更新用户状态失败: %w", err)
	}
	return user, nil
}
// Logout marks the user offline and refreshes LastSeen and UpdatedAt. It
// is exactly UpdateStatus with the offline status, so it delegates there.
func (s *UserService) Logout(userID int64) error {
	return s.UpdateStatus(userID, models.UserStatusOffline)
}
// UpdateStatus loads the user by ID, sets the given status, refreshes
// LastSeen and UpdatedAt, and persists the record. A lookup or update
// error is returned unchanged.
func (s *UserService) UpdateStatus(userID int64, status models.UserStatus) error {
	target, lookupErr := s.userRepo.GetByID(userID)
	if lookupErr != nil {
		return lookupErr
	}

	target.Status = status
	target.LastSeen = time.Now()
	target.UpdatedAt = time.Now()

	return s.userRepo.Update(target)
}
// SearchUsers passes query and limit to the repository search and returns
// the matches as UserInfo values. The result is a non-nil slice even when
// nothing matches.
func (s *UserService) SearchUsers(query string, limit int) ([]*models.UserInfo, error) {
	matches, err := s.userRepo.Search(query, limit)
	if err != nil {
		return nil, err
	}

	infos := make([]*models.UserInfo, 0, len(matches))
	for _, match := range matches {
		infos = append(infos, match.ToUserInfo())
	}
	return infos, nil
}
// GetOnlineUsers returns every user whose stored status is online, as
// UserInfo values. The result is a non-nil slice even when nobody is online.
func (s *UserService) GetOnlineUsers() ([]*models.UserInfo, error) {
	online, err := s.userRepo.GetByStatus(models.UserStatusOnline)
	if err != nil {
		return nil, err
	}

	infos := make([]*models.UserInfo, 0, len(online))
	for _, user := range online {
		infos = append(infos, user.ToUserInfo())
	}
	return infos, nil
}
认证中间件 #
// internal/middleware/auth.go
package middleware
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"github.com/golang-jwt/jwt/v4"
"your-project/internal/models"
)
// Claims is the JWT payload issued by JWTManager: the user's ID and
// username plus the embedded registered claims (Generate sets expiry,
// issued-at and issuer).
type Claims struct {
UserID int64 `json:"user_id"`
Username string `json:"username"`
jwt.RegisteredClaims
}
// JWTManager issues and verifies HMAC-signed JWTs.
type JWTManager struct {
	secretKey []byte        // signing and verification key
	issuer    string        // value written to the issuer claim
	duration  time.Duration // token lifetime added to the issue time
}

// NewJWTManager returns a JWTManager that signs with secretKey, stamps
// tokens with issuer, and makes them expire after duration.
func NewJWTManager(secretKey, issuer string, duration time.Duration) *JWTManager {
	mgr := new(JWTManager)
	mgr.secretKey = []byte(secretKey)
	mgr.issuer = issuer
	mgr.duration = duration
	return mgr
}
// Generate returns an HS256-signed token carrying the user's ID and
// username, the manager's issuer, the issue time, and an expiry of the
// current time plus the configured duration.
func (m *JWTManager) Generate(user *models.User) (string, error) {
	registered := jwt.RegisteredClaims{
		ExpiresAt: jwt.NewNumericDate(time.Now().Add(m.duration)),
		IssuedAt:  jwt.NewNumericDate(time.Now()),
		Issuer:    m.issuer,
	}
	payload := &Claims{
		UserID:           user.ID,
		Username:         user.Username,
		RegisteredClaims: registered,
	}
	return jwt.NewWithClaims(jwt.SigningMethodHS256, payload).SignedString(m.secretKey)
}
// Verify parses tokenString and returns its claims.
//
// Tokens whose signing method is not in the HMAC family are rejected before
// the key is handed to the parser. Parse errors are returned unchanged; a
// token that parses but is not valid yields "invalid token".
func (m *JWTManager) Verify(tokenString string) (*Claims, error) {
	// Supplies the verification key, but only for HMAC-signed tokens.
	keyFunc := func(t *jwt.Token) (interface{}, error) {
		if _, isHMAC := t.Method.(*jwt.SigningMethodHMAC); isHMAC {
			return m.secretKey, nil
		}
		return nil, fmt.Errorf("unexpected signing method: %v", t.Header["alg"])
	}

	parsed, err := jwt.ParseWithClaims(tokenString, &Claims{}, keyFunc)
	if err != nil {
		return nil, err
	}

	claims, ok := parsed.Claims.(*Claims)
	if !ok || !parsed.Valid {
		return nil, fmt.Errorf("invalid token")
	}
	return claims, nil
}
// HTTPMiddleware wraps next with bearer-token authentication.
//
// A request is answered with 401 when the Authorization header is missing,
// does not start with "Bearer ", or carries a token that Verify rejects.
// Otherwise the user ID and username from the claims are stored in the
// request context under the keys "user_id" and "username" and next runs.
//
// NOTE(review): plain string context keys can collide with other packages;
// kept because downstream readers look the values up by these strings.
func (m *JWTManager) HTTPMiddleware(next http.HandlerFunc) http.HandlerFunc {
	const bearerPrefix = "Bearer "

	reject := func(w http.ResponseWriter, reason string) {
		http.Error(w, reason, http.StatusUnauthorized)
	}

	return func(w http.ResponseWriter, r *http.Request) {
		header := r.Header.Get("Authorization")
		switch {
		case header == "":
			reject(w, "Missing authorization header")
			return
		case !strings.HasPrefix(header, bearerPrefix):
			reject(w, "Invalid authorization header format")
			return
		}

		claims, err := m.Verify(header[len(bearerPrefix):])
		if err != nil {
			reject(w, "Invalid token")
			return
		}

		ctx := context.WithValue(r.Context(), "user_id", claims.UserID)
		ctx = context.WithValue(ctx, "username", claims.Username)
		next.ServeHTTP(w, r.WithContext(ctx))
	}
}
// AuthenticateWebSocket extracts a token from the request and verifies it.
//
// The "token" query parameter is used when present. Otherwise the
// Authorization header is used with a leading "Bearer " removed; a header
// without that prefix is passed to Verify as-is. It returns "missing token"
// when neither source yields a value.
func (m *JWTManager) AuthenticateWebSocket(r *http.Request) (*Claims, error) {
	candidate := r.URL.Query().Get("token")
	if candidate == "" {
		// Trimming an absent (empty) header still yields "", so no extra
		// emptiness check is needed here.
		candidate = strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
	}
	if candidate == "" {
		return nil, fmt.Errorf("missing token")
	}
	return m.Verify(candidate)
}
房间管理系统 #
房间服务 #
// internal/service/room.go
package service
import (
"fmt"
"time"
"your-project/internal/models"
"your-project/internal/repository"
)
// RoomService implements room operations on top of the room and
// room-member repositories.
type RoomService struct {
	roomRepo       *repository.RoomRepository
	roomMemberRepo *repository.RoomMemberRepository
}

// NewRoomService returns a RoomService backed by the two repositories.
func NewRoomService(roomRepo *repository.RoomRepository, roomMemberRepo *repository.RoomMemberRepository) *RoomService {
	svc := new(RoomService)
	svc.roomRepo = roomRepo
	svc.roomMemberRepo = roomMemberRepo
	return svc
}
// CreateRoom creates an active room owned by ownerID and records the owner
// as its first member with role "owner".
//
// Name uniqueness is only checked for public rooms. The room's CreatedAt
// and UpdatedAt and the owner's JoinedAt share one clock reading, so the
// records of a single creation event agree. Repository failures are wrapped
// with %w so callers can inspect the cause.
//
// NOTE(review): the room insert and the member insert are separate calls;
// if the second fails the room stays stored without an owner membership.
// No transaction or delete API is visible here to roll it back.
func (s *RoomService) CreateRoom(ownerID int64, name, description string, roomType models.RoomType, maxMembers int) (*models.Room, error) {
	if roomType == models.RoomTypePublic {
		taken, err := s.roomRepo.ExistsByName(name)
		if err != nil {
			return nil, err
		}
		if taken {
			return nil, fmt.Errorf("房间名已存在")
		}
	}

	now := time.Now()
	room := &models.Room{
		Name:        name,
		Description: description,
		Type:        roomType,
		OwnerID:     ownerID,
		MaxMembers:  maxMembers,
		IsActive:    true,
		CreatedAt:   now,
		UpdatedAt:   now,
	}
	if err := s.roomRepo.Create(room); err != nil {
		return nil, fmt.Errorf("创建房间失败: %w", err)
	}

	// room.ID is read after Create, which is expected to have filled it in.
	owner := &models.RoomMember{
		RoomID:   room.ID,
		UserID:   ownerID,
		Role:     "owner",
		JoinedAt: now,
	}
	if err := s.roomMemberRepo.Create(owner); err != nil {
		return nil, fmt.Errorf("添加房间成员失败: %w", err)
	}
	return room, nil
}
// JoinRoom adds userID to roomID with role "member".
//
// It fails when the room cannot be loaded, is inactive, already contains
// the user, or has reached MaxMembers (checked only when MaxMembers > 0).
//
// NOTE(review): any error from the room lookup is reported as "room does
// not exist", which hides storage failures. The capacity check and the
// insert are separate calls, so concurrent joins could exceed the limit.
func (s *RoomService) JoinRoom(userID, roomID int64) error {
	room, err := s.roomRepo.GetByID(roomID)
	switch {
	case err != nil:
		return fmt.Errorf("房间不存在")
	case !room.IsActive:
		return fmt.Errorf("房间已禁用")
	}

	alreadyMember, err := s.roomMemberRepo.IsMember(roomID, userID)
	if err != nil {
		return err
	}
	if alreadyMember {
		return fmt.Errorf("已经是房间成员")
	}

	if capacity := room.MaxMembers; capacity > 0 {
		current, err := s.roomMemberRepo.GetMemberCount(roomID)
		if err != nil {
			return err
		}
		if current >= capacity {
			return fmt.Errorf("房间已满")
		}
	}

	return s.roomMemberRepo.Create(&models.RoomMember{
		RoomID:   roomID,
		UserID:   userID,
		Role:     "member",
		JoinedAt: time.Now(),
	})
}
// LeaveRoom removes userID from roomID.
//
// A non-member gets an error. When the leaving user is the room's owner,
// ownership is first handed to another member (the first admin found,
// otherwise the first other member); if the owner is the only member the
// room is marked inactive instead. The membership record is deleted last.
//
// NOTE(review): the updates below are separate repository calls with no
// transaction visible here, so a failure part-way can leave the room's
// OwnerID and the members' roles out of step.
func (s *RoomService) LeaveRoom(userID, roomID int64) error {
// Only current members can leave.
if exists, err := s.roomMemberRepo.IsMember(roomID, userID); err != nil {
return err
} else if !exists {
return fmt.Errorf("不是房间成员")
}
// Load the room to find out whether the leaving user owns it.
room, err := s.roomRepo.GetByID(roomID)
if err != nil {
return err
}
if room.OwnerID == userID {
// The owner is leaving: transfer ownership, or deactivate the room when
// nobody else is left.
memberCount, err := s.roomMemberRepo.GetMemberCount(roomID)
if err != nil {
return err
}
if memberCount > 1 {
// Choose a successor: the first admin in the list wins; otherwise the
// first member that is not the leaving user.
members, err := s.roomMemberRepo.GetByRoomID(roomID)
if err != nil {
return err
}
var newOwner *models.RoomMember
for _, member := range members {
if member.UserID != userID {
if member.Role == "admin" {
newOwner = member
break
} else if newOwner == nil {
newOwner = member
}
}
}
// newOwner stays nil if the list holds no other user despite the count;
// in that case ownership is left as it was.
if newOwner != nil {
room.OwnerID = newOwner.UserID
if err := s.roomRepo.Update(room); err != nil {
return err
}
newOwner.Role = "owner"
if err := s.roomMemberRepo.Update(newOwner); err != nil {
return err
}
}
} else {
// Soft delete: the room is marked inactive rather than removed.
room.IsActive = false
if err := s.roomRepo.Update(room); err != nil {
return err
}
}
}
// Finally remove the leaving user's membership.
return s.roomMemberRepo.Delete(roomID, userID)
}
// GetUserRooms returns the rooms the user belongs to, each with its member
// count and IsJoined set to true. A failed count lookup is tolerated and
// reported as 0 rather than failing the whole listing.
func (s *RoomService) GetUserRooms(userID int64) ([]*models.RoomInfo, error) {
	joined, err := s.roomRepo.GetByUserID(userID)
	if err != nil {
		return nil, err
	}

	infos := make([]*models.RoomInfo, 0, len(joined))
	for _, room := range joined {
		count, countErr := s.roomMemberRepo.GetMemberCount(room.ID)
		if countErr != nil {
			count = 0
		}
		infos = append(infos, &models.RoomInfo{
			Room:        room,
			MemberCount: count,
			IsJoined:    true,
		})
	}
	return infos, nil
}
// SearchPublicRooms returns public rooms matching query (at most limit, as
// enforced by the repository), each with its member count and whether
// userID has joined. Per-room lookup failures are tolerated: the count
// falls back to 0 and the joined flag to false.
func (s *RoomService) SearchPublicRooms(query string, userID int64, limit int) ([]*models.RoomInfo, error) {
	found, err := s.roomRepo.SearchPublic(query, limit)
	if err != nil {
		return nil, err
	}

	infos := make([]*models.RoomInfo, 0, len(found))
	for _, room := range found {
		count, countErr := s.roomMemberRepo.GetMemberCount(room.ID)
		if countErr != nil {
			count = 0
		}

		member, memberErr := s.roomMemberRepo.IsMember(room.ID, userID)
		joined := memberErr == nil && member

		infos = append(infos, &models.RoomInfo{
			Room:        room,
			MemberCount: count,
			IsJoined:    joined,
		})
	}
	return infos, nil
}
// GetRoomMembers returns the members of roomID exactly as the member
// repository's GetMembersInfo reports them.
func (s *RoomService) GetRoomMembers(roomID int64) ([]*models.UserInfo, error) {
	members, err := s.roomMemberRepo.GetMembersInfo(roomID)
	return members, err
}
消息系统 #
消息服务 #
// internal/service/message.go
package service
import (
"fmt"
"time"
"your-project/internal/models"
"your-project/internal/repository"
)
// MessageService stores and retrieves chat messages and attaches user
// details to them.
type MessageService struct {
	messageRepo *repository.MessageRepository
	userRepo    *repository.UserRepository
}

// NewMessageService returns a MessageService backed by the two repositories.
func NewMessageService(messageRepo *repository.MessageRepository, userRepo *repository.UserRepository) *MessageService {
	svc := new(MessageService)
	svc.messageRepo = messageRepo
	svc.userRepo = userRepo
	return svc
}
// SendRoomMessage stores a message from fromID addressed to roomID and
// returns it with the sender's UserInfo attached.
//
// extra may be nil; when present it is serialized into Message.Extra.
// Encoding and storage failures are wrapped with %w so callers can inspect
// the cause with errors.Is / errors.As.
func (s *MessageService) SendRoomMessage(fromID, roomID int64, msgType models.MessageType, content string, extra *models.MessageExtra) (*models.Message, error) {
	message := &models.Message{
		Type:      msgType,
		FromID:    fromID,
		RoomID:    &roomID,
		Content:   content,
		CreatedAt: time.Now(),
	}

	if extra != nil {
		if err := message.SetExtra(extra); err != nil {
			return nil, fmt.Errorf("设置消息额外数据失败: %w", err)
		}
	}

	if err := s.messageRepo.Create(message); err != nil {
		return nil, fmt.Errorf("保存消息失败: %w", err)
	}

	// Attach sender details for the outgoing payload.
	if err := s.loadMessageUserInfo(message); err != nil {
		return nil, err
	}
	return message, nil
}
// SendPrivateMessage stores a message from fromID addressed to user toID
// and returns it with sender and recipient UserInfo attached where they
// could be loaded.
//
// extra may be nil; when present it is serialized into Message.Extra.
// Encoding and storage failures are wrapped with %w so callers can inspect
// the cause with errors.Is / errors.As.
//
// NOTE(review): toID is not validated here; whether the recipient exists is
// not checked before the message is stored.
func (s *MessageService) SendPrivateMessage(fromID, toID int64, msgType models.MessageType, content string, extra *models.MessageExtra) (*models.Message, error) {
	message := &models.Message{
		Type:      msgType,
		FromID:    fromID,
		ToID:      &toID,
		Content:   content,
		CreatedAt: time.Now(),
	}

	if extra != nil {
		if err := message.SetExtra(extra); err != nil {
			return nil, fmt.Errorf("设置消息额外数据失败: %w", err)
		}
	}

	if err := s.messageRepo.Create(message); err != nil {
		return nil, fmt.Errorf("保存消息失败: %w", err)
	}

	// Attach sender and recipient details for the outgoing payload.
	if err := s.loadMessageUserInfo(message); err != nil {
		return nil, err
	}
	return message, nil
}
// GetRoomHistory returns stored messages for roomID using the given limit
// and offset, with user details attached to each message where available.
func (s *MessageService) GetRoomHistory(roomID int64, limit, offset int) ([]*models.Message, error) {
	history, err := s.messageRepo.GetRoomHistory(roomID, limit, offset)
	if err != nil {
		return nil, err
	}

	for _, msg := range history {
		// Enrichment is best-effort: a failure leaves the message as loaded.
		_ = s.loadMessageUserInfo(msg)
	}
	return history, nil
}
// GetPrivateHistory returns stored private messages between user1ID and
// user2ID using the given limit and offset, with user details attached to
// each message where available.
func (s *MessageService) GetPrivateHistory(user1ID, user2ID int64, limit, offset int) ([]*models.Message, error) {
	history, err := s.messageRepo.GetPrivateHistory(user1ID, user2ID, limit, offset)
	if err != nil {
		return nil, err
	}

	for _, msg := range history {
		// Enrichment is best-effort: a failure leaves the message as loaded.
		_ = s.loadMessageUserInfo(msg)
	}
	return history, nil
}
// loadMessageUserInfo fills message.From and, when ToID is set,
// message.To with the corresponding UserInfo.
//
// Lookups are best-effort: a user that cannot be loaded leaves the field
// untouched, and the function always returns nil.
func (s *MessageService) loadMessageUserInfo(message *models.Message) error {
	if senderID := message.FromID; senderID > 0 {
		if sender, err := s.userRepo.GetByID(senderID); err == nil {
			message.From = sender.ToUserInfo()
		}
	}

	// ToID is set by SendPrivateMessage; SendRoomMessage leaves it nil.
	if message.ToID != nil {
		if recipientID := *message.ToID; recipientID > 0 {
			if recipient, err := s.userRepo.GetByID(recipientID); err == nil {
				message.To = recipient.ToUserInfo()
			}
		}
	}
	return nil
}
WebSocket 消息处理 #
聊天消息处理器 #
// internal/handler/chat.go
package handler
import (
"encoding/json"
"fmt"
"log"
"strconv"
"time"
"your-project/internal/models"
"your-project/internal/service"
"your-project/internal/websocket"
)
// ChatHandler routes WebSocket chat requests to the user, room and message
// services and pushes results back through the hub.
type ChatHandler struct {
	userService    *service.UserService
	roomService    *service.RoomService
	messageService *service.MessageService
	hub            *websocket.Hub
}

// NewChatHandler returns a ChatHandler wired to the given services and hub.
func NewChatHandler(
	userService *service.UserService,
	roomService *service.RoomService,
	messageService *service.MessageService,
	hub *websocket.Hub,
) *ChatHandler {
	handler := new(ChatHandler)
	handler.userService = userService
	handler.roomService = roomService
	handler.messageService = messageService
	handler.hub = hub
	return handler
}
// WSMessage is the envelope for every frame the handlers send: a type tag,
// a type-specific payload, and the time the envelope was built.
type WSMessage struct {
Type string `json:"type"`
Data interface{} `json:"data"`
Timestamp time.Time `json:"timestamp"`
}
// HandleMessage dispatches one client request to the handler registered
// for msgType, passing along the raw JSON payload. An unrecognized type
// yields an "unknown message type" error.
func (h *ChatHandler) HandleMessage(client *websocket.Client, msgType string, data []byte) error {
	routes := map[string]func(*websocket.Client, []byte) error{
		"chat_message":        h.handleChatMessage,
		"join_room":           h.handleJoinRoom,
		"leave_room":          h.handleLeaveRoom,
		"private_message":     h.handlePrivateMessage,
		"get_room_history":    h.handleGetRoomHistory,
		"get_private_history": h.handleGetPrivateHistory,
		"get_online_users":    h.handleGetOnlineUsers,
		"get_user_rooms":      h.handleGetUserRooms,
		"search_rooms":        h.handleSearchRooms,
		"create_room":         h.handleCreateRoom,
	}

	route, known := routes[msgType]
	if !known {
		return fmt.Errorf("unknown message type: %s", msgType)
	}
	return route(client, data)
}
// handleChatMessage stores a text message sent to a room and broadcasts it
// to that room as a "chat_message" frame. Empty content is rejected.
//
// NOTE(review): room membership is not verified here (the original marks
// this as a simplification); any authenticated client can post to any
// room ID.
func (h *ChatHandler) handleChatMessage(client *websocket.Client, data []byte) error {
	var payload struct {
		RoomID  int64  `json:"room_id"`
		Content string `json:"content"`
	}
	if err := json.Unmarshal(data, &payload); err != nil {
		return err
	}
	if len(payload.Content) == 0 {
		return fmt.Errorf("消息内容不能为空")
	}

	stored, err := h.messageService.SendRoomMessage(
		client.UserID,
		payload.RoomID,
		models.MessageTypeText,
		payload.Content,
		nil,
	)
	if err != nil {
		return err
	}

	frame, err := json.Marshal(&WSMessage{
		Type:      "chat_message",
		Data:      stored,
		Timestamp: time.Now(),
	})
	if err != nil {
		return err
	}

	h.hub.BroadcastToRoom(strconv.FormatInt(payload.RoomID, 10), frame)
	return nil
}
// handleJoinRoom records the client's membership through the room service,
// subscribes the connection to the room in the hub, broadcasts a
// "user_joined" frame to the room, and confirms to the client with
// "room_joined".
//
// The broadcast is sent after the client is subscribed, so the joining
// client presumably receives it as well — confirm against Hub.
func (h *ChatHandler) handleJoinRoom(client *websocket.Client, data []byte) error {
	var payload struct {
		RoomID int64 `json:"room_id"`
	}
	if err := json.Unmarshal(data, &payload); err != nil {
		return err
	}

	if err := h.roomService.JoinRoom(client.UserID, payload.RoomID); err != nil {
		return err
	}

	// The hub keys rooms by the decimal string form of the room ID.
	key := strconv.FormatInt(payload.RoomID, 10)
	h.hub.JoinRoom(client, key)

	announcement, err := json.Marshal(&WSMessage{
		Type: "user_joined",
		Data: map[string]interface{}{
			"room_id": payload.RoomID,
			"user":    client.UserInfo,
		},
		Timestamp: time.Now(),
	})
	if err != nil {
		return err
	}
	h.hub.BroadcastToRoom(key, announcement)

	return client.SendMessage(&WSMessage{
		Type: "room_joined",
		Data: map[string]interface{}{
			"room_id": payload.RoomID,
			"message": "成功加入房间",
		},
		Timestamp: time.Now(),
	})
}
// handleLeaveRoom removes the client's membership through the room
// service, unsubscribes the connection from the room in the hub,
// broadcasts a "user_left" frame to the room, and confirms to the client
// with "room_left".
func (h *ChatHandler) handleLeaveRoom(client *websocket.Client, data []byte) error {
	var payload struct {
		RoomID int64 `json:"room_id"`
	}
	if err := json.Unmarshal(data, &payload); err != nil {
		return err
	}

	if err := h.roomService.LeaveRoom(client.UserID, payload.RoomID); err != nil {
		return err
	}

	// The hub keys rooms by the decimal string form of the room ID.
	key := strconv.FormatInt(payload.RoomID, 10)
	h.hub.LeaveRoom(client, key)

	announcement, err := json.Marshal(&WSMessage{
		Type: "user_left",
		Data: map[string]interface{}{
			"room_id": payload.RoomID,
			"user":    client.UserInfo,
		},
		Timestamp: time.Now(),
	})
	if err != nil {
		return err
	}
	h.hub.BroadcastToRoom(key, announcement)

	return client.SendMessage(&WSMessage{
		Type: "room_left",
		Data: map[string]interface{}{
			"room_id": payload.RoomID,
			"message": "已离开房间",
		},
		Timestamp: time.Now(),
	})
}
// handlePrivateMessage stores a text message addressed to another user,
// pushes it to that user as a "private_message" frame, and acknowledges to
// the sender with "message_sent". Empty content is rejected.
func (h *ChatHandler) handlePrivateMessage(client *websocket.Client, data []byte) error {
	var payload struct {
		ToUserID int64  `json:"to_user_id"`
		Content  string `json:"content"`
	}
	if err := json.Unmarshal(data, &payload); err != nil {
		return err
	}
	if len(payload.Content) == 0 {
		return fmt.Errorf("消息内容不能为空")
	}

	stored, err := h.messageService.SendPrivateMessage(
		client.UserID,
		payload.ToUserID,
		models.MessageTypeText,
		payload.Content,
		nil,
	)
	if err != nil {
		return err
	}

	frame, err := json.Marshal(&WSMessage{
		Type:      "private_message",
		Data:      stored,
		Timestamp: time.Now(),
	})
	if err != nil {
		return err
	}
	h.hub.SendToUser(payload.ToUserID, frame)

	return client.SendMessage(&WSMessage{
		Type: "message_sent",
		Data: map[string]interface{}{
			"message_id": stored.ID,
			"to_user_id": payload.ToUserID,
		},
		Timestamp: time.Now(),
	})
}
// handleGetRoomHistory replies to the client with a "room_history" frame
// holding one page of stored messages for the requested room.
//
// limit and offset come straight from the client, so they are bounded
// before reaching the message service: a non-positive limit becomes 50, a
// limit above 200 is capped at 200, and a negative offset is treated as 0.
// Clients that need more can page with offset.
//
// NOTE(review): room membership is not verified here; any authenticated
// client can read any room's history.
func (h *ChatHandler) handleGetRoomHistory(client *websocket.Client, data []byte) error {
	const (
		defaultLimit = 50
		maxLimit     = 200
	)

	var req struct {
		RoomID int64 `json:"room_id"`
		Limit  int   `json:"limit"`
		Offset int   `json:"offset"`
	}
	if err := json.Unmarshal(data, &req); err != nil {
		return err
	}

	switch {
	case req.Limit <= 0:
		req.Limit = defaultLimit
	case req.Limit > maxLimit:
		req.Limit = maxLimit
	}
	if req.Offset < 0 {
		req.Offset = 0
	}

	messages, err := h.messageService.GetRoomHistory(req.RoomID, req.Limit, req.Offset)
	if err != nil {
		return err
	}

	return client.SendMessage(&WSMessage{
		Type: "room_history",
		Data: map[string]interface{}{
			"room_id":  req.RoomID,
			"messages": messages,
		},
		Timestamp: time.Now(),
	})
}
// handleGetPrivateHistory replies to the client with a "private_history"
// frame holding one page of stored private messages between the client and
// the requested user.
//
// limit and offset come straight from the client, so they are bounded
// before reaching the message service: a non-positive limit becomes 50, a
// limit above 200 is capped at 200, and a negative offset is treated as 0.
// Clients that need more can page with offset.
func (h *ChatHandler) handleGetPrivateHistory(client *websocket.Client, data []byte) error {
	const (
		defaultLimit = 50
		maxLimit     = 200
	)

	var req struct {
		WithUserID int64 `json:"with_user_id"`
		Limit      int   `json:"limit"`
		Offset     int   `json:"offset"`
	}
	if err := json.Unmarshal(data, &req); err != nil {
		return err
	}

	switch {
	case req.Limit <= 0:
		req.Limit = defaultLimit
	case req.Limit > maxLimit:
		req.Limit = maxLimit
	}
	if req.Offset < 0 {
		req.Offset = 0
	}

	messages, err := h.messageService.GetPrivateHistory(client.UserID, req.WithUserID, req.Limit, req.Offset)
	if err != nil {
		return err
	}

	return client.SendMessage(&WSMessage{
		Type: "private_history",
		Data: map[string]interface{}{
			"with_user_id": req.WithUserID,
			"messages":     messages,
		},
		Timestamp: time.Now(),
	})
}
// handleGetOnlineUsers replies to the client with an "online_users" frame
// listing the users currently stored as online, along with their count.
// The request payload is not read.
func (h *ChatHandler) handleGetOnlineUsers(client *websocket.Client, data []byte) error {
	online, err := h.userService.GetOnlineUsers()
	if err != nil {
		return err
	}

	body := map[string]interface{}{
		"users": online,
		"count": len(online),
	}
	return client.SendMessage(&WSMessage{
		Type:      "online_users",
		Data:      body,
		Timestamp: time.Now(),
	})
}
// handleGetUserRooms replies to the client with a "user_rooms" frame
// listing the rooms the client belongs to. The request payload is not read.
func (h *ChatHandler) handleGetUserRooms(client *websocket.Client, data []byte) error {
	joined, err := h.roomService.GetUserRooms(client.UserID)
	if err != nil {
		return err
	}

	body := map[string]interface{}{
		"rooms": joined,
	}
	return client.SendMessage(&WSMessage{
		Type:      "user_rooms",
		Data:      body,
		Timestamp: time.Now(),
	})
}
// handleSearchRooms replies to the client with a "search_rooms_result"
// frame holding the public rooms that match the query.
//
// limit comes straight from the client and the room service performs
// additional repository lookups for every returned room, so it is bounded
// here: a non-positive limit becomes 20 and a limit above 100 is capped
// at 100.
func (h *ChatHandler) handleSearchRooms(client *websocket.Client, data []byte) error {
	const (
		defaultLimit = 20
		maxLimit     = 100
	)

	var req struct {
		Query string `json:"query"`
		Limit int    `json:"limit"`
	}
	if err := json.Unmarshal(data, &req); err != nil {
		return err
	}

	switch {
	case req.Limit <= 0:
		req.Limit = defaultLimit
	case req.Limit > maxLimit:
		req.Limit = maxLimit
	}

	rooms, err := h.roomService.SearchPublicRooms(req.Query, client.UserID, req.Limit)
	if err != nil {
		return err
	}

	return client.SendMessage(&WSMessage{
		Type: "search_rooms_result",
		Data: map[string]interface{}{
			"query": req.Query,
			"rooms": rooms,
		},
		Timestamp: time.Now(),
	})
}
// handleCreateRoom creates a room owned by the client and replies with a
// "room_created" frame.
//
// Two gaps in the original are closed here:
//   - The room type arrives as a raw integer from the client; values other
//     than the declared RoomType constants are rejected instead of stored.
//   - The creator's connection is subscribed to the new room in the hub.
//     The room service already records the creator as a member, so a later
//     "join_room" request would be refused as "already a member" and the
//     creator would otherwise never receive the room's broadcasts on this
//     connection.
func (h *ChatHandler) handleCreateRoom(client *websocket.Client, data []byte) error {
	var req struct {
		Name        string          `json:"name"`
		Description string          `json:"description"`
		Type        models.RoomType `json:"type"`
		MaxMembers  int             `json:"max_members"`
	}
	if err := json.Unmarshal(data, &req); err != nil {
		return err
	}

	if req.Name == "" {
		return fmt.Errorf("房间名不能为空")
	}
	switch req.Type {
	case models.RoomTypePublic, models.RoomTypePrivate, models.RoomTypeDirect:
		// Known room type; continue.
	default:
		return fmt.Errorf("无效的房间类型: %d", int(req.Type))
	}

	room, err := h.roomService.CreateRoom(
		client.UserID,
		req.Name,
		req.Description,
		req.Type,
		req.MaxMembers,
	)
	if err != nil {
		return err
	}

	// Same room-key convention as handleJoinRoom: decimal room ID.
	h.hub.JoinRoom(client, strconv.FormatInt(room.ID, 10))

	return client.SendMessage(&WSMessage{
		Type: "room_created",
		Data: map[string]interface{}{
			"room": room,
		},
		Timestamp: time.Now(),
	})
}
小结 #
本节通过构建一个完整的实时聊天系统,展示了如何综合运用 WebSocket 技术:
- 系统架构:分层设计、模块化开发
- 数据模型:用户、房间、消息的完整建模
- 用户管理:注册登录、状态管理、JWT 认证
- 房间管理:房间创建、成员管理、所有权转移(示例中发送消息前的成员权限校验被简化,生产环境需要补充)
- 消息系统:实时消息、历史记录、私聊群聊
- WebSocket 处理:消息路由、事件处理、状态同步
通过这个实战项目,您可以深入理解 WebSocket 在实际应用中的使用方法,为开发更复杂的实时应用打下坚实基础。