3.7.1 gRPC 基础与协议

3.7.1 gRPC 基础与协议 #

gRPC 是一个现代化的开源高性能 RPC 框架,它能够在任何环境中运行。它可以通过可插拔的支持来实现负载均衡、跟踪、运行状况检查和身份验证,并且可以高效地连接数据中心内和跨数据中心的服务。

gRPC 核心概念 #

什么是 RPC #

远程过程调用(Remote Procedure Call,RPC)是一种计算机通信协议,允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。

// 传统的本地函数调用
result := calculateSum(a, b)

// RPC 调用看起来类似,但实际上是跨网络的
result := remoteService.CalculateSum(context.Background(), &pb.SumRequest{
    A: a,
    B: b,
})

gRPC 的特点 #

1. 基于 HTTP/2

  • 多路复用:单个连接可以并发处理多个请求
  • 服务器推送:支持流式数据传输
  • 头部压缩:减少网络开销
  • 二进制协议:更高的传输效率

2. Protocol Buffers

  • 强类型的接口定义语言
  • 高效的序列化和反序列化
  • 向后兼容性
  • 多语言支持

3. 四种调用模式

  • 一元 RPC(Unary RPC)
  • 服务端流式 RPC(Server Streaming RPC)
  • 客户端流式 RPC(Client Streaming RPC)
  • 双向流式 RPC(Bidirectional Streaming RPC)

gRPC vs REST API #

性能对比 #

特性 gRPC REST API
协议 HTTP/2 HTTP/1.1
数据格式 Protocol Buffers JSON
类型安全 强类型 弱类型
流式支持 原生支持 需要额外实现
浏览器支持 有限 完全支持
调试友好性 需要工具 人类可读

适用场景 #

gRPC 适用于:

  • 微服务间的内部通信
  • 高性能要求的场景
  • 需要流式处理的应用
  • 多语言环境的服务集成

REST API 适用于:

  • 公开的 Web API
  • 浏览器直接调用
  • 简单的 CRUD 操作
  • 需要人类可读的接口

gRPC 架构 #

基本架构 #

┌─────────────┐    gRPC Call    ┌─────────────┐
│   Client    │ ──────────────► │   Server    │
│             │                 │             │
│ ┌─────────┐ │                 │ ┌─────────┐ │
│ │  Stub   │ │                 │ │ Service │ │
│ └─────────┘ │                 │ │  Impl   │ │
└─────────────┘                 │ └─────────┘ │
                                 └─────────────┘

组件说明 #

1. 服务定义(Service Definition) 使用 Protocol Buffers 定义服务接口和消息类型。

2. 服务端(Server) 实现服务接口,处理客户端请求。

3. 客户端存根(Client Stub) 自动生成的代码,提供类型安全的调用接口。

4. 传输层(Transport) 基于 HTTP/2 的网络传输。

安装和环境配置 #

安装 Protocol Buffers 编译器 #

# macOS
brew install protobuf

# Ubuntu/Debian
sudo apt-get install protobuf-compiler

# 验证安装
protoc --version

安装 Go 插件 #

# 安装 protoc-gen-go 插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest

# 安装 protoc-gen-go-grpc 插件
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# 确保插件在 PATH 中
export PATH="$PATH:$(go env GOPATH)/bin"

项目依赖 #

# 初始化 Go 模块
go mod init grpc-example

# 添加 gRPC 依赖
go get google.golang.org/grpc
go get google.golang.org/protobuf

第一个 gRPC 服务 #

1. 定义服务接口 #

创建 proto/hello.proto 文件:

syntax = "proto3";

package hello;

option go_package = "github.com/example/grpc-example/proto/hello";

// 定义服务
service HelloService {
  // 一元 RPC
  rpc SayHello(HelloRequest) returns (HelloResponse);

  // 服务端流式 RPC
  rpc SayHelloStream(HelloRequest) returns (stream HelloResponse);
}

// 请求消息
message HelloRequest {
  string name = 1;
  int32 age = 2;
}

// 响应消息
message HelloResponse {
  string message = 1;
  int64 timestamp = 2;
}

2. 生成代码 #

# 生成 Go 代码
protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       proto/hello.proto

这将生成两个文件:

  • proto/hello.pb.go:包含消息类型定义
  • proto/hello_grpc.pb.go:包含服务接口定义

3. 实现服务端 #

package main

import (
    "context"
    "fmt"
    "log"
    "net"
    "time"

    "google.golang.org/grpc"
    pb "github.com/example/grpc-example/proto/hello"
)

// 实现 HelloService 接口
type helloServer struct {
    pb.UnimplementedHelloServiceServer
}

