3.7.4 gRPC 客户端开发

3.7.4 gRPC 客户端开发 #

gRPC 客户端是与 gRPC 服务端通信的重要组件,负责发起 RPC 调用、处理响应和管理连接。本节将详细介绍如何开发高效、可靠的 gRPC 客户端应用。

客户端架构 #

基本组件 #

┌─────────────────────────────────────────┐
│              gRPC Client                │
├─────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────────┐   │
│  │ Interceptors│  │  Client Stub    │   │
│  │             │  │                 │   │
│  │ - Retry     │  │ - Method Calls  │   │
│  │ - Auth      │  │ - Stream Mgmt   │   │
│  │ - Metrics   │  │ - Error Handle  │   │
│  └─────────────┘  └─────────────────┘   │
├─────────────────────────────────────────┤
│           Connection Pool               │
│              (HTTP/2)                   │
└─────────────────────────────────────────┘

客户端生命周期 #

  1. 连接建立:创建到服务端的连接
  2. 调用执行:发起 RPC 调用
  3. 响应处理:处理服务端响应
  4. 连接管理:维护连接状态
  5. 资源清理:关闭连接和清理资源

基础客户端实现 #

1. 简单客户端 #

// cmd/client/main.go
package main

import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"

    pb "github.com/example/grpc-client/proto/user/v1"
)

func main() {
    // 建立连接
    conn, err := grpc.Dial(
        "localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()

    // 创建客户端
    client := pb.NewUserServiceClient(conn)

    // 一元 RPC 调用
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // 创建用户
    createResp, err := client.CreateUser(ctx, &pb.CreateUserRequest{
        Name:  "Alice",
        Email: "[email protected]",
        Age:   25,
    })
    if err != nil {
        log.Fatalf("CreateUser failed: %v", err)
    }
    log.Printf("Created user: %+v", createResp.User)

    // 获取用户
    getResp, err := client.GetUser(ctx, &pb.GetUserRequest{
        UserId: createResp.User.Id,
    })
    if err != nil {
        log.Fatalf("GetUser failed: %v", err)
    }
    log.Printf("Retrieved user: %+v", getResp.User)
}

2. 客户端封装 #

// internal/client/user_client.go
package client

import (
    "context"
    "fmt"
    "io"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/keepalive"
    "google.golang.org/grpc/status"

    pb "github.com/example/grpc-client/proto/user/v1"
)

type UserClient struct {
    conn   *grpc.ClientConn
    client pb.UserServiceClient
}

type ClientConfig struct {
    Address     string
    Timeout     time.Duration
    KeepAlive   time.Duration
    MaxRetries  int
    EnableTLS   bool
}

func NewUserClient(config ClientConfig) (*UserClient, error) {
    var opts []grpc.DialOption

    // 传输凭证
    if config.EnableTLS {
        // 这里可以配置 TLS 凭证
        // opts = append(opts, grpc.WithTransportCredentials(creds))
    } else {
        opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
    }

    // 连接保活
    opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
        Time:                config.KeepAlive,
        Timeout:             time.Second,
        PermitWithoutStream: true,
    }))

    // 建立连接
    conn, err := grpc.Dial(config.Address, opts...)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to %s: %w", config.Address, err)
    }

    return &UserClient{
        conn:   conn,
        client: pb.NewUserServiceClient(conn),
    }, nil
}

func (c *UserClient) Close() error {
    return c.conn.Close()
}

// 一元 RPC 方法
func (c *UserClient) CreateUser(ctx context.Context, name, email string, age int32) (*pb.User, error) {
    req := &pb.CreateUserRequest{
        Name:  name,
        Email: email,
        Age:   age,
    }

    resp, err := c.client.CreateUser(ctx, req)
    if err != nil {
        return nil, c.handleError("CreateUser", err)
    }

    return resp.User, nil
}

func (c *UserClient) GetUser(ctx context.Context, userID int64) (*pb.User, error) {
    req := &pb.GetUserRequest{
        UserId: userID,
    }

    resp, err := c.client.GetUser(ctx, req)
    if err != nil {
        return nil, c.handleError("GetUser", err)
    }

    return resp.User, nil
}

func (c *UserClient) UpdateUser(ctx context.Context, userID int64, name, email string, age int32) (*pb.User, error) {
    req := &pb.UpdateUserRequest{
        UserId: userID,
        Name:   name,
        Email:  email,
        Age:    age,
    }

    resp, err := c.client.UpdateUser(ctx, req)
    if err != nil {
        return nil, c.handleError("UpdateUser", err)
    }

    return resp.User, nil
}

