3.1.4 HTTP 客户端开发

3.1.4 HTTP 客户端开发 #

HTTP 客户端是现代应用程序的重要组成部分,用于与外部 API、微服务或第三方服务进行通信。Go 语言的 net/http 包提供了功能强大且易于使用的 HTTP 客户端实现。本节将详细介绍如何使用 Go 开发高效、可靠的 HTTP 客户端。

基础 HTTP 客户端 #

使用便捷函数 #

Go 提供了一系列便捷函数用于快速发送 HTTP 请求:

package main

import (
    "fmt"
    "io"
    "net/http"
    "strings"
)

func main() {
    // GET 请求
    resp, err := http.Get("https://httpbin.org/get")
    if err != nil {
        fmt.Printf("GET 请求失败: %v\n", err)
        return
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        fmt.Printf("读取响应失败: %v\n", err)
        return
    }

    fmt.Printf("GET 响应状态: %s\n", resp.Status)
    fmt.Printf("GET 响应内容: %s\n", string(body))

    // POST 请求
    postData := strings.NewReader(`{"name": "张三", "age": 25}`)
    resp, err = http.Post("https://httpbin.org/post", "application/json", postData)
    if err != nil {
        fmt.Printf("POST 请求失败: %v\n", err)
        return
    }
    defer resp.Body.Close()

    body, err = io.ReadAll(resp.Body)
    if err != nil {
        fmt.Printf("读取响应失败: %v\n", err)
        return
    }

    fmt.Printf("POST 响应状态: %s\n", resp.Status)
    fmt.Printf("POST 响应内容: %s\n", string(body))

    // PostForm 请求
    formData := map[string][]string{
        "username": {"zhangsan"},
        "password": {"123456"},
    }
    resp, err = http.PostForm("https://httpbin.org/post", formData)
    if err != nil {
        fmt.Printf("PostForm 请求失败: %v\n", err)
        return
    }
    defer resp.Body.Close()

    fmt.Printf("PostForm 响应状态: %s\n", resp.Status)
}

使用 http.NewRequest #

对于更复杂的请求,可以使用 http.NewRequest 创建自定义请求:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
)

type User struct {
    Name  string `json:"name"`
    Email string `json:"email"`
    Age   int    `json:"age"`
}

func main() {
    // 创建用户数据
    user := User{
        Name:  "李四",
        Email: "[email protected]",
        Age:   30,
    }

    // 序列化为 JSON
    jsonData, err := json.Marshal(user)
    if err != nil {
        fmt.Printf("JSON 序列化失败: %v\n", err)
        return
    }

    // 创建请求
    req, err := http.NewRequest("POST", "https://httpbin.org/post", bytes.NewBuffer(jsonData))
    if err != nil {
        fmt.Printf("创建请求失败: %v\n", err)
        return
    }

    // 设置请求头
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("User-Agent", "MyApp/1.0")
    req.Header.Set("Authorization", "Bearer token123")

    // 发送请求
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        fmt.Printf("发送请求失败: %v\n", err)
        return
    }
    defer resp.Body.Close()

    // 读取响应
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        fmt.Printf("读取响应失败: %v\n", err)
        return
    }

    fmt.Printf("响应状态: %s\n", resp.Status)
    fmt.Printf("响应头: %v\n", resp.Header)
    fmt.Printf("响应内容: %s\n", string(body))
}

自定义 HTTP 客户端 #

客户端配置 #

package main

import (
    "crypto/tls"
    "fmt"
    "net/http"
    "time"
)

func main() {
    // 创建自定义 Transport
    transport := &http.Transport{
        MaxIdleConns:        100,              // 最大空闲连接数
        MaxIdleConnsPerHost: 10,               // 每个主机的最大空闲连接数
        IdleConnTimeout:     90 * time.Second, // 空闲连接超时
        DisableCompression:  false,            // 启用压缩
        TLSClientConfig: &tls.Config{
            InsecureSkipVerify: false, // 验证 TLS 证书
        },
    }

    // 创建自定义客户端
    client := &http.Client{
        Transport: transport,
        Timeout:   30 * time.Second, // 总超时时间
        CheckRedirect: func(req *http.Request, via []*http.Request) error {
            // 限制重定向次数
            if len(via) >= 5 {
                return fmt.Errorf("重定向次数过多")
            }
            return nil
        },
    }

    // 使用自定义客户端发送请求
    resp, err := client.Get("https://httpbin.org/get")
    if err != nil {
        fmt.Printf("请求失败: %v\n", err)
        return
    }
    defer resp.Body.Close()

    fmt.Printf("响应状态: %s\n", resp.Status)
}

