3.1.4 HTTP 客户端开发 #
HTTP 客户端是现代应用程序的重要组成部分,用于与外部 API、微服务或第三方服务进行通信。Go 语言的 net/http
包提供了功能强大且易于使用的 HTTP 客户端实现。本节将详细介绍如何使用 Go 开发高效、可靠的 HTTP 客户端。
基础 HTTP 客户端 #
使用便捷函数 #
Go 提供了一系列便捷函数用于快速发送 HTTP 请求:
package main
import (
"fmt"
"io"
"net/http"
"strings"
)
func main() {
// GET 请求
resp, err := http.Get("https://httpbin.org/get")
if err != nil {
fmt.Printf("GET 请求失败: %v\n", err)
return
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Printf("读取响应失败: %v\n", err)
return
}
fmt.Printf("GET 响应状态: %s\n", resp.Status)
fmt.Printf("GET 响应内容: %s\n", string(body))
// POST 请求
postData := strings.NewReader(`{"name": "张三", "age": 25}`)
resp, err = http.Post("https://httpbin.org/post", "application/json", postData)
if err != nil {
fmt.Printf("POST 请求失败: %v\n", err)
return
}
defer resp.Body.Close()
body, err = io.ReadAll(resp.Body)
if err != nil {
fmt.Printf("读取响应失败: %v\n", err)
return
}
fmt.Printf("POST 响应状态: %s\n", resp.Status)
fmt.Printf("POST 响应内容: %s\n", string(body))
// PostForm 请求
formData := map[string][]string{
"username": {"zhangsan"},
"password": {"123456"},
}
resp, err = http.PostForm("https://httpbin.org/post", formData)
if err != nil {
fmt.Printf("PostForm 请求失败: %v\n", err)
return
}
defer resp.Body.Close()
fmt.Printf("PostForm 响应状态: %s\n", resp.Status)
}
使用 http.NewRequest #
对于更复杂的请求,可以使用 http.NewRequest
创建自定义请求:
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
)
type User struct {
Name string `json:"name"`
Email string `json:"email"`
Age int `json:"age"`
}
func main() {
// 创建用户数据
user := User{
Name: "李四",
Email: "[email protected]",
Age: 30,
}
// 序列化为 JSON
jsonData, err := json.Marshal(user)
if err != nil {
fmt.Printf("JSON 序列化失败: %v\n", err)
return
}
// 创建请求
req, err := http.NewRequest("POST", "https://httpbin.org/post", bytes.NewBuffer(jsonData))
if err != nil {
fmt.Printf("创建请求失败: %v\n", err)
return
}
// 设置请求头
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "MyApp/1.0")
req.Header.Set("Authorization", "Bearer token123")
// 发送请求
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Printf("发送请求失败: %v\n", err)
return
}
defer resp.Body.Close()
// 读取响应
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Printf("读取响应失败: %v\n", err)
return
}
fmt.Printf("响应状态: %s\n", resp.Status)
fmt.Printf("响应头: %v\n", resp.Header)
fmt.Printf("响应内容: %s\n", string(body))
}
自定义 HTTP 客户端 #
客户端配置 #
package main
import (
"crypto/tls"
"fmt"
"net/http"
"time"
)
func main() {
// 创建自定义 Transport
transport := &http.Transport{
MaxIdleConns: 100, // 最大空闲连接数
MaxIdleConnsPerHost: 10, // 每个主机的最大空闲连接数
IdleConnTimeout: 90 * time.Second, // 空闲连接超时
DisableCompression: false, // 启用压缩
TLSClientConfig: &tls.Config{
InsecureSkipVerify: false, // 验证 TLS 证书
},
}
// 创建自定义客户端
client := &http.Client{
Transport: transport,
Timeout: 30 * time.Second, // 总超时时间
CheckRedirect: func(req *http.Request, via []*http.Request) error {
// 限制重定向次数
if len(via) >= 5 {
return fmt.Errorf("重定向次数过多")
}
return nil
},
}
// 使用自定义客户端发送请求
resp, err := client.Get("https://httpbin.org/get")
if err != nil {
fmt.Printf("请求失败: %v\n", err)
return
}
defer resp.Body.Close()
fmt.Printf("响应状态: %s\n", resp.Status)
}
连接池管理 #
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
// HTTP 客户端管理器
type HTTPClientManager struct {
clients map[string]*http.Client
mutex sync.RWMutex
}
func NewHTTPClientManager() *HTTPClientManager {
return &HTTPClientManager{
clients: make(map[string]*http.Client),
}
}
// 获取或创建客户端
func (m *HTTPClientManager) GetClient(name string, config *ClientConfig) *http.Client {
m.mutex.RLock()
client, exists := m.clients[name]
m.mutex.RUnlock()
if exists {
return client
}
m.mutex.Lock()
defer m.mutex.Unlock()
// 双重检查
if client, exists := m.clients[name]; exists {
return client
}
// 创建新客户端
transport := &http.Transport{
MaxIdleConns: config.MaxIdleConns,
MaxIdleConnsPerHost: config.MaxIdleConnsPerHost,
IdleConnTimeout: config.IdleConnTimeout,
}
client = &http.Client{
Transport: transport,
Timeout: config.Timeout,
}
m.clients[name] = client
return client
}
type ClientConfig struct {
MaxIdleConns int
MaxIdleConnsPerHost int
IdleConnTimeout time.Duration
Timeout time.Duration
}
func main() {
manager := NewHTTPClientManager()
// 配置不同的客户端
fastConfig := &ClientConfig{
MaxIdleConns: 50,
MaxIdleConnsPerHost: 5,
IdleConnTimeout: 30 * time.Second,
Timeout: 5 * time.Second,
}
slowConfig := &ClientConfig{
MaxIdleConns: 20,
MaxIdleConnsPerHost: 2,
IdleConnTimeout: 60 * time.Second,
Timeout: 30 * time.Second,
}
// 获取客户端
fastClient := manager.GetClient("fast", fastConfig)
slowClient := manager.GetClient("slow", slowConfig)
// 使用不同的客户端
resp1, err := fastClient.Get("https://httpbin.org/delay/1")
if err != nil {
fmt.Printf("快速客户端请求失败: %v\n", err)
} else {
fmt.Printf("快速客户端响应: %s\n", resp1.Status)
resp1.Body.Close()
}
resp2, err := slowClient.Get("https://httpbin.org/delay/10")
if err != nil {
fmt.Printf("慢速客户端请求失败: %v\n", err)
} else {
fmt.Printf("慢速客户端响应: %s\n", resp2.Status)
resp2.Body.Close()
}
}
请求构建和发送 #
RESTful API 客户端 #
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
)
// API 客户端
type APIClient struct {
BaseURL string
HTTPClient *http.Client
APIKey string
}
// 用户结构
type User struct {
ID int `json:"id,omitempty"`
Name string `json:"name"`
Email string `json:"email"`
Age int `json:"age"`
}
// API 响应结构
type APIResponse struct {
Success bool `json:"success"`
Data interface{} `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}
// 创建 API 客户端
func NewAPIClient(baseURL, apiKey string) *APIClient {
return &APIClient{
BaseURL: baseURL,
HTTPClient: &http.Client{
Timeout: 30 * time.Second,
},
APIKey: apiKey,
}
}
// 发送请求的通用方法
func (c *APIClient) sendRequest(method, endpoint string, body interface{}) (*http.Response, error) {
// 构建 URL
u, err := url.Parse(c.BaseURL + endpoint)
if err != nil {
return nil, err
}
// 准备请求体
var reqBody io.Reader
if body != nil {
jsonData, err := json.Marshal(body)
if err != nil {
return nil, err
}
reqBody = bytes.NewBuffer(jsonData)
}
// 创建请求
req, err := http.NewRequest(method, u.String(), reqBody)
if err != nil {
return nil, err
}
// 设置请求头
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
if c.APIKey != "" {
req.Header.Set("Authorization", "Bearer "+c.APIKey)
}
// 发送请求
return c.HTTPClient.Do(req)
}
// 获取用户列表
func (c *APIClient) GetUsers() ([]User, error) {
resp, err := c.sendRequest("GET", "/users", nil)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("API 错误: %s", resp.Status)
}
var users []User
if err := json.NewDecoder(resp.Body).Decode(&users); err != nil {
return nil, err
}
return users, nil
}
// 获取单个用户
func (c *APIClient) GetUser(id int) (*User, error) {
endpoint := "/users/" + strconv.Itoa(id)
resp, err := c.sendRequest("GET", endpoint, nil)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return nil, fmt.Errorf("用户不存在")
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("API 错误: %s", resp.Status)
}
var user User
if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
return nil, err
}
return &user, nil
}
// 创建用户
func (c *APIClient) CreateUser(user *User) (*User, error) {
resp, err := c.sendRequest("POST", "/users", user)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
return nil, fmt.Errorf("创建用户失败: %s", resp.Status)
}
var createdUser User
if err := json.NewDecoder(resp.Body).Decode(&createdUser); err != nil {
return nil, err
}
return &createdUser, nil
}
// 更新用户
func (c *APIClient) UpdateUser(id int, user *User) (*User, error) {
endpoint := "/users/" + strconv.Itoa(id)
resp, err := c.sendRequest("PUT", endpoint, user)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("更新用户失败: %s", resp.Status)
}
var updatedUser User
if err := json.NewDecoder(resp.Body).Decode(&updatedUser); err != nil {
return nil, err
}
return &updatedUser, nil
}
// 删除用户
func (c *APIClient) DeleteUser(id int) error {
endpoint := "/users/" + strconv.Itoa(id)
resp, err := c.sendRequest("DELETE", endpoint, nil)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("删除用户失败: %s", resp.Status)
}
return nil
}
func main() {
// 创建 API 客户端
client := NewAPIClient("https://jsonplaceholder.typicode.com", "your-api-key")
// 获取用户列表
users, err := client.GetUsers()
if err != nil {
fmt.Printf("获取用户列表失败: %v\n", err)
} else {
fmt.Printf("获取到 %d 个用户\n", len(users))
if len(users) > 0 {
fmt.Printf("第一个用户: %+v\n", users[0])
}
}
// 获取单个用户
user, err := client.GetUser(1)
if err != nil {
fmt.Printf("获取用户失败: %v\n", err)
} else {
fmt.Printf("用户详情: %+v\n", user)
}
// 创建新用户
newUser := &User{
Name: "张三",
Email: "[email protected]",
Age: 25,
}
createdUser, err := client.CreateUser(newUser)
if err != nil {
fmt.Printf("创建用户失败: %v\n", err)
} else {
fmt.Printf("创建的用户: %+v\n", createdUser)
}
}
文件上传客户端 #
package main
import (
"bytes"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"path/filepath"
)
// 上传文件
func uploadFile(url, fieldName, filename string) error {
// 打开文件
file, err := os.Open(filename)
if err != nil {
return err
}
defer file.Close()
// 创建 multipart writer
var requestBody bytes.Buffer
writer := multipart.NewWriter(&requestBody)
// 创建文件字段
part, err := writer.CreateFormFile(fieldName, filepath.Base(filename))
if err != nil {
return err
}
// 复制文件内容
if _, err := io.Copy(part, file); err != nil {
return err
}
// 添加其他字段
writer.WriteField("description", "上传的文件")
writer.WriteField("category", "document")
// 关闭 writer
if err := writer.Close(); err != nil {
return err
}
// 创建请求
req, err := http.NewRequest("POST", url, &requestBody)
if err != nil {
return err
}
// 设置 Content-Type
req.Header.Set("Content-Type", writer.FormDataContentType())
// 发送请求
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// 读取响应
body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
fmt.Printf("上传响应状态: %s\n", resp.Status)
fmt.Printf("上传响应内容: %s\n", string(body))
return nil
}
// 下载文件
func downloadFile(url, filename string) error {
// 发送 GET 请求
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
// 检查状态码
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("下载失败: %s", resp.Status)
}
// 创建文件
file, err := os.Create(filename)
if err != nil {
return err
}
defer file.Close()
// 复制内容
_, err = io.Copy(file, resp.Body)
if err != nil {
return err
}
fmt.Printf("文件下载完成: %s\n", filename)
return nil
}
// 带进度的文件下载
type ProgressReader struct {
io.Reader
Total int64
Current int64
Callback func(current, total int64)
}
func (pr *ProgressReader) Read(p []byte) (int, error) {
n, err := pr.Reader.Read(p)
pr.Current += int64(n)
if pr.Callback != nil {
pr.Callback(pr.Current, pr.Total)
}
return n, err
}
func downloadFileWithProgress(url, filename string) error {
// 发送 HEAD 请求获取文件大小
headResp, err := http.Head(url)
if err != nil {
return err
}
headResp.Body.Close()
fileSize := headResp.ContentLength
// 发送 GET 请求
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
// 创建文件
file, err := os.Create(filename)
if err != nil {
return err
}
defer file.Close()
// 创建进度读取器
progressReader := &ProgressReader{
Reader: resp.Body,
Total: fileSize,
Callback: func(current, total int64) {
percent := float64(current) / float64(total) * 100
fmt.Printf("\r下载进度: %.2f%% (%d/%d bytes)", percent, current, total)
},
}
// 复制内容
_, err = io.Copy(file, progressReader)
if err != nil {
return err
}
fmt.Printf("\n文件下载完成: %s\n", filename)
return nil
}
func main() {
// 上传文件示例
if err := uploadFile("https://httpbin.org/post", "file", "test.txt"); err != nil {
fmt.Printf("上传文件失败: %v\n", err)
}
// 下载文件示例
if err := downloadFile("https://httpbin.org/json", "downloaded.json"); err != nil {
fmt.Printf("下载文件失败: %v\n", err)
}
// 带进度的下载示例
if err := downloadFileWithProgress("https://httpbin.org/bytes/1024", "large_file.bin"); err != nil {
fmt.Printf("下载文件失败: %v\n", err)
}
}
错误处理和重试 #
智能重试机制 #
package main
import (
"context"
"fmt"
"math"
"net/http"
"time"
)
// 重试配置
type RetryConfig struct {
MaxRetries int
InitialDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
RetryableErrors []int // 可重试的状态码
}
// 默认重试配置
func DefaultRetryConfig() *RetryConfig {
return &RetryConfig{
MaxRetries: 3,
InitialDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
BackoffFactor: 2.0,
RetryableErrors: []int{
http.StatusInternalServerError,
http.StatusBadGateway,
http.StatusServiceUnavailable,
http.StatusGatewayTimeout,
},
}
}
// 重试客户端
type RetryClient struct {
HTTPClient *http.Client
Config *RetryConfig
}
func NewRetryClient(config *RetryConfig) *RetryClient {
if config == nil {
config = DefaultRetryConfig()
}
return &RetryClient{
HTTPClient: &http.Client{
Timeout: 30 * time.Second,
},
Config: config,
}
}
// 检查是否可重试
func (rc *RetryClient) isRetryable(statusCode int, err error) bool {
// 网络错误通常可以重试
if err != nil {
return true
}
// 检查状态码是否可重试
for _, code := range rc.Config.RetryableErrors {
if statusCode == code {
return true
}
}
return false
}
// 计算退避延迟
func (rc *RetryClient) calculateDelay(attempt int) time.Duration {
delay := float64(rc.Config.InitialDelay) * math.Pow(rc.Config.BackoffFactor, float64(attempt))
maxDelay := float64(rc.Config.MaxDelay)
if delay > maxDelay {
delay = maxDelay
}
return time.Duration(delay)
}
// 带重试的请求
func (rc *RetryClient) Do(req *http.Request) (*http.Response, error) {
var lastErr error
var resp *http.Response
for attempt := 0; attempt <= rc.Config.MaxRetries; attempt++ {
// 克隆请求(因为请求体可能被消费)
reqClone := req.Clone(req.Context())
// 发送请求
resp, lastErr = rc.HTTPClient.Do(reqClone)
// 如果成功或不可重试,直接返回
if lastErr == nil && !rc.isRetryable(resp.StatusCode, nil) {
return resp, nil
}
// 如果是最后一次尝试,不再等待
if attempt == rc.Config.MaxRetries {
break
}
// 计算延迟时间
delay := rc.calculateDelay(attempt)
fmt.Printf("请求失败,%v 后重试 (尝试 %d/%d)\n", delay, attempt+1, rc.Config.MaxRetries)
// 等待重试
select {
case <-time.After(delay):
case <-req.Context().Done():
return nil, req.Context().Err()
}
// 关闭之前的响应体
if resp != nil {
resp.Body.Close()
}
}
return resp, lastErr
}
// 带超时和重试的 GET 请求
func (rc *RetryClient) GetWithTimeout(url string, timeout time.Duration) (*http.Response, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
return rc.Do(req)
}
func main() {
// 创建重试客户端
retryClient := NewRetryClient(nil)
// 测试正常请求
resp, err := retryClient.GetWithTimeout("https://httpbin.org/get", 10*time.Second)
if err != nil {
fmt.Printf("请求失败: %v\n", err)
} else {
fmt.Printf("请求成功: %s\n", resp.Status)
resp.Body.Close()
}
// 测试会失败的请求(触发重试)
resp, err = retryClient.GetWithTimeout("https://httpbin.org/status/500", 30*time.Second)
if err != nil {
fmt.Printf("重试后仍然失败: %v\n", err)
} else {
fmt.Printf("最终响应: %s\n", resp.Status)
resp.Body.Close()
}
}
熔断器模式 #
package main
import (
"errors"
"fmt"
"net/http"
"sync"
"time"
)
// 熔断器状态
type CircuitState int
const (
StateClosed CircuitState = iota
StateOpen
StateHalfOpen
)
// 熔断器配置
type CircuitBreakerConfig struct {
MaxRequests uint32 // 半开状态下的最大请求数
Interval time.Duration // 统计间隔
Timeout time.Duration // 开启状态的超时时间
ReadyToTrip func(counts Counts) bool // 判断是否应该开启熔断器
OnStateChange func(name string, from CircuitState, to CircuitState) // 状态变化回调
}
// 计数器
type Counts struct {
Requests uint32
TotalSuccesses uint32
TotalFailures uint32
ConsecutiveSuccesses uint32
ConsecutiveFailures uint32
}
// 熔断器
type CircuitBreaker struct {
name string
maxRequests uint32
interval time.Duration
timeout time.Duration
readyToTrip func(counts Counts) bool
onStateChange func(name string, from CircuitState, to CircuitState)
mutex sync.Mutex
state CircuitState
generation uint64
counts Counts
expiry time.Time
}
// 创建熔断器
func NewCircuitBreaker(name string, config CircuitBreakerConfig) *CircuitBreaker {
cb := &CircuitBreaker{
name: name,
maxRequests: config.MaxRequests,
interval: config.Interval,
timeout: config.Timeout,
readyToTrip: config.ReadyToTrip,
onStateChange: config.OnStateChange,
}
cb.toNewGeneration(time.Now())
return cb
}
// 执行请求
func (cb *CircuitBreaker) Execute(req func() (*http.Response, error)) (*http.Response, error) {
generation, err := cb.beforeRequest()
if err != nil {
return nil, err
}
defer func() {
if r := recover(); r != nil {
cb.afterRequest(generation, false)
panic(r)
}
}()
resp, err := req()
cb.afterRequest(generation, err == nil && resp.StatusCode < 500)
return resp, err
}
// 请求前检查
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, generation := cb.currentState(now)
if state == StateOpen {
return generation, errors.New("熔断器开启")
} else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
return generation, errors.New("熔断器半开状态请求数过多")
}
cb.counts.Requests++
return generation, nil
}
// 请求后处理
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, generation := cb.currentState(now)
if generation != before {
return
}
if success {
cb.onSuccess(state, now)
} else {
cb.onFailure(state, now)
}
}
// 成功处理
func (cb *CircuitBreaker) onSuccess(state CircuitState, now time.Time) {
cb.counts.TotalSuccesses++
cb.counts.ConsecutiveSuccesses++
cb.counts.ConsecutiveFailures = 0
if state == StateHalfOpen {
cb.setState(StateClosed, now)
}
}
// 失败处理
func (cb *CircuitBreaker) onFailure(state CircuitState, now time.Time) {
cb.counts.TotalFailures++
cb.counts.ConsecutiveFailures++
cb.counts.ConsecutiveSuccesses = 0
if cb.readyToTrip(cb.counts) {
cb.setState(StateOpen, now)
}
}
// 获取当前状态
func (cb *CircuitBreaker) currentState(now time.Time) (CircuitState, uint64) {
switch cb.state {
case StateClosed:
if !cb.expiry.IsZero() && cb.expiry.Before(now) {
cb.toNewGeneration(now)
}
case StateOpen:
if cb.expiry.Before(now) {
cb.setState(StateHalfOpen, now)
}
}
return cb.state, cb.generation
}
// 设置状态
func (cb *CircuitBreaker) setState(state CircuitState, now time.Time) {
if cb.state == state {
return
}
prev := cb.state
cb.state = state
cb.toNewGeneration(now)
if cb.onStateChange != nil {
cb.onStateChange(cb.name, prev, state)
}
}
// 新的统计周期
func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
cb.generation++
cb.counts = Counts{}
var zero time.Time
switch cb.state {
case StateClosed:
if cb.interval == 0 {
cb.expiry = zero
} else {
cb.expiry = now.Add(cb.interval)
}
case StateOpen:
cb.expiry = now.Add(cb.timeout)
default: // StateHalfOpen
cb.expiry = zero
}
}
// 带熔断器的 HTTP 客户端
type CircuitBreakerClient struct {
client *http.Client
breakers map[string]*CircuitBreaker
mutex sync.RWMutex
}
func NewCircuitBreakerClient() *CircuitBreakerClient {
return &CircuitBreakerClient{
client: &http.Client{Timeout: 10 * time.Second},
breakers: make(map[string]*CircuitBreaker),
}
}
// 获取或创建熔断器
func (cbc *CircuitBreakerClient) getBreaker(host string) *CircuitBreaker {
cbc.mutex.RLock()
breaker, exists := cbc.breakers[host]
cbc.mutex.RUnlock()
if exists {
return breaker
}
cbc.mutex.Lock()
defer cbc.mutex.Unlock()
if breaker, exists := cbc.breakers[host]; exists {
return breaker
}
config := CircuitBreakerConfig{
MaxRequests: 3,
Interval: 10 * time.Second,
Timeout: 30 * time.Second,
ReadyToTrip: func(counts Counts) bool {
return counts.Requests >= 5 && counts.ConsecutiveFailures >= 3
},
OnStateChange: func(name string, from CircuitState, to CircuitState) {
fmt.Printf("熔断器 %s 状态变化: %v -> %v\n", name, from, to)
},
}
breaker = NewCircuitBreaker(host, config)
cbc.breakers[host] = breaker
return breaker
}
// 发送请求
func (cbc *CircuitBreakerClient) Get(url string) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
host := req.URL.Host
breaker := cbc.getBreaker(host)
return breaker.Execute(func() (*http.Response, error) {
return cbc.client.Do(req)
})
}
func main() {
client := NewCircuitBreakerClient()
// 测试正常请求
for i := 0; i < 10; i++ {
resp, err := client.Get("https://httpbin.org/get")
if err != nil {
fmt.Printf("请求 %d 失败: %v\n", i+1, err)
} else {
fmt.Printf("请求 %d 成功: %s\n", i+1, resp.Status)
resp.Body.Close()
}
time.Sleep(1 * time.Second)
}
// 测试失败请求(触发熔断器)
fmt.Println("\n测试失败请求:")
for i := 0; i < 10; i++ {
resp, err := client.Get("https://httpbin.org/status/500")
if err != nil {
fmt.Printf("请求 %d 失败: %v\n", i+1, err)
} else {
fmt.Printf("请求 %d 响应: %s\n", i+1, resp.Status)
resp.Body.Close()
}
time.Sleep(1 * time.Second)
}
}
并发请求处理 #
并发请求和结果聚合 #
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
)
// 请求结果
type RequestResult struct {
URL string
Response *http.Response
Error error
Duration time.Duration
}
// 并发 HTTP 客户端
type ConcurrentClient struct {
client *http.Client
maxWorkers int
}
func NewConcurrentClient(maxWorkers int) *ConcurrentClient {
return &ConcurrentClient{
client: &http.Client{
Timeout: 10 * time.Second,
},
maxWorkers: maxWorkers,
}
}
// 并发发送多个请求
func (cc *ConcurrentClient) GetMultiple(urls []string) []RequestResult {
results := make([]RequestResult, len(urls))
jobs := make(chan int, len(urls))
// 启动工作协程
var wg sync.WaitGroup
for i := 0; i < cc.maxWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for index := range jobs {
url := urls[index]
start := time.Now()
resp, err := cc.client.Get(url)
duration := time.Since(start)
results[index] = RequestResult{
URL: url,
Response: resp,
Error: err,
Duration: duration,
}
}
}()
}
// 发送任务
for i := range urls {
jobs <- i
}
close(jobs)
// 等待完成
wg.Wait()
return results
}
// 带超时的并发请求
func (cc *ConcurrentClient) GetMultipleWithTimeout(urls []string, timeout time.Duration) []RequestResult {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
results := make([]RequestResult, len(urls))
jobs := make(chan int, len(urls))
var wg sync.WaitGroup
for i := 0; i < cc.maxWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case index, ok := <-jobs:
if !ok {
return
}
url := urls[index]
start := time.Now()
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
results[index] = RequestResult{
URL: url,
Error: err,
Duration: time.Since(start),
}
continue
}
resp, err := cc.client.Do(req)
duration := time.Since(start)
results[index] = RequestResult{
URL: url,
Response: resp,
Error: err,
Duration: duration,
}
case <-ctx.Done():
return
}
}
}()
}
// 发送任务
go func() {
for i := range urls {
select {
case jobs <- i:
case <-ctx.Done():
break
}
}
close(jobs)
}()
// 等待完成或超时
wg.Wait()
return results
}
// API 聚合示例
type UserInfo struct {
ID int `json:"id"`
Name string `json:"name"`
Username string `json:"username"`
Email string `json:"email"`
}
type PostInfo struct {
ID int `json:"id"`
UserID int `json:"userId"`
Title string `json:"title"`
Body string `json:"body"`
}
type UserProfile struct {
User *UserInfo `json:"user"`
Posts []PostInfo `json:"posts"`
Error string `json:"error,omitempty"`
}
// 聚合用户信息
func (cc *ConcurrentClient) GetUserProfile(userID int) (*UserProfile, error) {
urls := []string{
fmt.Sprintf("https://jsonplaceholder.typicode.com/users/%d", userID),
fmt.Sprintf("https://jsonplaceholder.typicode.com/users/%d/posts", userID),
}
results := cc.GetMultipleWithTimeout(urls, 15*time.Second)
profile := &UserProfile{}
// 处理用户信息
if results[0].Error != nil {
profile.Error = fmt.Sprintf("获取用户信息失败: %v", results[0].Error)
} else if results[0].Response.StatusCode != http.StatusOK {
profile.Error = fmt.Sprintf("用户信息请求失败: %s", results[0].Response.Status)
results[0].Response.Body.Close()
} else {
var user UserInfo
if err := json.NewDecoder(results[0].Response.Body).Decode(&user); err != nil {
profile.Error = fmt.Sprintf("解析用户信息失败: %v", err)
} else {
profile.User = &user
}
results[0].Response.Body.Close()
}
// 处理用户文章
if results[1].Error != nil {
if profile.Error == "" {
profile.Error = fmt.Sprintf("获取用户文章失败: %v", results[1].Error)
}
} else if results[1].Response.StatusCode != http.StatusOK {
if profile.Error == "" {
profile.Error = fmt.Sprintf("用户文章请求失败: %s", results[1].Response.Status)
}
results[1].Response.Body.Close()
} else {
var posts []PostInfo
if err := json.NewDecoder(results[1].Response.Body).Decode(&posts); err != nil {
if profile.Error == "" {
profile.Error = fmt.Sprintf("解析用户文章失败: %v", err)
}
} else {
profile.Posts = posts
}
results[1].Response.Body.Close()
}
return profile, nil
}
func main() {
client := NewConcurrentClient(5)
// 测试并发请求
urls := []string{
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/3",
"https://httpbin.org/get",
"https://httpbin.org/json",
}
fmt.Println("发送并发请求...")
start := time.Now()
results := client.GetMultipleWithTimeout(urls, 10*time.Second)
totalDuration := time.Since(start)
fmt.Printf("总耗时: %v\n", totalDuration)
for i, result := range results {
if result.Error != nil {
fmt.Printf("请求 %d (%s) 失败: %v (耗时: %v)\n",
i+1, result.URL, result.Error, result.Duration)
} else {
fmt.Printf("请求 %d (%s) 成功: %s (耗时: %v)\n",
i+1, result.URL, result.Response.Status, result.Duration)
result.Response.Body.Close()
}
}
// 测试 API 聚合
fmt.Println("\n测试 API 聚合...")
profile, err := client.GetUserProfile(1)
if err != nil {
fmt.Printf("获取用户档案失败: %v\n", err)
} else {
if profile.Error != "" {
fmt.Printf("用户档案错误: %s\n", profile.Error)
}
if profile.User != nil {
fmt.Printf("用户信息: %s (%s)\n", profile.User.Name, profile.User.Email)
}
if len(profile.Posts) > 0 {
fmt.Printf("用户文章数量: %d\n", len(profile.Posts))
fmt.Printf("第一篇文章: %s\n", profile.Posts[0].Title)
}
}
}
小结 #
本节全面介绍了 Go HTTP 客户端开发的核心技术:
- 基础客户端使用:便捷函数和自定义请求的创建方法
- 客户端配置:Transport 配置、连接池管理和客户端定制
- RESTful API 客户端:完整的 API 客户端实现,包括 CRUD 操作
- 文件操作:文件上传、下载和带进度显示的实现
- 错误处理和重试:智能重试机制和熔断器模式的实现
- 并发请求:并发发送请求和结果聚合的技术
这些技术为构建健壮、高性能的 HTTP 客户端应用提供了完整的解决方案。掌握这些知识,你就能够开发出能够与各种 Web 服务和 API 高效交互的 Go 应用程序。
在下一节中,我们将学习 Web 框架的使用,了解如何使用流行的 Go Web 框架来简化 Web 应用开发。