3.7.4 gRPC 客户端开发 #
gRPC 客户端是与 gRPC 服务端通信的重要组件,负责发起 RPC 调用、处理响应和管理连接。本节将详细介绍如何开发高效、可靠的 gRPC 客户端应用。
客户端架构 #
基本组件 #
┌─────────────────────────────────────────┐
│ gRPC Client │
├─────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────────┐ │
│ │ Interceptors│ │ Client Stub │ │
│ │ │ │ │ │
│ │ - Retry │ │ - Method Calls │ │
│ │ - Auth │ │ - Stream Mgmt │ │
│ │ - Metrics │ │ - Error Handle │ │
│ └─────────────┘ └─────────────────┘ │
├─────────────────────────────────────────┤
│ Connection Pool │
│ (HTTP/2) │
└─────────────────────────────────────────┘
客户端生命周期 #
- 连接建立:创建到服务端的连接
- 调用执行:发起 RPC 调用
- 响应处理:处理服务端响应
- 连接管理:维护连接状态
- 资源清理:关闭连接和清理资源
基础客户端实现 #
1. 简单客户端 #
// cmd/client/main.go
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "github.com/example/grpc-client/proto/user/v1"
)
func main() {
// 建立连接
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
// 创建客户端
client := pb.NewUserServiceClient(conn)
// 一元 RPC 调用
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 创建用户
createResp, err := client.CreateUser(ctx, &pb.CreateUserRequest{
Name: "Alice",
Email: "[email protected]",
Age: 25,
})
if err != nil {
log.Fatalf("CreateUser failed: %v", err)
}
log.Printf("Created user: %+v", createResp.User)
// 获取用户
getResp, err := client.GetUser(ctx, &pb.GetUserRequest{
UserId: createResp.User.Id,
})
if err != nil {
log.Fatalf("GetUser failed: %v", err)
}
log.Printf("Retrieved user: %+v", getResp.User)
}
2. 客户端封装 #
// internal/client/user_client.go
package client
import (
"context"
"fmt"
"io"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
pb "github.com/example/grpc-client/proto/user/v1"
)
type UserClient struct {
conn *grpc.ClientConn
client pb.UserServiceClient
}
type ClientConfig struct {
Address string
Timeout time.Duration
KeepAlive time.Duration
MaxRetries int
EnableTLS bool
}
func NewUserClient(config ClientConfig) (*UserClient, error) {
var opts []grpc.DialOption
// 传输凭证
if config.EnableTLS {
// 这里可以配置 TLS 凭证
// opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
// 连接保活
opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: config.KeepAlive,
Timeout: time.Second,
PermitWithoutStream: true,
}))
// 建立连接
conn, err := grpc.Dial(config.Address, opts...)
if err != nil {
return nil, fmt.Errorf("failed to connect to %s: %w", config.Address, err)
}
return &UserClient{
conn: conn,
client: pb.NewUserServiceClient(conn),
}, nil
}
func (c *UserClient) Close() error {
return c.conn.Close()
}
// 一元 RPC 方法
func (c *UserClient) CreateUser(ctx context.Context, name, email string, age int32) (*pb.User, error) {
req := &pb.CreateUserRequest{
Name: name,
Email: email,
Age: age,
}
resp, err := c.client.CreateUser(ctx, req)
if err != nil {
return nil, c.handleError("CreateUser", err)
}
return resp.User, nil
}
func (c *UserClient) GetUser(ctx context.Context, userID int64) (*pb.User, error) {
req := &pb.GetUserRequest{
UserId: userID,
}
resp, err := c.client.GetUser(ctx, req)
if err != nil {
return nil, c.handleError("GetUser", err)
}
return resp.User, nil
}
func (c *UserClient) UpdateUser(ctx context.Context, userID int64, name, email string, age int32) (*pb.User, error) {
req := &pb.UpdateUserRequest{
UserId: userID,
Name: name,
Email: email,
Age: age,
}
resp, err := c.client.UpdateUser(ctx, req)
if err != nil {
return nil, c.handleError("UpdateUser", err)
}
return resp.User, nil
}
func (c *UserClient) DeleteUser(ctx context.Context, userID int64) error {
req := &pb.DeleteUserRequest{
UserId: userID,
}
_, err := c.client.DeleteUser(ctx, req)
if err != nil {
return c.handleError("DeleteUser", err)
}
return nil
}
// 服务端流式 RPC
func (c *UserClient) ListUsers(ctx context.Context, pageSize int32, pageToken, filter string) ([]*pb.User, error) {
req := &pb.ListUsersRequest{
PageSize: pageSize,
PageToken: pageToken,
Filter: filter,
}
stream, err := c.client.ListUsers(ctx, req)
if err != nil {
return nil, c.handleError("ListUsers", err)
}
var users []*pb.User
for {
user, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, c.handleError("ListUsers.Recv", err)
}
users = append(users, user)
}
return users, nil
}
// 客户端流式 RPC
func (c *UserClient) BatchCreateUsers(ctx context.Context, users []CreateUserData) (*pb.BatchCreateUsersResponse, error) {
stream, err := c.client.BatchCreateUsers(ctx)
if err != nil {
return nil, c.handleError("BatchCreateUsers", err)
}
// 发送用户数据
for _, userData := range users {
req := &pb.CreateUserRequest{
Name: userData.Name,
Email: userData.Email,
Age: userData.Age,
}
if err := stream.Send(req); err != nil {
return nil, c.handleError("BatchCreateUsers.Send", err)
}
}
// 关闭发送并接收响应
resp, err := stream.CloseAndRecv()
if err != nil {
return nil, c.handleError("BatchCreateUsers.CloseAndRecv", err)
}
return resp, nil
}
// 双向流式 RPC
func (c *UserClient) SyncUsers(ctx context.Context) (*UserSyncStream, error) {
stream, err := c.client.SyncUsers(ctx)
if err != nil {
return nil, c.handleError("SyncUsers", err)
}
return &UserSyncStream{
stream: stream,
client: c,
}, nil
}
// 错误处理
func (c *UserClient) handleError(method string, err error) error {
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.NotFound:
return fmt.Errorf("%s: resource not found: %s", method, st.Message())
case codes.InvalidArgument:
return fmt.Errorf("%s: invalid argument: %s", method, st.Message())
case codes.Unauthenticated:
return fmt.Errorf("%s: authentication required: %s", method, st.Message())
case codes.PermissionDenied:
return fmt.Errorf("%s: permission denied: %s", method, st.Message())
case codes.Unavailable:
return fmt.Errorf("%s: service unavailable: %s", method, st.Message())
default:
return fmt.Errorf("%s: %s (%s)", method, st.Message(), st.Code())
}
}
return fmt.Errorf("%s: %w", method, err)
}
// 辅助类型
type CreateUserData struct {
Name string
Email string
Age int32
}
// 双向流封装
type UserSyncStream struct {
stream pb.UserService_SyncUsersClient
client *UserClient
}
func (s *UserSyncStream) CreateUser(user *pb.User) error {
req := &pb.UserSyncRequest{
Action: &pb.UserSyncRequest_CreateUser{
CreateUser: user,
},
}
return s.stream.Send(req)
}
func (s *UserSyncStream) UpdateUser(req *pb.UpdateUserRequest) error {
syncReq := &pb.UserSyncRequest{
Action: &pb.UserSyncRequest_UpdateUser{
UpdateUser: req,
},
}
return s.stream.Send(syncReq)
}
func (s *UserSyncStream) DeleteUser(userID int64) error {
req := &pb.UserSyncRequest{
Action: &pb.UserSyncRequest_DeleteUser{
DeleteUser: &pb.DeleteUserRequest{
UserId: userID,
},
},
}
return s.stream.Send(req)
}
func (s *UserSyncStream) Recv() (*pb.UserSyncResponse, error) {
return s.stream.Recv()
}
func (s *UserSyncStream) Close() error {
return s.stream.CloseSend()
}
高级客户端特性 #
1. 连接池管理 #
// internal/client/pool.go
package client
import (
"context"
"fmt"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)
type ConnectionPool struct {
mu sync.RWMutex
connections map[string]*grpc.ClientConn
config PoolConfig
}
type PoolConfig struct {
MaxConnections int
IdleTimeout time.Duration
MaxLifetime time.Duration
KeepAliveTime time.Duration
KeepAliveTimeout time.Duration
}
func NewConnectionPool(config PoolConfig) *ConnectionPool {
pool := &ConnectionPool{
connections: make(map[string]*grpc.ClientConn),
config: config,
}
// 启动连接清理协程
go pool.cleanup()
return pool
}
func (p *ConnectionPool) GetConnection(address string) (*grpc.ClientConn, error) {
p.mu.RLock()
conn, exists := p.connections[address]
p.mu.RUnlock()
if exists && conn.GetState() == connectivity.Ready {
return conn, nil
}
return p.createConnection(address)
}
func (p *ConnectionPool) createConnection(address string) (*grpc.ClientConn, error) {
p.mu.Lock()
defer p.mu.Unlock()
// 双重检查
if conn, exists := p.connections[address]; exists && conn.GetState() == connectivity.Ready {
return conn, nil
}
// 检查连接数限制
if len(p.connections) >= p.config.MaxConnections {
return nil, fmt.Errorf("connection pool is full")
}
// 创建新连接
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: p.config.KeepAliveTime,
Timeout: p.config.KeepAliveTimeout,
PermitWithoutStream: true,
}),
}
conn, err := grpc.Dial(address, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create connection: %w", err)
}
p.connections[address] = conn
return conn, nil
}
func (p *ConnectionPool) cleanup() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
p.mu.Lock()
for address, conn := range p.connections {
state := conn.GetState()
if state == connectivity.Shutdown || state == connectivity.TransientFailure {
delete(p.connections, address)
conn.Close()
}
}
p.mu.Unlock()
}
}
func (p *ConnectionPool) Close() {
p.mu.Lock()
defer p.mu.Unlock()
for _, conn := range p.connections {
conn.Close()
}
p.connections = make(map[string]*grpc.ClientConn)
}
2. 重试机制 #
// internal/client/retry.go
package client
import (
"context"
"math"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type RetryConfig struct {
MaxAttempts int
InitialBackoff time.Duration
MaxBackoff time.Duration
BackoffMultiplier float64
RetryableCodes []codes.Code
}
func DefaultRetryConfig() RetryConfig {
return RetryConfig{
MaxAttempts: 3,
InitialBackoff: 100 * time.Millisecond,
MaxBackoff: 30 * time.Second,
BackoffMultiplier: 2.0,
RetryableCodes: []codes.Code{
codes.Unavailable,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.Internal,
},
}
}
func RetryInterceptor(config RetryConfig) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
var lastErr error
for attempt := 0; attempt < config.MaxAttempts; attempt++ {
if attempt > 0 {
// 计算退避时间
backoff := time.Duration(float64(config.InitialBackoff) * math.Pow(config.BackoffMultiplier, float64(attempt-1)))
if backoff > config.MaxBackoff {
backoff = config.MaxBackoff
}
// 等待退避时间
select {
case <-time.After(backoff):
case <-ctx.Done():
return ctx.Err()
}
}
// 执行调用
err := invoker(ctx, method, req, reply, cc, opts...)
if err == nil {
return nil
}
lastErr = err
// 检查是否可重试
if !isRetryable(err, config.RetryableCodes) {
break
}
// 检查上下文是否已取消
if ctx.Err() != nil {
break
}
}
return lastErr
}
}
func isRetryable(err error, retryableCodes []codes.Code) bool {
st, ok := status.FromError(err)
if !ok {
return false
}
for _, code := range retryableCodes {
if st.Code() == code {
return true
}
}
return false
}
// 流式 RPC 重试拦截器
func StreamRetryInterceptor(config RetryConfig) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
var lastErr error
for attempt := 0; attempt < config.MaxAttempts; attempt++ {
if attempt > 0 {
backoff := time.Duration(float64(config.InitialBackoff) * math.Pow(config.BackoffMultiplier, float64(attempt-1)))
if backoff > config.MaxBackoff {
backoff = config.MaxBackoff
}
select {
case <-time.After(backoff):
case <-ctx.Done():
return nil, ctx.Err()
}
}
stream, err := streamer(ctx, desc, cc, method, opts...)
if err == nil {
return stream, nil
}
lastErr = err
if !isRetryable(err, config.RetryableCodes) {
break
}
if ctx.Err() != nil {
break
}
}
return nil, lastErr
}
}
3. 负载均衡 #
// internal/client/loadbalancer.go
package client
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
)
type LoadBalancer struct {
mu sync.RWMutex
connections []*grpc.ClientConn
addresses []string
current int
strategy BalanceStrategy
}
type BalanceStrategy int
const (
RoundRobin BalanceStrategy = iota
Random
LeastConnections
)
func NewLoadBalancer(addresses []string, strategy BalanceStrategy) (*LoadBalancer, error) {
lb := &LoadBalancer{
addresses: addresses,
strategy: strategy,
}
// 创建到所有地址的连接
for _, addr := range addresses {
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
// 清理已创建的连接
lb.Close()
return nil, fmt.Errorf("failed to connect to %s: %w", addr, err)
}
lb.connections = append(lb.connections, conn)
}
// 启动健康检查
go lb.healthCheck()
return lb, nil
}
func (lb *LoadBalancer) GetConnection() (*grpc.ClientConn, error) {
lb.mu.RLock()
defer lb.mu.RUnlock()
if len(lb.connections) == 0 {
return nil, fmt.Errorf("no available connections")
}
var conn *grpc.ClientConn
switch lb.strategy {
case RoundRobin:
conn = lb.roundRobin()
case Random:
conn = lb.random()
case LeastConnections:
conn = lb.leastConnections()
default:
conn = lb.roundRobin()
}
if conn == nil || conn.GetState() != connectivity.Ready {
return nil, fmt.Errorf("no healthy connections available")
}
return conn, nil
}
func (lb *LoadBalancer) roundRobin() *grpc.ClientConn {
for i := 0; i < len(lb.connections); i++ {
idx := (lb.current + i) % len(lb.connections)
conn := lb.connections[idx]
if conn.GetState() == connectivity.Ready {
lb.current = (idx + 1) % len(lb.connections)
return conn
}
}
return nil
}
func (lb *LoadBalancer) random() *grpc.ClientConn {
healthyConns := make([]*grpc.ClientConn, 0)
for _, conn := range lb.connections {
if conn.GetState() == connectivity.Ready {
healthyConns = append(healthyConns, conn)
}
}
if len(healthyConns) == 0 {
return nil
}
return healthyConns[rand.Intn(len(healthyConns))]
}
func (lb *LoadBalancer) leastConnections() *grpc.ClientConn {
// 简化实现,实际应该跟踪每个连接的活跃请求数
return lb.roundRobin()
}
func (lb *LoadBalancer) healthCheck() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
lb.mu.Lock()
for i, conn := range lb.connections {
state := conn.GetState()
if state == connectivity.Shutdown || state == connectivity.TransientFailure {
// 尝试重新连接
newConn, err := grpc.Dial(lb.addresses[i], grpc.WithTransportCredentials(insecure.NewCredentials()))
if err == nil {
conn.Close()
lb.connections[i] = newConn
}
}
}
lb.mu.Unlock()
}
}
func (lb *LoadBalancer) Close() {
lb.mu.Lock()
defer lb.mu.Unlock()
for _, conn := range lb.connections {
conn.Close()
}
lb.connections = nil
}
4. 熔断器 #
// internal/client/circuitbreaker.go
package client
import (
"context"
"errors"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type CircuitBreakerState int
const (
StateClosed CircuitBreakerState = iota
StateOpen
StateHalfOpen
)
type CircuitBreaker struct {
mu sync.RWMutex
state CircuitBreakerState
failureCount int
successCount int
lastFailureTime time.Time
config CircuitBreakerConfig
}
type CircuitBreakerConfig struct {
MaxFailures int
ResetTimeout time.Duration
HalfOpenMaxCalls int
}
func NewCircuitBreaker(config CircuitBreakerConfig) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
config: config,
}
}
func (cb *CircuitBreaker) Call(ctx context.Context, fn func() error) error {
if !cb.allowRequest() {
return errors.New("circuit breaker is open")
}
err := fn()
cb.recordResult(err)
return err
}
func (cb *CircuitBreaker) allowRequest() bool {
cb.mu.Lock()
defer cb.mu.Unlock()
switch cb.state {
case StateClosed:
return true
case StateOpen:
if time.Since(cb.lastFailureTime) > cb.config.ResetTimeout {
cb.state = StateHalfOpen
cb.successCount = 0
return true
}
return false
case StateHalfOpen:
return cb.successCount < cb.config.HalfOpenMaxCalls
default:
return false
}
}
func (cb *CircuitBreaker) recordResult(err error) {
cb.mu.Lock()
defer cb.mu.Unlock()
if err == nil {
cb.onSuccess()
} else {
cb.onFailure()
}
}
func (cb *CircuitBreaker) onSuccess() {
switch cb.state {
case StateClosed:
cb.failureCount = 0
case StateHalfOpen:
cb.successCount++
if cb.successCount >= cb.config.HalfOpenMaxCalls {
cb.state = StateClosed
cb.failureCount = 0
}
}
}
func (cb *CircuitBreaker) onFailure() {
cb.failureCount++
cb.lastFailureTime = time.Now()
switch cb.state {
case StateClosed:
if cb.failureCount >= cb.config.MaxFailures {
cb.state = StateOpen
}
case StateHalfOpen:
cb.state = StateOpen
}
}
// gRPC 拦截器
func CircuitBreakerInterceptor(cb *CircuitBreaker) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return cb.Call(ctx, func() error {
return invoker(ctx, method, req, reply, cc, opts...)
})
}
}
func isFailure(err error) bool {
if err == nil {
return false
}
st, ok := status.FromError(err)
if !ok {
return true
}
// 只有某些错误码才算失败
switch st.Code() {
case codes.Unavailable, codes.DeadlineExceeded, codes.Internal:
return true
default:
return false
}
}
客户端使用示例 #
1. 完整客户端应用 #
// cmd/client/main.go
package main
import (
"context"
"log"
"time"
"github.com/example/grpc-client/internal/client"
)
func main() {
// 客户端配置
config := client.ClientConfig{
Address: "localhost:50051",
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
EnableTLS: false,
}
// 创建客户端
userClient, err := client.NewUserClient(config)
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}
defer userClient.Close()
ctx := context.Background()
// 创建用户
user, err := userClient.CreateUser(ctx, "Alice", "[email protected]", 25)
if err != nil {
log.Fatalf("Failed to create user: %v", err)
}
log.Printf("Created user: %+v", user)
// 获取用户
retrievedUser, err := userClient.GetUser(ctx, user.Id)
if err != nil {
log.Fatalf("Failed to get user: %v", err)
}
log.Printf("Retrieved user: %+v", retrievedUser)
// 更新用户
updatedUser, err := userClient.UpdateUser(ctx, user.Id, "Alice Smith", "[email protected]", 26)
if err != nil {
log.Fatalf("Failed to update user: %v", err)
}
log.Printf("Updated user: %+v", updatedUser)
// 列出用户
users, err := userClient.ListUsers(ctx, 10, "", "")
if err != nil {
log.Fatalf("Failed to list users: %v", err)
}
log.Printf("Listed %d users", len(users))
// 批量创建用户
batchUsers := []client.CreateUserData{
{Name: "Bob", Email: "[email protected]", Age: 30},
{Name: "Charlie", Email: "[email protected]", Age: 35},
}
batchResp, err := userClient.BatchCreateUsers(ctx, batchUsers)
if err != nil {
log.Fatalf("Failed to batch create users: %v", err)
}
log.Printf("Batch created %d users, errors: %v", batchResp.CreatedCount, batchResp.Errors)
// 双向流示例
syncStream, err := userClient.SyncUsers(ctx)
if err != nil {
log.Fatalf("Failed to create sync stream: %v", err)
}
// 在单独的 goroutine 中接收响应
go func() {
for {
resp, err := syncStream.Recv()
if err != nil {
log.Printf("Sync stream receive error: %v", err)
return
}
log.Printf("Sync response: success=%t, message=%s", resp.Success, resp.Message)
}
}()
// 发送同步请求
err = syncStream.CreateUser(&pb.User{
Name: "Dave",
Email: "[email protected]",
Age: 40,
})
if err != nil {
log.Printf("Failed to send create user: %v", err)
}
time.Sleep(2 * time.Second)
syncStream.Close()
// 删除用户
err = userClient.DeleteUser(ctx, user.Id)
if err != nil {
log.Fatalf("Failed to delete user: %v", err)
}
log.Println("User deleted successfully")
}
2. 带中间件的客户端 #
// cmd/advanced_client/main.go
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/example/grpc-client/internal/client"
pb "github.com/example/grpc-client/proto/user/v1"
)
func main() {
// 配置重试
retryConfig := client.DefaultRetryConfig()
// 配置熔断器
cbConfig := client.CircuitBreakerConfig{
MaxFailures: 5,
ResetTimeout: 30 * time.Second,
HalfOpenMaxCalls: 3,
}
circuitBreaker := client.NewCircuitBreaker(cbConfig)
// 创建连接
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(
client.RetryInterceptor(retryConfig),
client.CircuitBreakerInterceptor(circuitBreaker),
loggingInterceptor,
),
)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewUserServiceClient(conn)
// 使用客户端
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
resp, err := client.GetUser(ctx, &pb.GetUserRequest{UserId: 1})
if err != nil {
log.Printf("GetUser failed: %v", err)
} else {
log.Printf("GetUser success: %+v", resp.User)
}
}
func loggingInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("gRPC call %s took %v, error: %v", method, time.Since(start), err)
return err
}
最佳实践 #
1. 连接管理 #
// 使用连接池
pool := client.NewConnectionPool(client.PoolConfig{
MaxConnections: 10,
IdleTimeout: 5 * time.Minute,
MaxLifetime: 30 * time.Minute,
KeepAliveTime: 30 * time.Second,
KeepAliveTimeout: 5 * time.Second,
})
conn, err := pool.GetConnection("localhost:50051")
2. 超时控制 #
// 为每个调用设置超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.GetUser(ctx, req)
3. 错误处理 #
func handleGRPCError(err error) {
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.NotFound:
log.Println("Resource not found")
case codes.InvalidArgument:
log.Println("Invalid argument")
case codes.Unauthenticated:
log.Println("Authentication required")
default:
log.Printf("gRPC error: %s", st.Message())
}
} else {
log.Printf("Non-gRPC error: %v", err)
}
}
4. 资源清理 #
// 确保连接被正确关闭
defer func() {
if err := conn.Close(); err != nil {
log.Printf("Failed to close connection: %v", err)
}
}()
小结 #
本节详细介绍了 gRPC 客户端开发的各个方面:
- 基础实现:简单客户端、客户端封装
- 高级特性:连接池、重试机制、负载均衡、熔断器
- 流式处理:服务端流、客户端流、双向流
- 错误处理:状态码处理、错误分类
- 最佳实践:连接管理、超时控制、资源清理
通过这些内容,您可以构建出高效、可靠的 gRPC 客户端应用。在下一节中,我们将学习如何实现 gRPC 拦截器和中间件。