连接池管理 #

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
)

// HTTP 客户端管理器
type HTTPClientManager struct {
    clients map[string]*http.Client
    mutex   sync.RWMutex
}

func NewHTTPClientManager() *HTTPClientManager {
    return &HTTPClientManager{
        clients: make(map[string]*http.Client),
    }
}

// 获取或创建客户端
func (m *HTTPClientManager) GetClient(name string, config *ClientConfig) *http.Client {
    m.mutex.RLock()
    client, exists := m.clients[name]
    m.mutex.RUnlock()

    if exists {
        return client
    }

    m.mutex.Lock()
    defer m.mutex.Unlock()

    // 双重检查
    if client, exists := m.clients[name]; exists {
        return client
    }

    // 创建新客户端
    transport := &http.Transport{
        MaxIdleConns:        config.MaxIdleConns,
        MaxIdleConnsPerHost: config.MaxIdleConnsPerHost,
        IdleConnTimeout:     config.IdleConnTimeout,
    }

    client = &http.Client{
        Transport: transport,
        Timeout:   config.Timeout,
    }

    m.clients[name] = client
    return client
}

type ClientConfig struct {
    MaxIdleConns        int
    MaxIdleConnsPerHost int
    IdleConnTimeout     time.Duration
    Timeout             time.Duration
}

func main() {
    manager := NewHTTPClientManager()

    // 配置不同的客户端
    fastConfig := &ClientConfig{
        MaxIdleConns:        50,
        MaxIdleConnsPerHost: 5,
        IdleConnTimeout:     30 * time.Second,
        Timeout:             5 * time.Second,
    }

    slowConfig := &ClientConfig{
        MaxIdleConns:        20,
        MaxIdleConnsPerHost: 2,
        IdleConnTimeout:     60 * time.Second,
        Timeout:             30 * time.Second,
    }

    // 获取客户端
    fastClient := manager.GetClient("fast", fastConfig)
    slowClient := manager.GetClient("slow", slowConfig)

    // 使用不同的客户端
    resp1, err := fastClient.Get("https://httpbin.org/delay/1")
    if err != nil {
        fmt.Printf("快速客户端请求失败: %v\n", err)
    } else {
        fmt.Printf("快速客户端响应: %s\n", resp1.Status)
        resp1.Body.Close()
    }

    resp2, err := slowClient.Get("https://httpbin.org/delay/10")
    if err != nil {
        fmt.Printf("慢速客户端请求失败: %v\n", err)
    } else {
        fmt.Printf("慢速客户端响应: %s\n", resp2.Status)
        resp2.Body.Close()
    }
}

请求构建和发送 #

RESTful API 客户端 #

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "strconv"
)

// API 客户端
type APIClient struct {
    BaseURL    string
    HTTPClient *http.Client
    APIKey     string
}

// 用户结构
type User struct {
    ID    int    `json:"id,omitempty"`
    Name  string `json:"name"`
    Email string `json:"email"`
    Age   int    `json:"age"`
}

// API 响应结构
type APIResponse struct {
    Success bool        `json:"success"`
    Data    interface{} `json:"data,omitempty"`
    Error   string      `json:"error,omitempty"`
}

// 创建 API 客户端
func NewAPIClient(baseURL, apiKey string) *APIClient {
    return &APIClient{
        BaseURL: baseURL,
        HTTPClient: &http.Client{
            Timeout: 30 * time.Second,
        },
        APIKey: apiKey,
    }
}

// 发送请求的通用方法
func (c *APIClient) sendRequest(method, endpoint string, body interface{}) (*http.Response, error) {
    // 构建 URL
    u, err := url.Parse(c.BaseURL + endpoint)
    if err != nil {
        return nil, err
    }

    // 准备请求体
    var reqBody io.Reader
    if body != nil {
        jsonData, err := json.Marshal(body)
        if err != nil {
            return nil, err
        }
        reqBody = bytes.NewBuffer(jsonData)
    }

    // 创建请求
    req, err := http.NewRequest(method, u.String(), reqBody)
    if err != nil {
        return nil, err
    }

    // 设置请求头
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("Accept", "application/json")
    if c.APIKey != "" {
        req.Header.Set("Authorization", "Bearer "+c.APIKey)
    }

    // 发送请求
    return c.HTTPClient.Do(req)
}

