3.7.3 gRPC 服务端开发 #
gRPC 服务端是整个 gRPC 系统的核心,负责处理客户端请求、执行业务逻辑并返回响应。本节将详细介绍如何构建高性能、可扩展的 gRPC 服务端应用。
服务端架构 #
基本组件 #
┌─────────────────────────────────────────┐
│               gRPC Server               │
├─────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────────┐   │
│  │ Interceptors│  │  Service Impl   │   │
│  │             │  │                 │   │
│  │ - Auth      │  │ - Business      │   │
│  │ - Logging   │  │   Logic         │   │
│  │ - Metrics   │  │ - Data Access   │   │
│  └─────────────┘  └─────────────────┘   │
├─────────────────────────────────────────┤
│          gRPC Transport Layer           │
│                (HTTP/2)                 │
└─────────────────────────────────────────┘
服务端生命周期 #
- 初始化阶段:创建服务器实例、注册服务
- 启动阶段:绑定端口、开始监听
- 运行阶段:处理客户端请求
- 关闭阶段:优雅关闭、清理资源
基础服务端实现 #
1. 定义服务接口 #
首先定义 proto/user.proto:
syntax = "proto3";
package user.v1;
option go_package = "github.com/example/grpc-server/proto/user/v1";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
// UserService exposes user management RPCs: four unary calls, one
// server-streaming call, one client-streaming call, and one bidirectional
// streaming call.
service UserService {
// Unary RPCs.
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
// Server-streaming RPC: one request, a stream of User messages back.
rpc ListUsers(ListUsersRequest) returns (stream User);
// Client-streaming RPC: a stream of create requests, one summary response.
rpc BatchCreateUsers(stream CreateUserRequest) returns (BatchCreateUsersResponse);
// Bidirectional-streaming RPC: sync requests in, sync responses out.
rpc SyncUsers(stream UserSyncRequest) returns (stream UserSyncResponse);
}
// User is the user record carried in responses and in the ListUsers stream.
message User {
int64 id = 1;
string name = 2;
string email = 3;
int32 age = 4;
google.protobuf.Timestamp created_at = 5;
google.protobuf.Timestamp updated_at = 6;
}
// GetUserRequest identifies the user to fetch.
message GetUserRequest {
int64 user_id = 1;
}
// GetUserResponse wraps the fetched user.
message GetUserResponse {
User user = 1;
}
// CreateUserRequest carries the fields of a user to create. It is also the
// element type of the BatchCreateUsers request stream.
message CreateUserRequest {
string name = 1;
string email = 2;
int32 age = 3;
}
// CreateUserResponse wraps the newly created user.
message CreateUserResponse {
User user = 1;
}
// UpdateUserRequest identifies a user and carries its new field values.
message UpdateUserRequest {
int64 user_id = 1;
string name = 2;
string email = 3;
int32 age = 4;
}
// UpdateUserResponse wraps the updated user.
message UpdateUserResponse {
User user = 1;
}
// DeleteUserRequest identifies the user to delete.
message DeleteUserRequest {
int64 user_id = 1;
}
// ListUsersRequest carries paging and filtering parameters for ListUsers.
message ListUsersRequest {
int32 page_size = 1;
string page_token = 2;
string filter = 3;
}
// BatchCreateUsersResponse summarizes a batch: the created users, how many
// were created, and one error string per failed request.
message BatchCreateUsersResponse {
repeated User users = 1;
int32 created_count = 2;
repeated string errors = 3;
}
// UserSyncRequest carries exactly one of a create, update, or delete action.
message UserSyncRequest {
oneof action {
User create_user = 1;
UpdateUserRequest update_user = 2;
DeleteUserRequest delete_user = 3;
}
}
// UserSyncResponse reports the outcome of one sync action.
message UserSyncResponse {
bool success = 1;
string message = 2;
User user = 3;
}
2. 数据层实现 #
// internal/repository/user.go
package repository
import (
"context"
"database/sql"
"fmt"
"time"
_ "github.com/lib/pq"
)
// User is the repository-level model of a user. Each db tag names the column
// the field corresponds to in the users table queried by UserRepository.
type User struct {
ID int64 `db:"id"`
Name string `db:"name"`
Email string `db:"email"`
Age int32 `db:"age"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
}
// UserRepository provides access to user rows through a *sql.DB handle.
type UserRepository struct {
// db is the database handle every query is issued on.
db *sql.DB
}
// NewUserRepository returns a UserRepository that issues its queries on db.
func NewUserRepository(db *sql.DB) *UserRepository {
	repo := new(UserRepository)
	repo.db = db
	return repo
}
// GetUser loads the user whose id equals userID.
//
// It returns a "user not found" error when no row matches and wraps any
// other query or scan failure.
func (r *UserRepository) GetUser(ctx context.Context, userID int64) (*User, error) {
	const stmt = `
SELECT id, name, email, age, created_at, updated_at
FROM users
WHERE id = $1
`
	u := new(User)
	row := r.db.QueryRowContext(ctx, stmt, userID)
	err := row.Scan(&u.ID, &u.Name, &u.Email, &u.Age, &u.CreatedAt, &u.UpdatedAt)
	switch {
	case err == nil:
		return u, nil
	case err == sql.ErrNoRows:
		return nil, fmt.Errorf("user not found: %d", userID)
	default:
		return nil, fmt.Errorf("failed to get user: %w", err)
	}
}
// CreateUser inserts a user with the given name, email, and age, letting the
// database stamp created_at and updated_at, and returns the stored row.
func (r *UserRepository) CreateUser(ctx context.Context, name, email string, age int32) (*User, error) {
	const stmt = `
INSERT INTO users (name, email, age, created_at, updated_at)
VALUES ($1, $2, $3, NOW(), NOW())
RETURNING id, name, email, age, created_at, updated_at
`
	created := new(User)
	row := r.db.QueryRowContext(ctx, stmt, name, email, age)
	if scanErr := row.Scan(
		&created.ID, &created.Name, &created.Email,
		&created.Age, &created.CreatedAt, &created.UpdatedAt,
	); scanErr != nil {
		return nil, fmt.Errorf("failed to create user: %w", scanErr)
	}
	return created, nil
}
// UpdateUser overwrites name, email, and age of the user identified by
// userID, refreshes updated_at, and returns the row as stored.
//
// It returns a "user not found" error when no row matches and wraps any
// other query or scan failure.
func (r *UserRepository) UpdateUser(ctx context.Context, userID int64, name, email string, age int32) (*User, error) {
	const stmt = `
UPDATE users
SET name = $2, email = $3, age = $4, updated_at = NOW()
WHERE id = $1
RETURNING id, name, email, age, created_at, updated_at
`
	updated := new(User)
	row := r.db.QueryRowContext(ctx, stmt, userID, name, email, age)
	err := row.Scan(
		&updated.ID, &updated.Name, &updated.Email,
		&updated.Age, &updated.CreatedAt, &updated.UpdatedAt,
	)
	switch {
	case err == nil:
		return updated, nil
	case err == sql.ErrNoRows:
		return nil, fmt.Errorf("user not found: %d", userID)
	default:
		return nil, fmt.Errorf("failed to update user: %w", err)
	}
}
// DeleteUser removes the user identified by userID.
//
// It returns a "user not found" error when the statement affects no rows.
func (r *UserRepository) DeleteUser(ctx context.Context, userID int64) error {
	res, err := r.db.ExecContext(ctx, `DELETE FROM users WHERE id = $1`, userID)
	if err != nil {
		return fmt.Errorf("failed to delete user: %w", err)
	}

	affected, err := res.RowsAffected()
	switch {
	case err != nil:
		return fmt.Errorf("failed to get rows affected: %w", err)
	case affected == 0:
		return fmt.Errorf("user not found: %d", userID)
	}
	return nil
}
// ListUsers returns up to limit users starting at offset, newest first.
//
// An empty filter matches every user; otherwise a user matches when filter
// occurs, case-insensitively, in its name or email.
func (r *UserRepository) ListUsers(ctx context.Context, limit int32, offset int32, filter string) ([]*User, error) {
	const stmt = `
SELECT id, name, email, age, created_at, updated_at
FROM users
WHERE ($3 = '' OR name ILIKE '%' || $3 || '%' OR email ILIKE '%' || $3 || '%')
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
`
	rows, err := r.db.QueryContext(ctx, stmt, limit, offset, filter)
	if err != nil {
		return nil, fmt.Errorf("failed to list users: %w", err)
	}
	defer rows.Close()

	var found []*User
	for rows.Next() {
		u := new(User)
		if scanErr := rows.Scan(&u.ID, &u.Name, &u.Email, &u.Age, &u.CreatedAt, &u.UpdatedAt); scanErr != nil {
			return nil, fmt.Errorf("failed to scan user: %w", scanErr)
		}
		found = append(found, u)
	}
	// rows.Next returning false may hide an iteration failure; check it.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("rows iteration error: %w", iterErr)
	}
	return found, nil
}
3. 服务层实现 #
// internal/service/user.go
package service
import (
"context"
"fmt"
"io"
"log"
"strconv"
"strings"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
pb "github.com/example/grpc-server/proto/user/v1"
"github.com/example/grpc-server/internal/repository"
)
// UserService implements the generated UserService gRPC server on top of a
// UserRepository.
type UserService struct {
// Embedded generated stub for the UserService server.
pb.UnimplementedUserServiceServer
// userRepo is the repository every RPC delegates persistence to.
userRepo *repository.UserRepository
}
// NewUserService returns a UserService backed by userRepo.
func NewUserService(userRepo *repository.UserRepository) *UserService {
	svc := new(UserService)
	svc.userRepo = userRepo
	return svc
}
// 一元 RPC 实现
// GetUser returns the user identified by the request's user_id.
//
// It answers InvalidArgument for a non-positive id, NotFound when the
// repository error mentions "not found", and Internal for any other failure.
func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
	id := req.GetUserId()
	if id <= 0 {
		return nil, status.Errorf(codes.InvalidArgument, "user_id must be positive")
	}

	found, err := s.userRepo.GetUser(ctx, id)
	switch {
	case err == nil:
		return &pb.GetUserResponse{User: s.convertToProtoUser(found)}, nil
	case strings.Contains(err.Error(), "not found"):
		return nil, status.Errorf(codes.NotFound, "user not found")
	default:
		log.Printf("Failed to get user: %v", err)
		return nil, status.Errorf(codes.Internal, "internal server error")
	}
}
// CreateUser validates the request and stores a new user.
//
// It answers InvalidArgument when name or email is empty or age is negative,
// and Internal when the repository fails.
func (s *UserService) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
	name, email, age := req.GetName(), req.GetEmail(), req.GetAge()

	// Input validation; the first failing rule decides the error.
	switch {
	case name == "":
		return nil, status.Errorf(codes.InvalidArgument, "name is required")
	case email == "":
		return nil, status.Errorf(codes.InvalidArgument, "email is required")
	case age < 0:
		return nil, status.Errorf(codes.InvalidArgument, "age must be non-negative")
	}

	created, err := s.userRepo.CreateUser(ctx, name, email, age)
	if err != nil {
		log.Printf("Failed to create user: %v", err)
		return nil, status.Errorf(codes.Internal, "failed to create user")
	}

	resp := &pb.CreateUserResponse{User: s.convertToProtoUser(created)}
	return resp, nil
}
// UpdateUser overwrites the name, email, and age of the identified user.
//
// It answers InvalidArgument for a non-positive id, NotFound when the
// repository error mentions "not found", and Internal for any other failure.
func (s *UserService) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*pb.UpdateUserResponse, error) {
	id := req.GetUserId()
	if id <= 0 {
		return nil, status.Errorf(codes.InvalidArgument, "user_id must be positive")
	}

	updated, err := s.userRepo.UpdateUser(ctx, id, req.GetName(), req.GetEmail(), req.GetAge())
	switch {
	case err == nil:
		return &pb.UpdateUserResponse{User: s.convertToProtoUser(updated)}, nil
	case strings.Contains(err.Error(), "not found"):
		return nil, status.Errorf(codes.NotFound, "user not found")
	default:
		log.Printf("Failed to update user: %v", err)
		return nil, status.Errorf(codes.Internal, "failed to update user")
	}
}
// DeleteUser removes the identified user and answers with an empty message.
//
// It answers InvalidArgument for a non-positive id, NotFound when the
// repository error mentions "not found", and Internal for any other failure.
func (s *UserService) DeleteUser(ctx context.Context, req *pb.DeleteUserRequest) (*emptypb.Empty, error) {
	id := req.GetUserId()
	if id <= 0 {
		return nil, status.Errorf(codes.InvalidArgument, "user_id must be positive")
	}

	err := s.userRepo.DeleteUser(ctx, id)
	switch {
	case err == nil:
		return &emptypb.Empty{}, nil
	case strings.Contains(err.Error(), "not found"):
		return nil, status.Errorf(codes.NotFound, "user not found")
	default:
		log.Printf("Failed to delete user: %v", err)
		return nil, status.Errorf(codes.Internal, "failed to delete user")
	}
}
// 服务端流式 RPC 实现
// ListUsers streams one page of users matching the request's filter.
//
// page_size defaults to 10 when non-positive and is capped at 100.
// page_token, when set, must be a non-negative decimal offset that fits in
// 32 bits. A malformed or negative token is rejected with InvalidArgument
// rather than silently restarting from the first page or reaching the
// database as a negative OFFSET.
//
// It answers Internal when the repository fails or a send on the stream
// fails.
func (s *UserService) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
	pageSize := req.GetPageSize()
	switch {
	case pageSize <= 0:
		pageSize = 10
	case pageSize > 100:
		pageSize = 100
	}

	var offset int32
	if token := req.GetPageToken(); token != "" {
		parsed, err := strconv.ParseInt(token, 10, 32)
		if err != nil || parsed < 0 {
			return status.Errorf(codes.InvalidArgument, "invalid page_token")
		}
		offset = int32(parsed)
	}

	users, err := s.userRepo.ListUsers(stream.Context(), pageSize, offset, req.GetFilter())
	if err != nil {
		log.Printf("Failed to list users: %v", err)
		return status.Errorf(codes.Internal, "failed to list users")
	}

	for _, user := range users {
		if err := stream.Send(s.convertToProtoUser(user)); err != nil {
			log.Printf("Failed to send user: %v", err)
			return status.Errorf(codes.Internal, "failed to send user")
		}
	}
	return nil
}
// 客户端流式 RPC 实现
// BatchCreateUsers consumes a stream of create requests and, once the client
// finishes sending, answers with the created users, their count, and one
// error string per request that was rejected or failed.
//
// A request with an empty name or email, or one the repository cannot store,
// is recorded as an error and does not abort the batch. A receive failure
// other than end-of-stream ends the call with Internal.
func (s *UserService) BatchCreateUsers(stream pb.UserService_BatchCreateUsersServer) error {
	summary := &pb.BatchCreateUsersResponse{}

	for {
		req, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			// The client is done sending; deliver the summary.
			return stream.SendAndClose(summary)
		case recvErr != nil:
			log.Printf("Failed to receive create user request: %v", recvErr)
			return status.Errorf(codes.Internal, "failed to receive request")
		}

		name, email := req.GetName(), req.GetEmail()
		if name == "" || email == "" {
			summary.Errors = append(summary.Errors, fmt.Sprintf("invalid user data: name=%s, email=%s", name, email))
			continue
		}

		created, err := s.userRepo.CreateUser(stream.Context(), name, email, req.GetAge())
		if err != nil {
			summary.Errors = append(summary.Errors, fmt.Sprintf("failed to create user %s: %v", name, err))
			continue
		}

		summary.Users = append(summary.Users, s.convertToProtoUser(created))
		summary.CreatedCount++
	}
}
// 双向流式 RPC 实现
// SyncUsers applies a stream of create, update, and delete actions, sending
// one UserSyncResponse per request.
//
// A repository failure is reported in that response (Success false) and the
// stream continues. The call ends normally when the client closes its side,
// and with Internal when receiving or sending fails.
func (s *UserService) SyncUsers(stream pb.UserService_SyncUsersServer) error {
	for {
		req, recvErr := stream.Recv()
		if recvErr != nil {
			if recvErr == io.EOF {
				return nil
			}
			log.Printf("Failed to receive sync request: %v", recvErr)
			return status.Errorf(codes.Internal, "failed to receive request")
		}

		// Start from a failure reply; each branch fills in the outcome.
		reply := &pb.UserSyncResponse{}

		switch action := req.Action.(type) {
		case *pb.UserSyncRequest_CreateUser:
			in := action.CreateUser
			created, err := s.userRepo.CreateUser(stream.Context(), in.GetName(), in.GetEmail(), in.GetAge())
			if err == nil {
				reply.Success = true
				reply.Message = "user created successfully"
				reply.User = s.convertToProtoUser(created)
			} else {
				reply.Message = fmt.Sprintf("failed to create user: %v", err)
			}

		case *pb.UserSyncRequest_UpdateUser:
			in := action.UpdateUser
			updated, err := s.userRepo.UpdateUser(stream.Context(), in.GetUserId(), in.GetName(), in.GetEmail(), in.GetAge())
			if err == nil {
				reply.Success = true
				reply.Message = "user updated successfully"
				reply.User = s.convertToProtoUser(updated)
			} else {
				reply.Message = fmt.Sprintf("failed to update user: %v", err)
			}

		case *pb.UserSyncRequest_DeleteUser:
			err := s.userRepo.DeleteUser(stream.Context(), action.DeleteUser.GetUserId())
			if err == nil {
				reply.Success = true
				reply.Message = "user deleted successfully"
			} else {
				reply.Message = fmt.Sprintf("failed to delete user: %v", err)
			}

		default:
			reply.Message = "unknown action"
		}

		if sendErr := stream.Send(reply); sendErr != nil {
			log.Printf("Failed to send sync response: %v", sendErr)
			return status.Errorf(codes.Internal, "failed to send response")
		}
	}
}
// 辅助方法:转换数据模型
// convertToProtoUser copies a repository user into its protobuf form,
// converting both time.Time fields to protobuf timestamps.
func (s *UserService) convertToProtoUser(user *repository.User) *pb.User {
	out := new(pb.User)
	out.Id = user.ID
	out.Name = user.Name
	out.Email = user.Email
	out.Age = user.Age
	out.CreatedAt = timestamppb.New(user.CreatedAt)
	out.UpdatedAt = timestamppb.New(user.UpdatedAt)
	return out
}
4. 服务器启动 #
// cmd/server/main.go
package main
import (
"context"
"database/sql"
"log"
"net"
"os"
"os/signal"
"syscall"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
pb "github.com/example/grpc-server/proto/user/v1"
"github.com/example/grpc-server/internal/repository"
"github.com/example/grpc-server/internal/service"
)
// main wires the database, repository, and user service together, serves the
// gRPC API on :50051, and shuts the server down when SIGINT or SIGTERM
// arrives. Shutdown is graceful, with a forced stop after 30 seconds.
func main() {
	// Open the database handle.
	db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}
	defer db.Close()

	// Verify connectivity with a bounded wait so an unreachable database
	// fails startup instead of hanging it.
	pingCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	err = db.PingContext(pingCtx)
	cancel()
	if err != nil {
		log.Fatalf("Failed to ping database: %v", err)
	}

	// Build the repository and the service on top of it.
	userRepo := repository.NewUserRepository(db)
	userService := service.NewUserService(userRepo)

	// Create the listener.
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("Failed to listen: %v", err)
	}

	// Create the gRPC server with keepalive parameters and enforcement policy.
	s := grpc.NewServer(
		grpc.KeepaliveParams(keepalive.ServerParameters{
			MaxConnectionIdle:     15 * time.Second,
			MaxConnectionAge:      30 * time.Second,
			MaxConnectionAgeGrace: 5 * time.Second,
			Time:                  5 * time.Second,
			Timeout:               1 * time.Second,
		}),
		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime:             5 * time.Second,
			PermitWithoutStream: true,
		}),
	)

	// Register the service.
	pb.RegisterUserServiceServer(s, userService)

	// Enable reflection (for debugging).
	reflection.Register(s)

	// Serve in the background so this goroutine can wait for signals.
	go func() {
		log.Println("gRPC server listening on :50051")
		if err := s.Serve(lis); err != nil {
			log.Fatalf("Failed to serve: %v", err)
		}
	}()

	// Block until an interrupt or termination signal arrives.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("Shutting down gRPC server...")

	// Start a graceful stop and signal on done when it finishes.
	done := make(chan struct{})
	go func() {
		s.GracefulStop()
		close(done)
	}()

	// Wait for the graceful stop, forcing a hard stop on timeout.
	select {
	case <-done:
		log.Println("gRPC server stopped gracefully")
	case <-time.After(30 * time.Second):
		log.Println("gRPC server shutdown timeout, forcing stop")
		s.Stop()
	}
}
高级特性 #
1. 健康检查 #
// internal/health/health.go
package health
import (
"context"
"database/sql"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)
type HealthChecker struct {
db *sql.DB
}
// NewHealthChecker returns a HealthChecker that probes db.
func NewHealthChecker(db *sql.DB) *HealthChecker {
	checker := new(HealthChecker)
	checker.db = db
	return checker
}
// Check reports SERVING when the database answers a ping within ctx and
// NOT_SERVING otherwise. It never returns an error; the request's service
// name is not consulted.
func (h *HealthChecker) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
	state := grpc_health_v1.HealthCheckResponse_SERVING
	// A failed database ping marks the service as not serving.
	if pingErr := h.db.PingContext(ctx); pingErr != nil {
		state = grpc_health_v1.HealthCheckResponse_NOT_SERVING
	}
	return &grpc_health_v1.HealthCheckResponse{Status: state}, nil
}
// Watch is not supported; it always answers Unimplemented.
func (h *HealthChecker) Watch(req *grpc_health_v1.HealthCheckRequest, stream grpc_health_v1.Health_WatchServer) error {
	return status.Error(codes.Unimplemented, "health check watch not implemented")
}
// 在 main.go 中注册
// grpc_health_v1.RegisterHealthServer(s, health.NewHealthChecker(db))
2. 服务器配置 #
// internal/config/config.go
package config
import (
"time"
)
// Config is the top-level configuration. It groups the server, database, and
// logging sections under their YAML keys.
type Config struct {
Server ServerConfig `yaml:"server"`
Database DatabaseConfig `yaml:"database"`
Logging LoggingConfig `yaml:"logging"`
}
// ServerConfig holds the listening port plus fields named after gRPC
// keepalive, message-size, and concurrent-stream settings. How these values
// are applied to a server is not shown in this excerpt.
type ServerConfig struct {
Port int `yaml:"port"`
MaxConnectionIdle time.Duration `yaml:"max_connection_idle"`
MaxConnectionAge time.Duration `yaml:"max_connection_age"`
MaxConnectionAgeGrace time.Duration `yaml:"max_connection_age_grace"`
KeepAliveTime time.Duration `yaml:"keep_alive_time"`
KeepAliveTimeout time.Duration `yaml:"keep_alive_timeout"`
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
MaxSendMsgSize int `yaml:"max_send_msg_size"`
MaxConcurrentStreams uint32 `yaml:"max_concurrent_streams"`
}
// DatabaseConfig holds database connection details (host, port, credentials,
// database name, SSL mode) and connection-pool limits.
type DatabaseConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
User string `yaml:"user"`
Password string `yaml:"password"`
Database string `yaml:"database"`
SSLMode string `yaml:"ssl_mode"`
MaxOpenConns int `yaml:"max_open_conns"`
MaxIdleConns int `yaml:"max_idle_conns"`
ConnMaxLifetime time.Duration `yaml:"conn_max_lifetime"`
}
// LoggingConfig holds the logging level and output format as strings.
// NOTE(review): the accepted values are not defined in this excerpt.
type LoggingConfig struct {
Level string `yaml:"level"`
Format string `yaml:"format"`
}
3. 中间件集成 #
// internal/middleware/logging.go
package middleware
import (
"context"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// LoggingInterceptor is a unary server interceptor that logs when a call
// starts and, once the handler returns, its duration and gRPC status code.
// The handler's response and error are passed through unchanged.
func LoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	begin := time.Now()
	log.Printf("gRPC call: %s started", info.FullMethod)

	resp, err := handler(ctx, req)
	elapsed := time.Since(begin)

	// Errors that do not carry a gRPC status are reported as Unknown.
	code := codes.OK
	if err != nil {
		code = codes.Unknown
		if st, ok := status.FromError(err); ok {
			code = st.Code()
		}
	}

	log.Printf("gRPC call: %s completed in %v with code %s", info.FullMethod, elapsed, code)
	return resp, err
}
// StreamLoggingInterceptor is a stream server interceptor that logs when a
// stream starts and, once the handler returns, its duration and gRPC status
// code. The handler's error is passed through unchanged.
func StreamLoggingInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	begin := time.Now()
	log.Printf("gRPC stream: %s started", info.FullMethod)

	err := handler(srv, stream)
	elapsed := time.Since(begin)

	// Errors that do not carry a gRPC status are reported as Unknown.
	code := codes.OK
	if err != nil {
		code = codes.Unknown
		if st, ok := status.FromError(err); ok {
			code = st.Code()
		}
	}

	log.Printf("gRPC stream: %s completed in %v with code %s", info.FullMethod, elapsed, code)
	return err
}
4. 错误处理中间件 #
// internal/middleware/recovery.go
package middleware
import (
"context"
"log"
"runtime/debug"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// RecoveryInterceptor is a unary server interceptor that converts a panic in
// the handler into an Internal error, logging the panic value and stack.
func RecoveryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		log.Printf("Panic recovered in %s: %v\n%s", info.FullMethod, recovered, debug.Stack())
		// The named result lets the deferred function replace the error.
		err = status.Errorf(codes.Internal, "internal server error")
	}()

	resp, err = handler(ctx, req)
	return resp, err
}
// StreamRecoveryInterceptor is a stream server interceptor that converts a
// panic in the handler into an Internal error, logging the panic value and
// stack.
func StreamRecoveryInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		log.Printf("Panic recovered in stream %s: %v\n%s", info.FullMethod, recovered, debug.Stack())
		// The named result lets the deferred function replace the error.
		err = status.Errorf(codes.Internal, "internal server error")
	}()

	err = handler(srv, stream)
	return err
}
性能优化 #
1. 连接池配置 #
// createOptimizedServer returns a gRPC server configured with keepalive
// parameters, a keepalive enforcement policy, 4 MB send/receive message
// limits, and a cap of 1000 concurrent streams.
func createOptimizedServer() *grpc.Server {
	// Connection keepalive settings.
	serverParams := keepalive.ServerParameters{
		MaxConnectionIdle:     15 * time.Second, // idle connection timeout
		MaxConnectionAge:      30 * time.Second, // maximum connection lifetime
		MaxConnectionAgeGrace: 5 * time.Second,  // grace period when closing an aged connection
		Time:                  5 * time.Second,  // ping interval
		Timeout:               1 * time.Second,  // ping timeout
	}

	// Keepalive enforcement policy.
	enforcement := keepalive.EnforcementPolicy{
		MinTime:             5 * time.Second, // minimum ping interval
		PermitWithoutStream: true,            // allow pings without active streams
	}

	const maxMsgBytes = 4 * 1024 * 1024 // 4MB

	opts := []grpc.ServerOption{
		grpc.KeepaliveParams(serverParams),
		grpc.KeepaliveEnforcementPolicy(enforcement),
		// Message size limits.
		grpc.MaxRecvMsgSize(maxMsgBytes),
		grpc.MaxSendMsgSize(maxMsgBytes),
		// Concurrent stream limit.
		grpc.MaxConcurrentStreams(1000),
	}
	return grpc.NewServer(opts...)
}
2. 数据库连接优化 #
// setupDatabase opens a postgres handle for the DSN built from config and
// applies the configured connection-pool limits. It returns the error from
// sql.Open unchanged.
func setupDatabase(config DatabaseConfig) (*sql.DB, error) {
	dsn := buildDSN(config)
	pool, openErr := sql.Open("postgres", dsn)
	if openErr != nil {
		return nil, openErr
	}

	// Connection pool settings.
	pool.SetMaxOpenConns(config.MaxOpenConns)       // maximum open connections
	pool.SetMaxIdleConns(config.MaxIdleConns)       // maximum idle connections
	pool.SetConnMaxLifetime(config.ConnMaxLifetime) // maximum connection lifetime
	return pool, nil
}
3. 内存优化 #
// 使用对象池减少内存分配
// userPool is a sync.Pool of *pb.User values; New supplies a fresh empty
// message when the pool has none to hand out.
var userPool = sync.Pool{
New: func() interface{} {
return &pb.User{}
},
}
// GetUser illustrates borrowing a *pb.User from userPool for the duration of
// one call. The rest of the handler body is elided in this excerpt.
//
// NOTE(review): the deferred function resets the message and returns it to
// the pool as soon as this handler returns. If the response handed back to
// gRPC still references that message, it would presumably be serialized after
// being reset (and possibly reused by another call). Confirm that the pooled
// object is only used as scratch space and is never part of the response.
func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
// Borrow a message from the pool.
user := userPool.Get().(*pb.User)
defer func() {
// Clear the message and hand it back to the pool.
user.Reset()
userPool.Put(user)
}()
// Use the user object...
}
监控和指标 #
1. Prometheus 指标 #
// internal/metrics/metrics.go
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// Prometheus collectors for the gRPC server, created through promauto.
var (
// RequestsTotal counts gRPC requests, labelled by method and status.
RequestsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "grpc_requests_total",
Help: "Total number of gRPC requests",
},
[]string{"method", "status"},
)
// RequestDuration is a histogram of request durations in seconds,
// labelled by method and using the default Prometheus buckets.
RequestDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "grpc_request_duration_seconds",
Help: "Duration of gRPC requests",
Buckets: prometheus.DefBuckets,
},
[]string{"method"},
)
// ActiveConnections is a gauge for the number of active connections.
// NOTE(review): nothing in this excerpt updates it.
ActiveConnections = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "grpc_active_connections",
Help: "Number of active gRPC connections",
},
)
)
2. 指标中间件 #
// MetricsInterceptor is a unary server interceptor that records, per method,
// one RequestsTotal increment labelled "success" or "error" and one
// RequestDuration observation in seconds. The handler's response and error
// are passed through unchanged.
func MetricsInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	begin := time.Now()
	resp, err := handler(ctx, req)
	elapsed := time.Since(begin)

	outcome := "error"
	if err == nil {
		outcome = "success"
	}

	RequestsTotal.WithLabelValues(info.FullMethod, outcome).Inc()
	RequestDuration.WithLabelValues(info.FullMethod).Observe(elapsed.Seconds())
	return resp, err
}
小结 #
本节详细介绍了 gRPC 服务端开发的各个方面:
- 基础实现:服务接口定义、数据层、服务层实现
- 服务器配置:连接管理、健康检查、优雅关闭
- 高级特性:中间件、错误处理、恢复机制
- 性能优化:连接池、内存管理、并发控制
- 监控指标:Prometheus 集成、性能监控
通过这些内容,您可以构建出高性能、可扩展的 gRPC 服务端应用。在下一节中,我们将学习如何开发 gRPC 客户端。