3.7.3 gRPC 服务端开发

3.7.3 gRPC 服务端开发 #

gRPC 服务端是整个 gRPC 系统的核心,负责处理客户端请求、执行业务逻辑并返回响应。本节将详细介绍如何构建高性能、可扩展的 gRPC 服务端应用。

服务端架构 #

基本组件 #

┌─────────────────────────────────────────┐
│              gRPC Server                │
├─────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────────┐   │
│  │ Interceptors│  │   Service Impl  │   │
│  │             │  │                 │   │
│  │ - Auth      │  │ - Business      │   │
│  │ - Logging   │  │   Logic         │   │
│  │ - Metrics   │  │ - Data Access   │   │
│  └─────────────┘  └─────────────────┘   │
├─────────────────────────────────────────┤
│           gRPC Transport Layer          │
│              (HTTP/2)                   │
└─────────────────────────────────────────┘

服务端生命周期 #

  1. 初始化阶段:创建服务器实例、注册服务
  2. 启动阶段:绑定端口、开始监听
  3. 运行阶段:处理客户端请求
  4. 关闭阶段:优雅关闭、清理资源

基础服务端实现 #

1. 定义服务接口 #

首先在 proto/user.proto 中定义服务接口:

syntax = "proto3";

package user.v1;

option go_package = "github.com/example/grpc-server/proto/user/v1";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// UserService exposes user management operations and demonstrates all four
// gRPC call types (unary, server-streaming, client-streaming, bidirectional).
service UserService {
  // Unary RPCs: one request message, one response message.
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
  rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
  rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);

  // Server-streaming RPC: one request, a stream of User messages back.
  rpc ListUsers(ListUsersRequest) returns (stream User);

  // Client-streaming RPC: a stream of create requests, one summary response.
  rpc BatchCreateUsers(stream CreateUserRequest) returns (BatchCreateUsersResponse);

  // Bidirectional-streaming RPC: requests and responses flow as two streams.
  rpc SyncUsers(stream UserSyncRequest) returns (stream UserSyncResponse);
}

// User is the user record returned by the service.
message User {
  int64 id = 1;
  string name = 2;
  string email = 3;
  int32 age = 4;
  // Creation and last-modification times of the record.
  google.protobuf.Timestamp created_at = 5;
  google.protobuf.Timestamp updated_at = 6;
}

// GetUserRequest identifies the user to fetch.
message GetUserRequest {
  // ID of the user to look up.
  int64 user_id = 1;
}

// GetUserResponse carries the user returned by GetUser.
message GetUserResponse {
  User user = 1;
}

// CreateUserRequest holds the attributes of a user to create. It is used both
// by the unary CreateUser RPC and as the stream element of BatchCreateUsers.
message CreateUserRequest {
  string name = 1;
  string email = 2;
  int32 age = 3;
}

// CreateUserResponse carries the user returned by CreateUser.
message CreateUserResponse {
  User user = 1;
}

// UpdateUserRequest identifies a user and carries the attribute values to
// store. It is also one of the actions of UserSyncRequest.
message UpdateUserRequest {
  // ID of the user to update.
  int64 user_id = 1;
  string name = 2;
  string email = 3;
  int32 age = 4;
}

// UpdateUserResponse carries the user returned by UpdateUser.
message UpdateUserResponse {
  User user = 1;
}

// DeleteUserRequest identifies the user to delete. It is also one of the
// actions of UserSyncRequest.
message DeleteUserRequest {
  // ID of the user to delete.
  int64 user_id = 1;
}

// ListUsersRequest selects which users ListUsers streams back.
message ListUsersRequest {
  // Maximum number of users to return.
  int32 page_size = 1;
  // Position to continue listing from; the server in this section parses it
  // as a decimal row offset.
  string page_token = 2;
  // Optional text filter; an empty string means no filtering.
  string filter = 3;
}

// BatchCreateUsersResponse summarizes a BatchCreateUsers call.
message BatchCreateUsersResponse {
  // Users that were created.
  repeated User users = 1;
  // Number of users created.
  int32 created_count = 2;
  // Human-readable error messages for requests that were not created.
  repeated string errors = 3;
}

// UserSyncRequest is one element of the SyncUsers request stream.
message UserSyncRequest {
  // Exactly one action may be set per request.
  oneof action {
    User create_user = 1;
    UpdateUserRequest update_user = 2;
    DeleteUserRequest delete_user = 3;
  }
}

// UserSyncResponse reports the outcome of one UserSyncRequest.
message UserSyncResponse {
  // Whether the requested action succeeded.
  bool success = 1;
  // Human-readable description of the outcome.
  string message = 2;
  // The affected user, when the action produced one.
  User user = 3;
}