// 获取用户列表
func (c *APIClient) GetUsers() ([]User, error) {
    resp, err := c.sendRequest("GET", "/users", nil)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("API 错误: %s", resp.Status)
    }

    var users []User
    if err := json.NewDecoder(resp.Body).Decode(&users); err != nil {
        return nil, err
    }

    return users, nil
}

// 获取单个用户
func (c *APIClient) GetUser(id int) (*User, error) {
    endpoint := "/users/" + strconv.Itoa(id)
    resp, err := c.sendRequest("GET", endpoint, nil)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode == http.StatusNotFound {
        return nil, fmt.Errorf("用户不存在")
    }

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("API 错误: %s", resp.Status)
    }

    var user User
    if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
        return nil, err
    }

    return &user, nil
}

// 创建用户
func (c *APIClient) CreateUser(user *User) (*User, error) {
    resp, err := c.sendRequest("POST", "/users", user)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusCreated {
        return nil, fmt.Errorf("创建用户失败: %s", resp.Status)
    }

    var createdUser User
    if err := json.NewDecoder(resp.Body).Decode(&createdUser); err != nil {
        return nil, err
    }

    return &createdUser, nil
}

// 更新用户
func (c *APIClient) UpdateUser(id int, user *User) (*User, error) {
    endpoint := "/users/" + strconv.Itoa(id)
    resp, err := c.sendRequest("PUT", endpoint, user)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("更新用户失败: %s", resp.Status)
    }

    var updatedUser User
    if err := json.NewDecoder(resp.Body).Decode(&updatedUser); err != nil {
        return nil, err
    }

    return &updatedUser, nil
}

// 删除用户
func (c *APIClient) DeleteUser(id int) error {
    endpoint := "/users/" + strconv.Itoa(id)
    resp, err := c.sendRequest("DELETE", endpoint, nil)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusNoContent {
        return fmt.Errorf("删除用户失败: %s", resp.Status)
    }

    return nil
}

func main() {
    // 创建 API 客户端
    client := NewAPIClient("https://jsonplaceholder.typicode.com", "your-api-key")

    // 获取用户列表
    users, err := client.GetUsers()
    if err != nil {
        fmt.Printf("获取用户列表失败: %v\n", err)
    } else {
        fmt.Printf("获取到 %d 个用户\n", len(users))
        if len(users) > 0 {
            fmt.Printf("第一个用户: %+v\n", users[0])
        }
    }

    // 获取单个用户
    user, err := client.GetUser(1)
    if err != nil {
        fmt.Printf("获取用户失败: %v\n", err)
    } else {
        fmt.Printf("用户详情: %+v\n", user)
    }

    // 创建新用户
    newUser := &User{
        Name:  "张三",
        Email: "[email protected]",
        Age:   25,
    }

    createdUser, err := client.CreateUser(newUser)
    if err != nil {
        fmt.Printf("创建用户失败: %v\n", err)
    } else {
        fmt.Printf("创建的用户: %+v\n", createdUser)
    }
}

文件上传客户端 #

package main

import (
    "bytes"
    "fmt"
    "io"
    "mime/multipart"
    "net/http"
    "os"
    "path/filepath"
)

// 上传文件
func uploadFile(url, fieldName, filename string) error {
    // 打开文件
    file, err := os.Open(filename)
    if err != nil {
        return err
    }
    defer file.Close()

    // 创建 multipart writer
    var requestBody bytes.Buffer
    writer := multipart.NewWriter(&requestBody)

    // 创建文件字段
    part, err := writer.CreateFormFile(fieldName, filepath.Base(filename))
    if err != nil {
        return err
    }

    // 复制文件内容
    if _, err := io.Copy(part, file); err != nil {
        return err
    }

    // 添加其他字段
    writer.WriteField("description", "上传的文件")
    writer.WriteField("category", "document")

    // 关闭 writer
    if err := writer.Close(); err != nil {
        return err
    }

    // 创建请求
    req, err := http.NewRequest("POST", url, &requestBody)
    if err != nil {
        return err
    }

    // 设置 Content-Type
    req.Header.Set("Content-Type", writer.FormDataContentType())

    // 发送请求
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // 读取响应
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return err
    }

    fmt.Printf("上传响应状态: %s\n", resp.Status)
    fmt.Printf("上传响应内容: %s\n", string(body))

    return nil
}