func (c *UserClient) DeleteUser(ctx context.Context, userID int64) error {
    req := &pb.DeleteUserRequest{
        UserId: userID,
    }

    _, err := c.client.DeleteUser(ctx, req)
    if err != nil {
        return c.handleError("DeleteUser", err)
    }

    return nil
}

// 服务端流式 RPC
func (c *UserClient) ListUsers(ctx context.Context, pageSize int32, pageToken, filter string) ([]*pb.User, error) {
    req := &pb.ListUsersRequest{
        PageSize:  pageSize,
        PageToken: pageToken,
        Filter:    filter,
    }

    stream, err := c.client.ListUsers(ctx, req)
    if err != nil {
        return nil, c.handleError("ListUsers", err)
    }

    var users []*pb.User
    for {
        user, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return nil, c.handleError("ListUsers.Recv", err)
        }
        users = append(users, user)
    }

    return users, nil
}

// 客户端流式 RPC
func (c *UserClient) BatchCreateUsers(ctx context.Context, users []CreateUserData) (*pb.BatchCreateUsersResponse, error) {
    stream, err := c.client.BatchCreateUsers(ctx)
    if err != nil {
        return nil, c.handleError("BatchCreateUsers", err)
    }

    // 发送用户数据
    for _, userData := range users {
        req := &pb.CreateUserRequest{
            Name:  userData.Name,
            Email: userData.Email,
            Age:   userData.Age,
        }

        if err := stream.Send(req); err != nil {
            return nil, c.handleError("BatchCreateUsers.Send", err)
        }
    }

    // 关闭发送并接收响应
    resp, err := stream.CloseAndRecv()
    if err != nil {
        return nil, c.handleError("BatchCreateUsers.CloseAndRecv", err)
    }

    return resp, nil
}

// 双向流式 RPC
func (c *UserClient) SyncUsers(ctx context.Context) (*UserSyncStream, error) {
    stream, err := c.client.SyncUsers(ctx)
    if err != nil {
        return nil, c.handleError("SyncUsers", err)
    }

    return &UserSyncStream{
        stream: stream,
        client: c,
    }, nil
}

// 错误处理
func (c *UserClient) handleError(method string, err error) error {
    if st, ok := status.FromError(err); ok {
        switch st.Code() {
        case codes.NotFound:
            return fmt.Errorf("%s: resource not found: %s", method, st.Message())
        case codes.InvalidArgument:
            return fmt.Errorf("%s: invalid argument: %s", method, st.Message())
        case codes.Unauthenticated:
            return fmt.Errorf("%s: authentication required: %s", method, st.Message())
        case codes.PermissionDenied:
            return fmt.Errorf("%s: permission denied: %s", method, st.Message())
        case codes.Unavailable:
            return fmt.Errorf("%s: service unavailable: %s", method, st.Message())
        default:
            return fmt.Errorf("%s: %s (%s)", method, st.Message(), st.Code())
        }
    }
    return fmt.Errorf("%s: %w", method, err)
}

// 辅助类型
type CreateUserData struct {
    Name  string
    Email string
    Age   int32
}

// 双向流封装
type UserSyncStream struct {
    stream pb.UserService_SyncUsersClient
    client *UserClient
}

func (s *UserSyncStream) CreateUser(user *pb.User) error {
    req := &pb.UserSyncRequest{
        Action: &pb.UserSyncRequest_CreateUser{
            CreateUser: user,
        },
    }
    return s.stream.Send(req)
}

func (s *UserSyncStream) UpdateUser(req *pb.UpdateUserRequest) error {
    syncReq := &pb.UserSyncRequest{
        Action: &pb.UserSyncRequest_UpdateUser{
            UpdateUser: req,
        },
    }
    return s.stream.Send(syncReq)
}

func (s *UserSyncStream) DeleteUser(userID int64) error {
    req := &pb.UserSyncRequest{
        Action: &pb.UserSyncRequest_DeleteUser{
            DeleteUser: &pb.DeleteUserRequest{
                UserId: userID,
            },
        },
    }
    return s.stream.Send(req)
}

func (s *UserSyncStream) Recv() (*pb.UserSyncResponse, error) {
    return s.stream.Recv()
}

func (s *UserSyncStream) Close() error {
    return s.stream.CloseSend()
}

高级客户端特性 #

1. 连接池管理 #

// internal/client/pool.go
package client