2. 数据层实现 #

// internal/repository/user.go
package repository

import (
    "context"
    "database/sql"
    "errors"
    "fmt"
    "time"

    _ "github.com/lib/pq"
)

// User mirrors one row of the users table as selected by UserRepository.
//
// NOTE(review): the `db` tags are not consumed by any code shown in this
// file (rows are scanned positionally); presumably they are kept for a
// struct-mapping library — confirm before relying on them.
type User struct {
    ID        int64     `db:"id"`
    Name      string    `db:"name"`
    Email     string    `db:"email"`
    Age       int32     `db:"age"`
    CreatedAt time.Time `db:"created_at"`
    UpdatedAt time.Time `db:"updated_at"`
}

// UserRepository provides CRUD access to the users table through a *sql.DB.
type UserRepository struct {
    db *sql.DB // handle every query in this type is issued against
}

// NewUserRepository builds a UserRepository that issues its queries on db.
func NewUserRepository(db *sql.DB) *UserRepository {
    repo := new(UserRepository)
    repo.db = db
    return repo
}

// GetUser loads the row from users whose id equals userID.
//
// When no row matches, it returns an error whose text is
// "user not found: <id>". Any other database failure is wrapped with %w so
// callers can still inspect the underlying error.
func (r *UserRepository) GetUser(ctx context.Context, userID int64) (*User, error) {
    const query = `
        SELECT id, name, email, age, created_at, updated_at
        FROM users
        WHERE id = $1
    `

    var user User
    err := r.db.QueryRowContext(ctx, query, userID).Scan(
        &user.ID,
        &user.Name,
        &user.Email,
        &user.Age,
        &user.CreatedAt,
        &user.UpdatedAt,
    )

    switch {
    case err == nil:
        return &user, nil
    case errors.Is(err, sql.ErrNoRows):
        // errors.Is also matches when the driver or a wrapper has wrapped
        // sql.ErrNoRows, which a plain == comparison would miss.
        return nil, fmt.Errorf("user not found: %d", userID)
    default:
        return nil, fmt.Errorf("failed to get user: %w", err)
    }
}

// CreateUser inserts a new row into users and returns the stored record,
// including the id and timestamps assigned by the database (via RETURNING).
// Failures are wrapped with %w.
func (r *UserRepository) CreateUser(ctx context.Context, name, email string, age int32) (*User, error) {
    const insertSQL = `
        INSERT INTO users (name, email, age, created_at, updated_at)
        VALUES ($1, $2, $3, NOW(), NOW())
        RETURNING id, name, email, age, created_at, updated_at
    `

    created := new(User)
    row := r.db.QueryRowContext(ctx, insertSQL, name, email, age)
    if err := row.Scan(
        &created.ID,
        &created.Name,
        &created.Email,
        &created.Age,
        &created.CreatedAt,
        &created.UpdatedAt,
    ); err != nil {
        return nil, fmt.Errorf("failed to create user: %w", err)
    }

    return created, nil
}

// UpdateUser overwrites name, email and age of the row whose id equals
// userID, refreshes updated_at, and returns the stored record.
//
// When no row matches, it returns an error whose text is
// "user not found: <id>". Any other database failure is wrapped with %w.
func (r *UserRepository) UpdateUser(ctx context.Context, userID int64, name, email string, age int32) (*User, error) {
    const query = `
        UPDATE users
        SET name = $2, email = $3, age = $4, updated_at = NOW()
        WHERE id = $1
        RETURNING id, name, email, age, created_at, updated_at
    `

    var user User
    err := r.db.QueryRowContext(ctx, query, userID, name, email, age).Scan(
        &user.ID,
        &user.Name,
        &user.Email,
        &user.Age,
        &user.CreatedAt,
        &user.UpdatedAt,
    )

    switch {
    case err == nil:
        return &user, nil
    case errors.Is(err, sql.ErrNoRows):
        // UPDATE ... RETURNING yields no row when the WHERE clause matched
        // nothing; errors.Is also matches a wrapped sql.ErrNoRows.
        return nil, fmt.Errorf("user not found: %d", userID)
    default:
        return nil, fmt.Errorf("failed to update user: %w", err)
    }
}