// 下载文件
func downloadFile(url, filename string) error {
    // 发送 GET 请求
    resp, err := http.Get(url)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // 检查状态码
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("下载失败: %s", resp.Status)
    }

    // 创建文件
    file, err := os.Create(filename)
    if err != nil {
        return err
    }
    defer file.Close()

    // 复制内容
    _, err = io.Copy(file, resp.Body)
    if err != nil {
        return err
    }

    fmt.Printf("文件下载完成: %s\n", filename)
    return nil
}

// 带进度的文件下载
type ProgressReader struct {
    io.Reader
    Total    int64
    Current  int64
    Callback func(current, total int64)
}

func (pr *ProgressReader) Read(p []byte) (int, error) {
    n, err := pr.Reader.Read(p)
    pr.Current += int64(n)
    if pr.Callback != nil {
        pr.Callback(pr.Current, pr.Total)
    }
    return n, err
}

func downloadFileWithProgress(url, filename string) error {
    // 发送 HEAD 请求获取文件大小
    headResp, err := http.Head(url)
    if err != nil {
        return err
    }
    headResp.Body.Close()

    fileSize := headResp.ContentLength

    // 发送 GET 请求
    resp, err := http.Get(url)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // 创建文件
    file, err := os.Create(filename)
    if err != nil {
        return err
    }
    defer file.Close()

    // 创建进度读取器
    progressReader := &ProgressReader{
        Reader: resp.Body,
        Total:  fileSize,
        Callback: func(current, total int64) {
            percent := float64(current) / float64(total) * 100
            fmt.Printf("\r下载进度: %.2f%% (%d/%d bytes)", percent, current, total)
        },
    }

    // 复制内容
    _, err = io.Copy(file, progressReader)
    if err != nil {
        return err
    }

    fmt.Printf("\n文件下载完成: %s\n", filename)
    return nil
}

func main() {
    // 上传文件示例
    if err := uploadFile("https://httpbin.org/post", "file", "test.txt"); err != nil {
        fmt.Printf("上传文件失败: %v\n", err)
    }

    // 下载文件示例
    if err := downloadFile("https://httpbin.org/json", "downloaded.json"); err != nil {
        fmt.Printf("下载文件失败: %v\n", err)
    }

    // 带进度的下载示例
    if err := downloadFileWithProgress("https://httpbin.org/bytes/1024", "large_file.bin"); err != nil {
        fmt.Printf("下载文件失败: %v\n", err)
    }
}

错误处理和重试 #

智能重试机制 #

package main

import (
    "context"
    "fmt"
    "math"
    "net/http"
    "time"
)

// 重试配置
type RetryConfig struct {
    MaxRetries      int
    InitialDelay    time.Duration
    MaxDelay        time.Duration
    BackoffFactor   float64
    RetryableErrors []int // 可重试的状态码
}

// 默认重试配置
func DefaultRetryConfig() *RetryConfig {
    return &RetryConfig{
        MaxRetries:    3,
        InitialDelay:  1 * time.Second,
        MaxDelay:      30 * time.Second,
        BackoffFactor: 2.0,
        RetryableErrors: []int{
            http.StatusInternalServerError,
            http.StatusBadGateway,
            http.StatusServiceUnavailable,
            http.StatusGatewayTimeout,
        },
    }
}

// 重试客户端
type RetryClient struct {
    HTTPClient *http.Client
    Config     *RetryConfig
}

func NewRetryClient(config *RetryConfig) *RetryClient {
    if config == nil {
        config = DefaultRetryConfig()
    }

    return &RetryClient{
        HTTPClient: &http.Client{
            Timeout: 30 * time.Second,
        },
        Config: config,
    }
}

// 检查是否可重试
func (rc *RetryClient) isRetryable(statusCode int, err error) bool {
    // 网络错误通常可以重试
    if err != nil {
        return true
    }

    // 检查状态码是否可重试
    for _, code := range rc.Config.RetryableErrors {
        if statusCode == code {
            return true
        }
    }

    return false
}