import (
    "context"
    "fmt"
    "sync"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/connectivity"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/keepalive"
)

type ConnectionPool struct {
    mu          sync.RWMutex
    connections map[string]*grpc.ClientConn
    config      PoolConfig
}

type PoolConfig struct {
    MaxConnections  int
    IdleTimeout     time.Duration
    MaxLifetime     time.Duration
    KeepAliveTime   time.Duration
    KeepAliveTimeout time.Duration
}

func NewConnectionPool(config PoolConfig) *ConnectionPool {
    pool := &ConnectionPool{
        connections: make(map[string]*grpc.ClientConn),
        config:      config,
    }

    // 启动连接清理协程
    go pool.cleanup()

    return pool
}

func (p *ConnectionPool) GetConnection(address string) (*grpc.ClientConn, error) {
    p.mu.RLock()
    conn, exists := p.connections[address]
    p.mu.RUnlock()

    if exists && conn.GetState() == connectivity.Ready {
        return conn, nil
    }

    return p.createConnection(address)
}

func (p *ConnectionPool) createConnection(address string) (*grpc.ClientConn, error) {
    p.mu.Lock()
    defer p.mu.Unlock()

    // 双重检查
    if conn, exists := p.connections[address]; exists && conn.GetState() == connectivity.Ready {
        return conn, nil
    }

    // 检查连接数限制
    if len(p.connections) >= p.config.MaxConnections {
        return nil, fmt.Errorf("connection pool is full")
    }

    // 创建新连接
    opts := []grpc.DialOption{
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithKeepaliveParams(keepalive.ClientParameters{
            Time:                p.config.KeepAliveTime,
            Timeout:             p.config.KeepAliveTimeout,
            PermitWithoutStream: true,
        }),
    }

    conn, err := grpc.Dial(address, opts...)
    if err != nil {
        return nil, fmt.Errorf("failed to create connection: %w", err)
    }

    p.connections[address] = conn
    return conn, nil
}

func (p *ConnectionPool) cleanup() {
    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()

    for range ticker.C {
        p.mu.Lock()
        for address, conn := range p.connections {
            state := conn.GetState()
            if state == connectivity.Shutdown || state == connectivity.TransientFailure {
                delete(p.connections, address)
                conn.Close()
            }
        }
        p.mu.Unlock()
    }
}

func (p *ConnectionPool) Close() {
    p.mu.Lock()
    defer p.mu.Unlock()

    for _, conn := range p.connections {
        conn.Close()
    }
    p.connections = make(map[string]*grpc.ClientConn)
}

2. 重试机制 #

// internal/client/retry.go
package client

import (
    "context"
    "math"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

type RetryConfig struct {
    MaxAttempts     int
    InitialBackoff  time.Duration
    MaxBackoff      time.Duration
    BackoffMultiplier float64
    RetryableCodes  []codes.Code
}

func DefaultRetryConfig() RetryConfig {
    return RetryConfig{
        MaxAttempts:       3,
        InitialBackoff:    100 * time.Millisecond,
        MaxBackoff:        30 * time.Second,
        BackoffMultiplier: 2.0,
        RetryableCodes: []codes.Code{
            codes.Unavailable,
            codes.DeadlineExceeded,
            codes.ResourceExhausted,
            codes.Aborted,
            codes.Internal,
        },
    }
}

func RetryInterceptor(config RetryConfig) grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        var lastErr error

        for attempt := 0; attempt < config.MaxAttempts; attempt++ {
            if attempt > 0 {
                // 计算退避时间
                backoff := time.Duration(float64(config.InitialBackoff) * math.Pow(config.BackoffMultiplier, float64(attempt-1)))
                if backoff > config.MaxBackoff {
                    backoff = config.MaxBackoff
                }

                // 等待退避时间
                select {
                case <-time.After(backoff):
                case <-ctx.Done():
                    return ctx.Err()
                }
            }

            // 执行调用
            err := invoker(ctx, method, req, reply, cc, opts...)
            if err == nil {
                return nil
            }

            lastErr = err

            // 检查是否可重试
            if !isRetryable(err, config.RetryableCodes) {
                break
            }

            // 检查上下文是否已取消
            if ctx.Err() != nil {
                break
            }
        }

        return lastErr
    }
}

func isRetryable(err error, retryableCodes []codes.Code) bool {
    st, ok := status.FromError(err)
    if !ok {
        return false
    }

    for _, code := range retryableCodes {
        if st.Code() == code {
            return true
        }
    }

    return false
}