// DeleteUser removes the row whose id equals userID.
//
// It returns an error with the text "user not found: <id>" when the
// statement affected no rows; database failures are wrapped with %w.
func (r *UserRepository) DeleteUser(ctx context.Context, userID int64) error {
    res, err := r.db.ExecContext(ctx, `DELETE FROM users WHERE id = $1`, userID)
    if err != nil {
        return fmt.Errorf("failed to delete user: %w", err)
    }

    switch n, err := res.RowsAffected(); {
    case err != nil:
        return fmt.Errorf("failed to get rows affected: %w", err)
    case n == 0:
        return fmt.Errorf("user not found: %d", userID)
    }

    return nil
}

// ListUsers returns up to limit users, newest first, skipping offset rows.
//
// An empty filter matches every row; otherwise a row matches when its name or
// email contains filter (case-insensitive, via ILIKE). The result is nil when
// no rows match. Query, scan and iteration failures are wrapped with %w.
func (r *UserRepository) ListUsers(ctx context.Context, limit int32, offset int32, filter string) ([]*User, error) {
    const listSQL = `
        SELECT id, name, email, age, created_at, updated_at
        FROM users
        WHERE ($3 = '' OR name ILIKE '%' || $3 || '%' OR email ILIKE '%' || $3 || '%')
        ORDER BY created_at DESC
        LIMIT $1 OFFSET $2
    `

    rows, err := r.db.QueryContext(ctx, listSQL, limit, offset, filter)
    if err != nil {
        return nil, fmt.Errorf("failed to list users: %w", err)
    }
    defer rows.Close()

    var found []*User
    for rows.Next() {
        // Each row gets its own User so the stored pointers never alias.
        u := new(User)
        if err := rows.Scan(&u.ID, &u.Name, &u.Email, &u.Age, &u.CreatedAt, &u.UpdatedAt); err != nil {
            return nil, fmt.Errorf("failed to scan user: %w", err)
        }
        found = append(found, u)
    }

    // rows.Next returning false can mean either exhaustion or failure.
    if err := rows.Err(); err != nil {
        return nil, fmt.Errorf("rows iteration error: %w", err)
    }

    return found, nil
}

3. 服务层实现 #

// internal/service/user.go
package service

import (
    "context"
    "fmt"
    "io"
    "log"
    "strconv"
    "strings"
    "time"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/emptypb"
    "google.golang.org/protobuf/types/known/timestamppb"

    pb "github.com/example/grpc-server/proto/user/v1"
    "github.com/example/grpc-server/internal/repository"
)

// UserService implements the UserService gRPC server on top of a
// UserRepository.
type UserService struct {
    // Embedded so RPCs without an explicit implementation fall back to the
    // generated defaults.
    pb.UnimplementedUserServiceServer
    userRepo *repository.UserRepository // persistence layer used by every RPC
}

// NewUserService builds a UserService that persists through userRepo.
func NewUserService(userRepo *repository.UserRepository) *UserService {
    svc := new(UserService)
    svc.userRepo = userRepo
    return svc
}

// GetUser is a unary RPC that returns the user identified by req.user_id.
//
// It fails with InvalidArgument when user_id is not positive, NotFound when
// the repository reports a missing user, and Internal for any other error.
func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    id := req.GetUserId()
    if id <= 0 {
        return nil, status.Error(codes.InvalidArgument, "user_id must be positive")
    }

    found, err := s.userRepo.GetUser(ctx, id)
    switch {
    case err == nil:
        return &pb.GetUserResponse{User: s.convertToProtoUser(found)}, nil
    case strings.Contains(err.Error(), "not found"):
        // NOTE(review): a missing user is recognized by substring match on
        // the repository's error text, so the two must stay in sync.
        return nil, status.Error(codes.NotFound, "user not found")
    default:
        // The cause is logged server-side; the client only sees a generic message.
        log.Printf("Failed to get user: %v", err)
        return nil, status.Error(codes.Internal, "internal server error")
    }
}

// CreateUser is a unary RPC that validates the request and stores a new user.
//
// It fails with InvalidArgument when name or email is empty or age is
// negative, and with Internal when the repository cannot create the user.
func (s *UserService) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
    // Input validation; the first failing rule decides the error returned.
    switch {
    case req.GetName() == "":
        return nil, status.Error(codes.InvalidArgument, "name is required")
    case req.GetEmail() == "":
        return nil, status.Error(codes.InvalidArgument, "email is required")
    case req.GetAge() < 0:
        return nil, status.Error(codes.InvalidArgument, "age must be non-negative")
    }

    created, err := s.userRepo.CreateUser(ctx, req.GetName(), req.GetEmail(), req.GetAge())
    if err != nil {
        log.Printf("Failed to create user: %v", err)
        return nil, status.Error(codes.Internal, "failed to create user")
    }

    resp := &pb.CreateUserResponse{User: s.convertToProtoUser(created)}
    return resp, nil
}