// 计算退避延迟
func (rc *RetryClient) calculateDelay(attempt int) time.Duration {
    delay := float64(rc.Config.InitialDelay) * math.Pow(rc.Config.BackoffFactor, float64(attempt))
    maxDelay := float64(rc.Config.MaxDelay)

    if delay > maxDelay {
        delay = maxDelay
    }

    return time.Duration(delay)
}

// 带重试的请求
func (rc *RetryClient) Do(req *http.Request) (*http.Response, error) {
    var lastErr error
    var resp *http.Response

    for attempt := 0; attempt <= rc.Config.MaxRetries; attempt++ {
        // 克隆请求(因为请求体可能被消费)
        reqClone := req.Clone(req.Context())

        // 发送请求
        resp, lastErr = rc.HTTPClient.Do(reqClone)

        // 如果成功或不可重试,直接返回
        if lastErr == nil && !rc.isRetryable(resp.StatusCode, nil) {
            return resp, nil
        }

        // 如果是最后一次尝试,不再等待
        if attempt == rc.Config.MaxRetries {
            break
        }

        // 计算延迟时间
        delay := rc.calculateDelay(attempt)
        fmt.Printf("请求失败,%v 后重试 (尝试 %d/%d)\n", delay, attempt+1, rc.Config.MaxRetries)

        // 等待重试
        select {
        case <-time.After(delay):
        case <-req.Context().Done():
            return nil, req.Context().Err()
        }

        // 关闭之前的响应体
        if resp != nil {
            resp.Body.Close()
        }
    }

    return resp, lastErr
}

// 带超时和重试的 GET 请求
func (rc *RetryClient) GetWithTimeout(url string, timeout time.Duration) (*http.Response, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return nil, err
    }

    return rc.Do(req)
}

func main() {
    // 创建重试客户端
    retryClient := NewRetryClient(nil)

    // 测试正常请求
    resp, err := retryClient.GetWithTimeout("https://httpbin.org/get", 10*time.Second)
    if err != nil {
        fmt.Printf("请求失败: %v\n", err)
    } else {
        fmt.Printf("请求成功: %s\n", resp.Status)
        resp.Body.Close()
    }

    // 测试会失败的请求(触发重试)
    resp, err = retryClient.GetWithTimeout("https://httpbin.org/status/500", 30*time.Second)
    if err != nil {
        fmt.Printf("重试后仍然失败: %v\n", err)
    } else {
        fmt.Printf("最终响应: %s\n", resp.Status)
        resp.Body.Close()
    }
}

熔断器模式 #

package main

import (
    "errors"
    "fmt"
    "net/http"
    "sync"
    "time"
)

// 熔断器状态
type CircuitState int

const (
    StateClosed CircuitState = iota
    StateOpen
    StateHalfOpen
)

// 熔断器配置
type CircuitBreakerConfig struct {
    MaxRequests      uint32        // 半开状态下的最大请求数
    Interval         time.Duration // 统计间隔
    Timeout          time.Duration // 开启状态的超时时间
    ReadyToTrip      func(counts Counts) bool // 判断是否应该开启熔断器
    OnStateChange    func(name string, from CircuitState, to CircuitState) // 状态变化回调
}

// 计数器
type Counts struct {
    Requests         uint32
    TotalSuccesses   uint32
    TotalFailures    uint32
    ConsecutiveSuccesses uint32
    ConsecutiveFailures  uint32
}

// 熔断器
type CircuitBreaker struct {
    name          string
    maxRequests   uint32
    interval      time.Duration
    timeout       time.Duration
    readyToTrip   func(counts Counts) bool
    onStateChange func(name string, from CircuitState, to CircuitState)

    mutex      sync.Mutex
    state      CircuitState
    generation uint64
    counts     Counts
    expiry     time.Time
}

// 创建熔断器
func NewCircuitBreaker(name string, config CircuitBreakerConfig) *CircuitBreaker {
    cb := &CircuitBreaker{
        name:          name,
        maxRequests:   config.MaxRequests,
        interval:      config.Interval,
        timeout:       config.Timeout,
        readyToTrip:   config.ReadyToTrip,
        onStateChange: config.OnStateChange,
    }

    cb.toNewGeneration(time.Now())
    return cb
}