// 流式 RPC 重试拦截器
func StreamRetryInterceptor(config RetryConfig) grpc.StreamClientInterceptor {
    return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
        var lastErr error

        for attempt := 0; attempt < config.MaxAttempts; attempt++ {
            if attempt > 0 {
                backoff := time.Duration(float64(config.InitialBackoff) * math.Pow(config.BackoffMultiplier, float64(attempt-1)))
                if backoff > config.MaxBackoff {
                    backoff = config.MaxBackoff
                }

                select {
                case <-time.After(backoff):
                case <-ctx.Done():
                    return nil, ctx.Err()
                }
            }

            stream, err := streamer(ctx, desc, cc, method, opts...)
            if err == nil {
                return stream, nil
            }

            lastErr = err

            if !isRetryable(err, config.RetryableCodes) {
                break
            }

            if ctx.Err() != nil {
                break
            }
        }

        return nil, lastErr
    }
}

3. 负载均衡 #

// internal/client/loadbalancer.go
package client

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/connectivity"
    "google.golang.org/grpc/credentials/insecure"
)

type LoadBalancer struct {
    mu          sync.RWMutex
    connections []*grpc.ClientConn
    addresses   []string
    current     int
    strategy    BalanceStrategy
}

type BalanceStrategy int

const (
    RoundRobin BalanceStrategy = iota
    Random
    LeastConnections
)

func NewLoadBalancer(addresses []string, strategy BalanceStrategy) (*LoadBalancer, error) {
    lb := &LoadBalancer{
        addresses: addresses,
        strategy:  strategy,
    }

    // 创建到所有地址的连接
    for _, addr := range addresses {
        conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
        if err != nil {
            // 清理已创建的连接
            lb.Close()
            return nil, fmt.Errorf("failed to connect to %s: %w", addr, err)
        }
        lb.connections = append(lb.connections, conn)
    }

    // 启动健康检查
    go lb.healthCheck()

    return lb, nil
}

func (lb *LoadBalancer) GetConnection() (*grpc.ClientConn, error) {
    lb.mu.RLock()
    defer lb.mu.RUnlock()

    if len(lb.connections) == 0 {
        return nil, fmt.Errorf("no available connections")
    }

    var conn *grpc.ClientConn

    switch lb.strategy {
    case RoundRobin:
        conn = lb.roundRobin()
    case Random:
        conn = lb.random()
    case LeastConnections:
        conn = lb.leastConnections()
    default:
        conn = lb.roundRobin()
    }

    if conn == nil || conn.GetState() != connectivity.Ready {
        return nil, fmt.Errorf("no healthy connections available")
    }

    return conn, nil
}

func (lb *LoadBalancer) roundRobin() *grpc.ClientConn {
    for i := 0; i < len(lb.connections); i++ {
        idx := (lb.current + i) % len(lb.connections)
        conn := lb.connections[idx]
        if conn.GetState() == connectivity.Ready {
            lb.current = (idx + 1) % len(lb.connections)
            return conn
        }
    }
    return nil
}

func (lb *LoadBalancer) random() *grpc.ClientConn {
    healthyConns := make([]*grpc.ClientConn, 0)
    for _, conn := range lb.connections {
        if conn.GetState() == connectivity.Ready {
            healthyConns = append(healthyConns, conn)
        }
    }

    if len(healthyConns) == 0 {
        return nil
    }

    return healthyConns[rand.Intn(len(healthyConns))]
}

func (lb *LoadBalancer) leastConnections() *grpc.ClientConn {
    // 简化实现,实际应该跟踪每个连接的活跃请求数
    return lb.roundRobin()
}

func (lb *LoadBalancer) healthCheck() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        lb.mu.Lock()
        for i, conn := range lb.connections {
            state := conn.GetState()
            if state == connectivity.Shutdown || state == connectivity.TransientFailure {
                // 尝试重新连接
                newConn, err := grpc.Dial(lb.addresses[i], grpc.WithTransportCredentials(insecure.NewCredentials()))
                if err == nil {
                    conn.Close()
                    lb.connections[i] = newConn
                }
            }
        }
        lb.mu.Unlock()
    }
}

func (lb *LoadBalancer) Close() {
    lb.mu.Lock()
    defer lb.mu.Unlock()

    for _, conn := range lb.connections {
        conn.Close()
    }
    lb.connections = nil
}

4. 熔断器 #

// internal/client/circuitbreaker.go
package client