// UpdateUser is a unary RPC that overwrites name, email and age of the user
// identified by req.user_id.
//
// It fails with InvalidArgument when user_id is not positive, when name or
// email is empty, or when age is negative; with NotFound when the repository
// reports a missing user; and with Internal for any other error.
func (s *UserService) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*pb.UpdateUserResponse, error) {
    if req.GetUserId() <= 0 {
        return nil, status.Errorf(codes.InvalidArgument, "user_id must be positive")
    }
    // The repository overwrites all three columns unconditionally, so apply
    // the same field rules as CreateUser; otherwise an update could blank a
    // name/email or store a negative age that creation would have rejected.
    if req.GetName() == "" {
        return nil, status.Errorf(codes.InvalidArgument, "name is required")
    }
    if req.GetEmail() == "" {
        return nil, status.Errorf(codes.InvalidArgument, "email is required")
    }
    if req.GetAge() < 0 {
        return nil, status.Errorf(codes.InvalidArgument, "age must be non-negative")
    }

    user, err := s.userRepo.UpdateUser(ctx, req.GetUserId(), req.GetName(), req.GetEmail(), req.GetAge())
    if err != nil {
        // A missing user is recognized by substring match on the repository's
        // error text.
        if strings.Contains(err.Error(), "not found") {
            return nil, status.Errorf(codes.NotFound, "user not found")
        }
        log.Printf("Failed to update user: %v", err)
        return nil, status.Errorf(codes.Internal, "failed to update user")
    }

    return &pb.UpdateUserResponse{
        User: s.convertToProtoUser(user),
    }, nil
}

// DeleteUser is a unary RPC that removes the user identified by req.user_id.
//
// It fails with InvalidArgument when user_id is not positive, NotFound when
// the repository reports a missing user, and Internal for any other error.
func (s *UserService) DeleteUser(ctx context.Context, req *pb.DeleteUserRequest) (*emptypb.Empty, error) {
    id := req.GetUserId()
    if id <= 0 {
        return nil, status.Error(codes.InvalidArgument, "user_id must be positive")
    }

    err := s.userRepo.DeleteUser(ctx, id)
    switch {
    case err == nil:
        return &emptypb.Empty{}, nil
    case strings.Contains(err.Error(), "not found"):
        // Missing user is recognized by substring match on the error text.
        return nil, status.Error(codes.NotFound, "user not found")
    default:
        log.Printf("Failed to delete user: %v", err)
        return nil, status.Error(codes.Internal, "failed to delete user")
    }
}

// ListUsers is a server-streaming RPC that sends one page of users to the
// client, one User message per stream element.
//
// page_size defaults to 10 when not positive and is capped at 100.
// page_token, when set, must be a non-negative decimal row offset that fits
// in 32 bits; anything else fails with InvalidArgument instead of silently
// restarting from the first row or reaching the database as a negative
// OFFSET. Repository and send failures are reported as Internal.
func (s *UserService) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
    pageSize := req.GetPageSize()
    if pageSize <= 0 {
        pageSize = 10
    }
    if pageSize > 100 {
        pageSize = 100
    }

    var offset int32
    if token := req.GetPageToken(); token != "" {
        parsed, err := strconv.ParseInt(token, 10, 32)
        if err != nil || parsed < 0 {
            return status.Errorf(codes.InvalidArgument, "invalid page_token")
        }
        offset = int32(parsed)
    }

    users, err := s.userRepo.ListUsers(stream.Context(), pageSize, offset, req.GetFilter())
    if err != nil {
        log.Printf("Failed to list users: %v", err)
        return status.Errorf(codes.Internal, "failed to list users")
    }

    for _, user := range users {
        if err := stream.Send(s.convertToProtoUser(user)); err != nil {
            log.Printf("Failed to send user: %v", err)
            return status.Errorf(codes.Internal, "failed to send user")
        }
    }

    return nil
}