// 执行请求
func (cb *CircuitBreaker) Execute(req func() (*http.Response, error)) (*http.Response, error) {
    generation, err := cb.beforeRequest()
    if err != nil {
        return nil, err
    }

    defer func() {
        if r := recover(); r != nil {
            cb.afterRequest(generation, false)
            panic(r)
        }
    }()

    resp, err := req()
    cb.afterRequest(generation, err == nil && resp.StatusCode < 500)
    return resp, err
}

// 请求前检查
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()

    now := time.Now()
    state, generation := cb.currentState(now)

    if state == StateOpen {
        return generation, errors.New("熔断器开启")
    } else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
        return generation, errors.New("熔断器半开状态请求数过多")
    }

    cb.counts.Requests++
    return generation, nil
}

// 请求后处理
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()

    now := time.Now()
    state, generation := cb.currentState(now)
    if generation != before {
        return
    }

    if success {
        cb.onSuccess(state, now)
    } else {
        cb.onFailure(state, now)
    }
}

// 成功处理
func (cb *CircuitBreaker) onSuccess(state CircuitState, now time.Time) {
    cb.counts.TotalSuccesses++
    cb.counts.ConsecutiveSuccesses++
    cb.counts.ConsecutiveFailures = 0

    if state == StateHalfOpen {
        cb.setState(StateClosed, now)
    }
}

// 失败处理
func (cb *CircuitBreaker) onFailure(state CircuitState, now time.Time) {
    cb.counts.TotalFailures++
    cb.counts.ConsecutiveFailures++
    cb.counts.ConsecutiveSuccesses = 0

    if cb.readyToTrip(cb.counts) {
        cb.setState(StateOpen, now)
    }
}

// 获取当前状态
func (cb *CircuitBreaker) currentState(now time.Time) (CircuitState, uint64) {
    switch cb.state {
    case StateClosed:
        if !cb.expiry.IsZero() && cb.expiry.Before(now) {
            cb.toNewGeneration(now)
        }
    case StateOpen:
        if cb.expiry.Before(now) {
            cb.setState(StateHalfOpen, now)
        }
    }
    return cb.state, cb.generation
}

// 设置状态
func (cb *CircuitBreaker) setState(state CircuitState, now time.Time) {
    if cb.state == state {
        return
    }

    prev := cb.state
    cb.state = state
    cb.toNewGeneration(now)

    if cb.onStateChange != nil {
        cb.onStateChange(cb.name, prev, state)
    }
}

// 新的统计周期
func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
    cb.generation++
    cb.counts = Counts{}

    var zero time.Time
    switch cb.state {
    case StateClosed:
        if cb.interval == 0 {
            cb.expiry = zero
        } else {
            cb.expiry = now.Add(cb.interval)
        }
    case StateOpen:
        cb.expiry = now.Add(cb.timeout)
    default: // StateHalfOpen
        cb.expiry = zero
    }
}

// 带熔断器的 HTTP 客户端
type CircuitBreakerClient struct {
    client   *http.Client
    breakers map[string]*CircuitBreaker
    mutex    sync.RWMutex
}

func NewCircuitBreakerClient() *CircuitBreakerClient {
    return &CircuitBreakerClient{
        client:   &http.Client{Timeout: 10 * time.Second},
        breakers: make(map[string]*CircuitBreaker),
    }
}

// 获取或创建熔断器
func (cbc *CircuitBreakerClient) getBreaker(host string) *CircuitBreaker {
    cbc.mutex.RLock()
    breaker, exists := cbc.breakers[host]
    cbc.mutex.RUnlock()

    if exists {
        return breaker
    }

    cbc.mutex.Lock()
    defer cbc.mutex.Unlock()

    if breaker, exists := cbc.breakers[host]; exists {
        return breaker
    }

    config := CircuitBreakerConfig{
        MaxRequests: 3,
        Interval:    10 * time.Second,
        Timeout:     30 * time.Second,
        ReadyToTrip: func(counts Counts) bool {
            return counts.Requests >= 5 && counts.ConsecutiveFailures >= 3
        },
        OnStateChange: func(name string, from CircuitState, to CircuitState) {
            fmt.Printf("熔断器 %s 状态变化: %v -> %v\n", name, from, to)
        },
    }

    breaker = NewCircuitBreaker(host, config)
    cbc.breakers[host] = breaker
    return breaker
}