import (
    "context"
    "errors"
    "sync"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

type CircuitBreakerState int

const (
    StateClosed CircuitBreakerState = iota
    StateOpen
    StateHalfOpen
)

type CircuitBreaker struct {
    mu                sync.RWMutex
    state             CircuitBreakerState
    failureCount      int
    successCount      int
    lastFailureTime   time.Time
    config            CircuitBreakerConfig
}

type CircuitBreakerConfig struct {
    MaxFailures     int
    ResetTimeout    time.Duration
    HalfOpenMaxCalls int
}

func NewCircuitBreaker(config CircuitBreakerConfig) *CircuitBreaker {
    return &CircuitBreaker{
        state:  StateClosed,
        config: config,
    }
}

func (cb *CircuitBreaker) Call(ctx context.Context, fn func() error) error {
    if !cb.allowRequest() {
        return errors.New("circuit breaker is open")
    }

    err := fn()
    cb.recordResult(err)
    return err
}

func (cb *CircuitBreaker) allowRequest() bool {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    switch cb.state {
    case StateClosed:
        return true
    case StateOpen:
        if time.Since(cb.lastFailureTime) > cb.config.ResetTimeout {
            cb.state = StateHalfOpen
            cb.successCount = 0
            return true
        }
        return false
    case StateHalfOpen:
        return cb.successCount < cb.config.HalfOpenMaxCalls
    default:
        return false
    }
}

func (cb *CircuitBreaker) recordResult(err error) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err == nil {
        cb.onSuccess()
    } else {
        cb.onFailure()
    }
}

func (cb *CircuitBreaker) onSuccess() {
    switch cb.state {
    case StateClosed:
        cb.failureCount = 0
    case StateHalfOpen:
        cb.successCount++
        if cb.successCount >= cb.config.HalfOpenMaxCalls {
            cb.state = StateClosed
            cb.failureCount = 0
        }
    }
}

func (cb *CircuitBreaker) onFailure() {
    cb.failureCount++
    cb.lastFailureTime = time.Now()

    switch cb.state {
    case StateClosed:
        if cb.failureCount >= cb.config.MaxFailures {
            cb.state = StateOpen
        }
    case StateHalfOpen:
        cb.state = StateOpen
    }
}

// gRPC 拦截器
func CircuitBreakerInterceptor(cb *CircuitBreaker) grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        return cb.Call(ctx, func() error {
            return invoker(ctx, method, req, reply, cc, opts...)
        })
    }
}

func isFailure(err error) bool {
    if err == nil {
        return false
    }

    st, ok := status.FromError(err)
    if !ok {
        return true
    }

    // 只有某些错误码才算失败
    switch st.Code() {
    case codes.Unavailable, codes.DeadlineExceeded, codes.Internal:
        return true
    default:
        return false
    }
}

客户端使用示例 #

1. 完整客户端应用 #

// cmd/client/main.go
package main

import (
    "context"
    "log"
    "time"

    "github.com/example/grpc-client/internal/client"
)

func main() {
    // 客户端配置
    config := client.ClientConfig{
        Address:   "localhost:50051",
        Timeout:   10 * time.Second,
        KeepAlive: 30 * time.Second,
        EnableTLS: false,
    }

    // 创建客户端
    userClient, err := client.NewUserClient(config)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    defer userClient.Close()

    ctx := context.Background()

    // 创建用户
    user, err := userClient.CreateUser(ctx, "Alice", "[email protected]", 25)
    if err != nil {
        log.Fatalf("Failed to create user: %v", err)
    }
    log.Printf("Created user: %+v", user)

    // 获取用户
    retrievedUser, err := userClient.GetUser(ctx, user.Id)
    if err != nil {
        log.Fatalf("Failed to get user: %v", err)
    }
    log.Printf("Retrieved user: %+v", retrievedUser)

    // 更新用户
    updatedUser, err := userClient.UpdateUser(ctx, user.Id, "Alice Smith", "[email protected]", 26)
    if err != nil {
        log.Fatalf("Failed to update user: %v", err)
    }
    log.Printf("Updated user: %+v", updatedUser)

    // 列出用户
    users, err := userClient.ListUsers(ctx, 10, "", "")
    if err != nil {
        log.Fatalf("Failed to list users: %v", err)
    }
    log.Printf("Listed %d users", len(users))

    // 批量创建用户
    batchUsers := []client.CreateUserData{
        {Name: "Bob", Email: "[email protected]", Age: 30},
        {Name: "Charlie", Email: "[email protected]", Age: 35},
    }

    batchResp, err := userClient.BatchCreateUsers(ctx, batchUsers)
    if err != nil {
        log.Fatalf("Failed to batch create users: %v", err)
    }
    log.Printf("Batch created %d users, errors: %v", batchResp.CreatedCount, batchResp.Errors)

    // 双向流示例
    syncStream, err := userClient.SyncUsers(ctx)
    if err != nil {
        log.Fatalf("Failed to create sync stream: %v", err)
    }

    // 在单独的 goroutine 中接收响应
    go func() {
        for {
            resp, err := syncStream.Recv()
            if err != nil {
                log.Printf("Sync stream receive error: %v", err)
                return
            }
            log.Printf("Sync response: success=%t, message=%s", resp.Success, resp.Message)
        }
    }()

    // 发送同步请求
    err = syncStream.CreateUser(&pb.User{
        Name:  "Dave",
        Email: "[email protected]",
        Age:   40,
    })
    if err != nil {
        log.Printf("Failed to send create user: %v", err)
    }

    time.Sleep(2 * time.Second)
    syncStream.Close()

    // 删除用户
    err = userClient.DeleteUser(ctx, user.Id)
    if err != nil {
        log.Fatalf("Failed to delete user: %v", err)
    }
    log.Println("User deleted successfully")
}