// BatchCreateUsers is a client-streaming RPC: it creates one user per
// received request and, once the client closes its side, replies with the
// created users, their count, and a message for every request that failed.
//
// A request with an empty name or email, or a negative age (the same rules
// CreateUser enforces), is recorded as a failure and skipped; it does not
// abort the batch. A receive error other than io.EOF ends the call with
// Internal.
func (s *UserService) BatchCreateUsers(stream pb.UserService_BatchCreateUsersServer) error {
    var users []*pb.User
    // Named failures rather than errors so the identifier cannot shadow the
    // standard library package of that name.
    var failures []string
    createdCount := int32(0)

    for {
        req, err := stream.Recv()
        if err == io.EOF {
            // The client has finished sending: reply once and close.
            return stream.SendAndClose(&pb.BatchCreateUsersResponse{
                Users:        users,
                CreatedCount: createdCount,
                Errors:       failures,
            })
        }
        if err != nil {
            log.Printf("Failed to receive create user request: %v", err)
            return status.Errorf(codes.Internal, "failed to receive request")
        }

        // Validate, then create.
        if req.GetName() == "" || req.GetEmail() == "" {
            failures = append(failures, fmt.Sprintf("invalid user data: name=%s, email=%s", req.GetName(), req.GetEmail()))
            continue
        }
        if req.GetAge() < 0 {
            failures = append(failures, fmt.Sprintf("invalid user data: name=%s, age must be non-negative", req.GetName()))
            continue
        }

        user, err := s.userRepo.CreateUser(stream.Context(), req.GetName(), req.GetEmail(), req.GetAge())
        if err != nil {
            failures = append(failures, fmt.Sprintf("failed to create user %s: %v", req.GetName(), err))
            continue
        }

        users = append(users, s.convertToProtoUser(user))
        createdCount++
    }
}

// SyncUsers is a bidirectional-streaming RPC: for every request received it
// performs the contained action (create, update or delete) and sends exactly
// one UserSyncResponse describing the outcome.
//
// Action failures are reported in the response (Success=false) and do not end
// the stream. The call returns nil when the client closes its side, and
// Internal when receiving or sending fails.
func (s *UserService) SyncUsers(stream pb.UserService_SyncUsersServer) error {
    ctx := stream.Context()

    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            log.Printf("Failed to receive sync request: %v", err)
            return status.Error(codes.Internal, "failed to receive request")
        }

        // Start from a failure-shaped reply; each successful branch flips
        // Success and fills in the remaining fields.
        reply := &pb.UserSyncResponse{Success: false}

        switch action := req.Action.(type) {
        case *pb.UserSyncRequest_CreateUser:
            in := action.CreateUser
            created, err := s.userRepo.CreateUser(ctx, in.GetName(), in.GetEmail(), in.GetAge())
            if err != nil {
                reply.Message = fmt.Sprintf("failed to create user: %v", err)
                break
            }
            reply.Success = true
            reply.Message = "user created successfully"
            reply.User = s.convertToProtoUser(created)

        case *pb.UserSyncRequest_UpdateUser:
            in := action.UpdateUser
            updated, err := s.userRepo.UpdateUser(ctx, in.GetUserId(), in.GetName(), in.GetEmail(), in.GetAge())
            if err != nil {
                reply.Message = fmt.Sprintf("failed to update user: %v", err)
                break
            }
            reply.Success = true
            reply.Message = "user updated successfully"
            reply.User = s.convertToProtoUser(updated)

        case *pb.UserSyncRequest_DeleteUser:
            if err := s.userRepo.DeleteUser(ctx, action.DeleteUser.GetUserId()); err != nil {
                reply.Message = fmt.Sprintf("failed to delete user: %v", err)
                break
            }
            reply.Success = true
            reply.Message = "user deleted successfully"

        default:
            // No action set, or an action this server does not know.
            reply.Message = "unknown action"
        }

        if err := stream.Send(reply); err != nil {
            log.Printf("Failed to send sync response: %v", err)
            return status.Error(codes.Internal, "failed to send response")
        }
    }
}

// convertToProtoUser maps a repository User onto its protobuf representation,
// converting the time.Time fields to protobuf timestamps.
//
// A nil user yields a nil *pb.User instead of panicking on the field access.
func (s *UserService) convertToProtoUser(user *repository.User) *pb.User {
    if user == nil {
        return nil
    }
    return &pb.User{
        Id:        user.ID,
        Name:      user.Name,
        Email:     user.Email,
        Age:       user.Age,
        CreatedAt: timestamppb.New(user.CreatedAt),
        UpdatedAt: timestamppb.New(user.UpdatedAt),
    }
}

4. 服务器启动 #

// cmd/server/main.go
package main

import (
    "context"
    "database/sql"
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
    "google.golang.org/grpc/reflection"

    pb "github.com/example/grpc-server/proto/user/v1"
    "github.com/example/grpc-server/internal/repository"
    "github.com/example/grpc-server/internal/service"
)