// 发送请求
func (cbc *CircuitBreakerClient) Get(url string) (*http.Response, error) {
    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return nil, err
    }

    host := req.URL.Host
    breaker := cbc.getBreaker(host)

    return breaker.Execute(func() (*http.Response, error) {
        return cbc.client.Do(req)
    })
}

func main() {
    client := NewCircuitBreakerClient()

    // 测试正常请求
    for i := 0; i < 10; i++ {
        resp, err := client.Get("https://httpbin.org/get")
        if err != nil {
            fmt.Printf("请求 %d 失败: %v\n", i+1, err)
        } else {
            fmt.Printf("请求 %d 成功: %s\n", i+1, resp.Status)
            resp.Body.Close()
        }
        time.Sleep(1 * time.Second)
    }

    // 测试失败请求(触发熔断器)
    fmt.Println("\n测试失败请求:")
    for i := 0; i < 10; i++ {
        resp, err := client.Get("https://httpbin.org/status/500")
        if err != nil {
            fmt.Printf("请求 %d 失败: %v\n", i+1, err)
        } else {
            fmt.Printf("请求 %d 响应: %s\n", i+1, resp.Status)
            resp.Body.Close()
        }
        time.Sleep(1 * time.Second)
    }
}

并发请求处理 #

并发请求和结果聚合 #

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "sync"
    "time"
)

// 请求结果
type RequestResult struct {
    URL      string
    Response *http.Response
    Error    error
    Duration time.Duration
}

// 并发 HTTP 客户端
type ConcurrentClient struct {
    client     *http.Client
    maxWorkers int
}

func NewConcurrentClient(maxWorkers int) *ConcurrentClient {
    return &ConcurrentClient{
        client: &http.Client{
            Timeout: 10 * time.Second,
        },
        maxWorkers: maxWorkers,
    }
}

// 并发发送多个请求
func (cc *ConcurrentClient) GetMultiple(urls []string) []RequestResult {
    results := make([]RequestResult, len(urls))
    jobs := make(chan int, len(urls))

    // 启动工作协程
    var wg sync.WaitGroup
    for i := 0; i < cc.maxWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for index := range jobs {
                url := urls[index]
                start := time.Now()

                resp, err := cc.client.Get(url)
                duration := time.Since(start)

                results[index] = RequestResult{
                    URL:      url,
                    Response: resp,
                    Error:    err,
                    Duration: duration,
                }
            }
        }()
    }

    // 发送任务
    for i := range urls {
        jobs <- i
    }
    close(jobs)

    // 等待完成
    wg.Wait()

    return results
}

// 带超时的并发请求
func (cc *ConcurrentClient) GetMultipleWithTimeout(urls []string, timeout time.Duration) []RequestResult {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    results := make([]RequestResult, len(urls))
    jobs := make(chan int, len(urls))

    var wg sync.WaitGroup
    for i := 0; i < cc.maxWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case index, ok := <-jobs:
                    if !ok {
                        return
                    }

                    url := urls[index]
                    start := time.Now()

                    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
                    if err != nil {
                        results[index] = RequestResult{
                            URL:      url,
                            Error:    err,
                            Duration: time.Since(start),
                        }
                        continue
                    }

                    resp, err := cc.client.Do(req)
                    duration := time.Since(start)

                    results[index] = RequestResult{
                        URL:      url,
                        Response: resp,
                        Error:    err,
                        Duration: duration,
                    }

                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    // 发送任务
    go func() {
        for i := range urls {
            select {
            case jobs <- i:
            case <-ctx.Done():
                break
            }
        }
        close(jobs)
    }()

    // 等待完成或超时
    wg.Wait()

    return results
}

// API 聚合示例
type UserInfo struct {
    ID       int    `json:"id"`
    Name     string `json:"name"`
    Username string `json:"username"`
    Email    string `json:"email"`
}

type PostInfo struct {
    ID     int    `json:"id"`
    UserID int    `json:"userId"`
    Title  string `json:"title"`
    Body   string `json:"body"`
}

type UserProfile struct {
    User  *UserInfo   `json:"user"`
    Posts []PostInfo  `json:"posts"`
    Error string      `json:"error,omitempty"`
}