2. 带中间件的客户端 #

// cmd/advanced_client/main.go
package main

import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"

    "github.com/example/grpc-client/internal/client"
    pb "github.com/example/grpc-client/proto/user/v1"
)

func main() {
    // 配置重试
    retryConfig := client.DefaultRetryConfig()

    // 配置熔断器
    cbConfig := client.CircuitBreakerConfig{
        MaxFailures:      5,
        ResetTimeout:     30 * time.Second,
        HalfOpenMaxCalls: 3,
    }
    circuitBreaker := client.NewCircuitBreaker(cbConfig)

    // 创建连接
    conn, err := grpc.Dial(
        "localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithChainUnaryInterceptor(
            client.RetryInterceptor(retryConfig),
            client.CircuitBreakerInterceptor(circuitBreaker),
            loggingInterceptor,
        ),
    )
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()

    client := pb.NewUserServiceClient(conn)

    // 使用客户端
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    resp, err := client.GetUser(ctx, &pb.GetUserRequest{UserId: 1})
    if err != nil {
        log.Printf("GetUser failed: %v", err)
    } else {
        log.Printf("GetUser success: %+v", resp.User)
    }
}

func loggingInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    start := time.Now()
    err := invoker(ctx, method, req, reply, cc, opts...)
    log.Printf("gRPC call %s took %v, error: %v", method, time.Since(start), err)
    return err
}

最佳实践 #

1. 连接管理 #

// 使用连接池
pool := client.NewConnectionPool(client.PoolConfig{
    MaxConnections:   10,
    IdleTimeout:      5 * time.Minute,
    MaxLifetime:      30 * time.Minute,
    KeepAliveTime:    30 * time.Second,
    KeepAliveTimeout: 5 * time.Second,
})

conn, err := pool.GetConnection("localhost:50051")

2. 超时控制 #

// 为每个调用设置超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp, err := client.GetUser(ctx, req)

3. 错误处理 #

func handleGRPCError(err error) {
    if st, ok := status.FromError(err); ok {
        switch st.Code() {
        case codes.NotFound:
            log.Println("Resource not found")
        case codes.InvalidArgument:
            log.Println("Invalid argument")
        case codes.Unauthenticated:
            log.Println("Authentication required")
        default:
            log.Printf("gRPC error: %s", st.Message())
        }
    } else {
        log.Printf("Non-gRPC error: %v", err)
    }
}

4. 资源清理 #

// 确保连接被正确关闭
defer func() {
    if err := conn.Close(); err != nil {
        log.Printf("Failed to close connection: %v", err)
    }
}()

小结 #

本节详细介绍了 gRPC 客户端开发的各个方面:

  1. 基础实现:简单客户端、客户端封装
  2. 高级特性:连接池、重试机制、负载均衡、熔断器
  3. 流式处理:服务端流、客户端流、双向流
  4. 错误处理:状态码处理、错误分类
  5. 最佳实践:连接管理、超时控制、资源清理

通过这些内容,您可以构建出高效、可靠的 gRPC 客户端应用。在下一节中,我们将学习如何实现 gRPC 拦截器和中间件。