// main wires the database, repository and UserService together, serves gRPC
// on :50051, and shuts the server down gracefully on SIGINT or SIGTERM,
// forcing a stop if graceful shutdown takes longer than 30 seconds.
func main() {
    // Open the database handle.
    db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
    if err != nil {
        log.Fatalf("Failed to connect to database: %v", err)
    }
    defer db.Close()

    // Verify the connection, bounded by a deadline so an unreachable database
    // cannot block startup indefinitely.
    pingCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    err = db.PingContext(pingCtx)
    cancel()
    if err != nil {
        log.Fatalf("Failed to ping database: %v", err)
    }

    // Build the repository and the service on top of it.
    userRepo := repository.NewUserRepository(db)
    userService := service.NewUserService(userRepo)

    // Create the listener.
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }

    // Create the gRPC server with keepalive settings.
    s := grpc.NewServer(
        grpc.KeepaliveParams(keepalive.ServerParameters{
            MaxConnectionIdle:     15 * time.Second,
            MaxConnectionAge:      30 * time.Second,
            MaxConnectionAgeGrace: 5 * time.Second,
            Time:                  5 * time.Second,
            Timeout:               1 * time.Second,
        }),
        grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
            MinTime:             5 * time.Second,
            PermitWithoutStream: true,
        }),
    )

    // Register the service.
    pb.RegisterUserServiceServer(s, userService)

    // Enable reflection (for debugging).
    reflection.Register(s)

    // Serve in the background so this goroutine can wait for signals.
    go func() {
        log.Println("gRPC server listening on :50051")
        if err := s.Serve(lis); err != nil {
            log.Fatalf("Failed to serve: %v", err)
        }
    }()

    // Wait for an interrupt or termination signal.
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Shutting down gRPC server...")

    // Graceful shutdown runs in its own goroutine so it can be timed out.
    done := make(chan struct{})
    go func() {
        s.GracefulStop()
        close(done)
    }()

    // Wait for shutdown to finish, or force it after the timeout.
    select {
    case <-done:
        log.Println("gRPC server stopped gracefully")
    case <-time.After(30 * time.Second):
        log.Println("gRPC server shutdown timeout, forcing stop")
        s.Stop()
    }
}

高级特性 #

1. 健康检查 #

// internal/health/health.go
package health

import (
    "context"
    "database/sql"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/health/grpc_health_v1"
    "google.golang.org/grpc/status"
)

type HealthChecker struct {
    db *sql.DB
}

// NewHealthChecker builds a HealthChecker that probes db.
func NewHealthChecker(db *sql.DB) *HealthChecker {
    checker := new(HealthChecker)
    checker.db = db
    return checker
}

// Check reports SERVING when the database answers a ping within ctx and
// NOT_SERVING otherwise. The ping error itself is not returned: the RPC
// always succeeds and conveys health through the response status. The
// service name in req is not consulted.
func (h *HealthChecker) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
    state := grpc_health_v1.HealthCheckResponse_SERVING
    // Probe the database connection.
    if err := h.db.PingContext(ctx); err != nil {
        state = grpc_health_v1.HealthCheckResponse_NOT_SERVING
    }

    return &grpc_health_v1.HealthCheckResponse{Status: state}, nil
}

// Watch is not supported by this checker; it always fails with Unimplemented.
func (h *HealthChecker) Watch(req *grpc_health_v1.HealthCheckRequest, stream grpc_health_v1.Health_WatchServer) error {
    const msg = "health check watch not implemented"
    return status.Error(codes.Unimplemented, msg)
}

// 在 main.go 中注册
// grpc_health_v1.RegisterHealthServer(s, health.NewHealthChecker(db))

2. 服务器配置 #

// internal/config/config.go
package config

import (
    "time"
)

// Config is the root of the application configuration, grouped by concern.
// The yaml tags give the key each section is read from.
type Config struct {
    Server   ServerConfig   `yaml:"server"`
    Database DatabaseConfig `yaml:"database"`
    Logging  LoggingConfig  `yaml:"logging"`
}

// ServerConfig holds the gRPC server settings.
//
// NOTE(review): no consumer of this struct is visible in this file; the field
// names suggest they feed keepalive.ServerParameters and the
// MaxRecvMsgSize / MaxSendMsgSize / MaxConcurrentStreams server options —
// confirm against the code that builds the server.
type ServerConfig struct {
    Port                    int           `yaml:"port"`
    MaxConnectionIdle       time.Duration `yaml:"max_connection_idle"`
    MaxConnectionAge        time.Duration `yaml:"max_connection_age"`
    MaxConnectionAgeGrace   time.Duration `yaml:"max_connection_age_grace"`
    KeepAliveTime           time.Duration `yaml:"keep_alive_time"`
    KeepAliveTimeout        time.Duration `yaml:"keep_alive_timeout"`
    MaxRecvMsgSize          int           `yaml:"max_recv_msg_size"`
    MaxSendMsgSize          int           `yaml:"max_send_msg_size"`
    MaxConcurrentStreams    uint32        `yaml:"max_concurrent_streams"`
}