// 聚合用户信息
func (cc *ConcurrentClient) GetUserProfile(userID int) (*UserProfile, error) {
    urls := []string{
        fmt.Sprintf("https://jsonplaceholder.typicode.com/users/%d", userID),
        fmt.Sprintf("https://jsonplaceholder.typicode.com/users/%d/posts", userID),
    }

    results := cc.GetMultipleWithTimeout(urls, 15*time.Second)

    profile := &UserProfile{}

    // 处理用户信息
    if results[0].Error != nil {
        profile.Error = fmt.Sprintf("获取用户信息失败: %v", results[0].Error)
    } else if results[0].Response.StatusCode != http.StatusOK {
        profile.Error = fmt.Sprintf("用户信息请求失败: %s", results[0].Response.Status)
        results[0].Response.Body.Close()
    } else {
        var user UserInfo
        if err := json.NewDecoder(results[0].Response.Body).Decode(&user); err != nil {
            profile.Error = fmt.Sprintf("解析用户信息失败: %v", err)
        } else {
            profile.User = &user
        }
        results[0].Response.Body.Close()
    }

    // 处理用户文章
    if results[1].Error != nil {
        if profile.Error == "" {
            profile.Error = fmt.Sprintf("获取用户文章失败: %v", results[1].Error)
        }
    } else if results[1].Response.StatusCode != http.StatusOK {
        if profile.Error == "" {
            profile.Error = fmt.Sprintf("用户文章请求失败: %s", results[1].Response.Status)
        }
        results[1].Response.Body.Close()
    } else {
        var posts []PostInfo
        if err := json.NewDecoder(results[1].Response.Body).Decode(&posts); err != nil {
            if profile.Error == "" {
                profile.Error = fmt.Sprintf("解析用户文章失败: %v", err)
            }
        } else {
            profile.Posts = posts
        }
        results[1].Response.Body.Close()
    }

    return profile, nil
}

func main() {
    client := NewConcurrentClient(5)

    // 测试并发请求
    urls := []string{
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/3",
        "https://httpbin.org/get",
        "https://httpbin.org/json",
    }

    fmt.Println("发送并发请求...")
    start := time.Now()
    results := client.GetMultipleWithTimeout(urls, 10*time.Second)
    totalDuration := time.Since(start)

    fmt.Printf("总耗时: %v\n", totalDuration)
    for i, result := range results {
        if result.Error != nil {
            fmt.Printf("请求 %d (%s) 失败: %v (耗时: %v)\n",
                i+1, result.URL, result.Error, result.Duration)
        } else {
            fmt.Printf("请求 %d (%s) 成功: %s (耗时: %v)\n",
                i+1, result.URL, result.Response.Status, result.Duration)
            result.Response.Body.Close()
        }
    }

    // 测试 API 聚合
    fmt.Println("\n测试 API 聚合...")
    profile, err := client.GetUserProfile(1)
    if err != nil {
        fmt.Printf("获取用户档案失败: %v\n", err)
    } else {
        if profile.Error != "" {
            fmt.Printf("用户档案错误: %s\n", profile.Error)
        }
        if profile.User != nil {
            fmt.Printf("用户信息: %s (%s)\n", profile.User.Name, profile.User.Email)
        }
        if len(profile.Posts) > 0 {
            fmt.Printf("用户文章数量: %d\n", len(profile.Posts))
            fmt.Printf("第一篇文章: %s\n", profile.Posts[0].Title)
        }
    }
}

小结 #

本节全面介绍了 Go HTTP 客户端开发的核心技术:

  1. 基础客户端使用:便捷函数和自定义请求的创建方法
  2. 客户端配置:Transport 配置、连接池管理和客户端定制
  3. RESTful API 客户端:完整的 API 客户端实现,包括 CRUD 操作
  4. 文件操作:文件上传、下载和带进度显示的实现
  5. 错误处理和重试:智能重试机制和熔断器模式的实现
  6. 并发请求:并发发送请求和结果聚合的技术

这些技术为构建健壮、高性能的 HTTP 客户端应用提供了完整的解决方案。掌握这些知识,你就能够开发出能够与各种 Web 服务和 API 高效交互的 Go 应用程序。

在下一节中,我们将学习 Web 框架的使用,了解如何使用流行的 Go Web 框架来简化 Web 应用开发。