// 实现 SayHello 方法
func (s *helloServer) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloResponse, error) {
    log.Printf("Received request: name=%s, age=%d", req.GetName(), req.GetAge())

    response := &pb.HelloResponse{
        Message:   fmt.Sprintf("Hello %s! You are %d years old.", req.GetName(), req.GetAge()),
        Timestamp: time.Now().Unix(),
    }

    return response, nil
}

// 实现 SayHelloStream 方法
func (s *helloServer) SayHelloStream(req *pb.HelloRequest, stream pb.HelloService_SayHelloStreamServer) error {
    log.Printf("Received stream request: name=%s", req.GetName())

    for i := 0; i < 5; i++ {
        response := &pb.HelloResponse{
            Message:   fmt.Sprintf("Hello %s! Message #%d", req.GetName(), i+1),
            Timestamp: time.Now().Unix(),
        }

        if err := stream.Send(response); err != nil {
            return err
        }

        time.Sleep(1 * time.Second)
    }

    return nil
}

func main() {
    // 创建监听器
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }

    // 创建 gRPC 服务器
    s := grpc.NewServer()

    // 注册服务
    pb.RegisterHelloServiceServer(s, &helloServer{})

    log.Println("gRPC server listening on :50051")

    // 启动服务器
    if err := s.Serve(lis); err != nil {
        log.Fatalf("Failed to serve: %v", err)
    }
}

4. 实现客户端 #

package main

import (
    "context"
    "io"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    pb "github.com/example/grpc-example/proto/hello"
)

func main() {
    // 建立连接
    conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()

    // 创建客户端
    client := pb.NewHelloServiceClient(conn)

    // 一元 RPC 调用
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    response, err := client.SayHello(ctx, &pb.HelloRequest{
        Name: "Alice",
        Age:  25,
    })
    if err != nil {
        log.Fatalf("SayHello failed: %v", err)
    }
    log.Printf("Response: %s (timestamp: %d)", response.GetMessage(), response.GetTimestamp())

    // 服务端流式 RPC 调用
    stream, err := client.SayHelloStream(context.Background(), &pb.HelloRequest{
        Name: "Bob",
    })
    if err != nil {
        log.Fatalf("SayHelloStream failed: %v", err)
    }

    for {
        response, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("Stream receive failed: %v", err)
        }
        log.Printf("Stream response: %s", response.GetMessage())
    }
}

gRPC 状态码 #

gRPC 使用标准的状态码来表示 RPC 调用的结果:

import (
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// 返回错误状态
func (s *helloServer) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloResponse, error) {
    if req.GetName() == "" {
        return nil, status.Errorf(codes.InvalidArgument, "name cannot be empty")
    }

    // 正常处理...
    return response, nil
}

// 客户端处理错误
response, err := client.SayHello(ctx, req)
if err != nil {
    if st, ok := status.FromError(err); ok {
        log.Printf("gRPC error: code=%s, message=%s", st.Code(), st.Message())
    }
    return
}

常用状态码:

  • OK:成功
  • INVALID_ARGUMENT:无效参数
  • NOT_FOUND:资源未找到
  • ALREADY_EXISTS:资源已存在
  • PERMISSION_DENIED:权限不足
  • UNAUTHENTICATED:未认证
  • INTERNAL:内部错误
  • UNAVAILABLE:服务不可用

最佳实践 #

1. 错误处理 #

// 服务端:返回结构化错误
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
    if req.GetUserId() <= 0 {
        return nil, status.Error(codes.InvalidArgument, "user_id must be positive")
    }

    user, err := s.userService.GetUser(req.GetUserId())
    if err != nil {
        if errors.Is(err, ErrUserNotFound) {
            return nil, status.Error(codes.NotFound, "user not found")
        }
        return nil, status.Error(codes.Internal, "internal server error")
    }

    return user, nil
}

2. 超时控制 #

// 客户端:设置超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

response, err := client.SayHello(ctx, req)

3. 连接管理 #

// 使用连接池
conn, err := grpc.Dial(
    "localhost:50051",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithKeepaliveParams(keepalive.ClientParameters{
        Time:                10 * time.Second,
        Timeout:             time.Second,
        PermitWithoutStream: true,
    }),
)

小结 #

本节介绍了 gRPC 的基础概念、架构设计和基本使用方法。我们学习了:

  1. gRPC 核心概念:RPC 原理、HTTP/2 基础、Protocol Buffers
  2. 环境配置:安装编译器和插件
  3. 服务开发:定义接口、实现服务、创建客户端
  4. 错误处理:状态码使用和错误传播
  5. 最佳实践:超时控制、连接管理

gRPC 为构建高性能的分布式系统提供了强大的基础。在下一节中,我们将深入学习 Protocol Buffers 的语法和高级特性。