// DatabaseConfig holds the database connection and pool settings.
//
// NOTE(review): how the connection fields are combined into a DSN is not
// shown in this file — confirm against the DSN builder.
type DatabaseConfig struct {
    Host            string        `yaml:"host"`
    Port            int           `yaml:"port"`
    User            string        `yaml:"user"`
    Password        string        `yaml:"password"`
    Database        string        `yaml:"database"`
    SSLMode         string        `yaml:"ssl_mode"`
    MaxOpenConns    int           `yaml:"max_open_conns"`
    MaxIdleConns    int           `yaml:"max_idle_conns"`
    ConnMaxLifetime time.Duration `yaml:"conn_max_lifetime"`
}

// LoggingConfig holds the logging settings.
//
// NOTE(review): the accepted values for Level and Format are not defined in
// this file — confirm against the logger setup.
type LoggingConfig struct {
    Level  string `yaml:"level"`
    Format string `yaml:"format"`
}

3. 中间件集成 #

// internal/middleware/logging.go
package middleware

import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// LoggingInterceptor is a unary server interceptor that logs the start of
// each call and, on completion, its duration and gRPC status code. The
// handler's response and error are passed through untouched.
func LoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    began := time.Now()
    method := info.FullMethod

    log.Printf("gRPC call: %s started", method)

    resp, err := handler(ctx, req)
    elapsed := time.Since(began)

    code := codes.OK
    if err != nil {
        // Errors that do not carry a gRPC status are reported as Unknown.
        code = codes.Unknown
        if st, ok := status.FromError(err); ok {
            code = st.Code()
        }
    }

    log.Printf("gRPC call: %s completed in %v with code %s", method, elapsed, code)

    return resp, err
}

// StreamLoggingInterceptor is a stream server interceptor that logs the start
// of each streaming call and, once the handler returns, its total duration
// and gRPC status code. The handler's error is passed through untouched.
func StreamLoggingInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    began := time.Now()
    method := info.FullMethod

    log.Printf("gRPC stream: %s started", method)

    err := handler(srv, stream)
    elapsed := time.Since(began)

    code := codes.OK
    if err != nil {
        // Errors that do not carry a gRPC status are reported as Unknown.
        code = codes.Unknown
        if st, ok := status.FromError(err); ok {
            code = st.Code()
        }
    }

    log.Printf("gRPC stream: %s completed in %v with code %s", method, elapsed, code)

    return err
}

4. 错误处理中间件 #

// internal/middleware/recovery.go
package middleware

import (
    "context"
    "log"
    "runtime/debug"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// RecoveryInterceptor is a unary server interceptor that converts a panic in
// the handler into an Internal error. The panic value and stack trace are
// logged; the client only sees a generic message.
//
// The results are named so the deferred function can replace err.
func RecoveryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
    defer func() {
        r := recover()
        if r == nil {
            return
        }
        log.Printf("Panic recovered in %s: %v\n%s", info.FullMethod, r, debug.Stack())
        err = status.Error(codes.Internal, "internal server error")
    }()

    resp, err = handler(ctx, req)
    return resp, err
}

// StreamRecoveryInterceptor is a stream server interceptor that converts a
// panic in the handler into an Internal error. The panic value and stack
// trace are logged; the client only sees a generic message.
//
// The result is named so the deferred function can replace err.
func StreamRecoveryInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
    defer func() {
        r := recover()
        if r == nil {
            return
        }
        log.Printf("Panic recovered in stream %s: %v\n%s", info.FullMethod, r, debug.Stack())
        err = status.Error(codes.Internal, "internal server error")
    }()

    err = handler(srv, stream)
    return err
}

性能优化 #

1. 连接与保活配置 #

// createOptimizedServer returns a gRPC server configured with keepalive
// settings, a keepalive enforcement policy, 4 MB message size limits, and a
// cap of 1000 concurrent streams.
func createOptimizedServer() *grpc.Server {
    // Connection keepalive settings.
    keepaliveParams := keepalive.ServerParameters{
        MaxConnectionIdle:     15 * time.Second, // idle timeout
        MaxConnectionAge:      30 * time.Second, // maximum connection lifetime
        MaxConnectionAgeGrace: 5 * time.Second,  // grace period when closing
        Time:                  5 * time.Second,  // ping interval
        Timeout:               1 * time.Second,  // ping timeout
    }

    // Keepalive enforcement policy.
    enforcement := keepalive.EnforcementPolicy{
        MinTime:             5 * time.Second, // minimum ping interval
        PermitWithoutStream: true,            // allow pings without active streams
    }

    const maxMsgBytes = 4 * 1024 * 1024 // 4MB

    opts := []grpc.ServerOption{
        grpc.KeepaliveParams(keepaliveParams),
        grpc.KeepaliveEnforcementPolicy(enforcement),

        // Message size limits.
        grpc.MaxRecvMsgSize(maxMsgBytes),
        grpc.MaxSendMsgSize(maxMsgBytes),

        // Concurrent stream limit.
        grpc.MaxConcurrentStreams(1000),
    }

    return grpc.NewServer(opts...)
}

2. 数据库连接优化 #

// setupDatabase opens a postgres handle for config and applies its connection
// pool limits. It returns the error from sql.Open unchanged; the connection
// is not pinged here.
func setupDatabase(config DatabaseConfig) (*sql.DB, error) {
    dsn := buildDSN(config)

    pool, err := sql.Open("postgres", dsn)
    if err != nil {
        return nil, err
    }

    // Connection pool configuration.
    pool.SetMaxOpenConns(config.MaxOpenConns)       // maximum open connections
    pool.SetMaxIdleConns(config.MaxIdleConns)       // maximum idle connections
    pool.SetConnMaxLifetime(config.ConnMaxLifetime) // maximum lifetime of a connection

    return pool, nil
}

3. 内存优化 #

// userPool recycles *pb.User messages to reduce memory allocations.
var userPool = sync.Pool{
    New: func() interface{} { return new(pb.User) },
}

// GetUser sketches how a handler can borrow a *pb.User from userPool; the
// part that fills in and uses the message is elided in this snippet.
//
// NOTE(review): the deferred function resets the message and returns it to
// the pool as soon as GetUser returns. If the elided code places this message
// in the returned response, it would presumably be reset (and possibly reused
// by another call) before gRPC serializes the response — confirm the pooled
// object never escapes through the return value.
func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    // Borrow an object from the pool.
    user := userPool.Get().(*pb.User)
    defer func() {
        // Reset the object and put it back into the pool.
        user.Reset()
        userPool.Put(user)
    }()

    // Use the user object...
}

监控和指标 #

1. Prometheus 指标 #

// internal/metrics/metrics.go
package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// Prometheus collectors for the gRPC server, created through promauto.
var (
    // RequestsTotal counts gRPC requests, labeled by method and status.
    RequestsTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "grpc_requests_total",
            Help: "Total number of gRPC requests",
        },
        []string{"method", "status"},
    )

    // RequestDuration records request durations in seconds per method, using
    // the default Prometheus buckets.
    RequestDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "grpc_request_duration_seconds",
            Help:    "Duration of gRPC requests",
            Buckets: prometheus.DefBuckets,
        },
        []string{"method"},
    )

    // ActiveConnections is a gauge for the number of active gRPC connections.
    // NOTE(review): nothing visible in this file updates it — confirm where
    // it is incremented and decremented.
    ActiveConnections = promauto.NewGauge(
        prometheus.GaugeOpts{
            Name: "grpc_active_connections",
            Help: "Number of active gRPC connections",
        },
    )
)

2. 指标中间件 #

// MetricsInterceptor is a unary server interceptor that records, for every
// call, one increment of RequestsTotal (labeled with the method and
// "success" or "error") and one RequestDuration observation in seconds.
// The handler's response and error are passed through untouched.
func MetricsInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    began := time.Now()
    resp, err := handler(ctx, req)
    elapsed := time.Since(began)

    outcome := "error"
    if err == nil {
        outcome = "success"
    }

    method := info.FullMethod
    RequestsTotal.WithLabelValues(method, outcome).Inc()
    RequestDuration.WithLabelValues(method).Observe(elapsed.Seconds())

    return resp, err
}

小结 #

本节详细介绍了 gRPC 服务端开发的各个方面:

  1. 基础实现:服务接口定义、数据层、服务层实现
  2. 服务器配置:连接管理、健康检查、优雅关闭
  3. 高级特性:中间件、错误处理、恢复机制
  4. 性能优化:连接池、内存管理、并发控制
  5. 监控指标:Prometheus 集成、性能监控

通过这些内容,您可以构建出高性能、可扩展的 gRPC 服务端应用。在下一节中,我们将学习如何开发 gRPC 客户端。