2.8.1 Concurrent Crawler System #

Web crawlers are one of the classic applications of concurrent programming. An efficient crawler system has to handle a large number of network requests at the same time, keep the level of concurrency within reasonable bounds, and avoid putting excessive load on the target servers. In this section we build a fully functional concurrent crawler system from scratch.

System Architecture Design #

Core Components #

A complete concurrent crawler system typically consists of the following core components (a minimal interface sketch follows the list):

  1. URL manager: stores, deduplicates, and schedules URLs
  2. Downloader: performs HTTP requests and fetches page content
  3. Parser: parses page content, extracting data and new URLs
  4. Data storage: persists the scraped data
  5. Concurrency controller: manages the concurrency level and rate limiting
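
Before diving into the concrete implementation, here is a minimal, hedged sketch of how these responsibilities could be expressed as Go interfaces. The interface names are illustrative only; the implementation below defines its own types (URLManager, Downloader, HTMLParser, and a Parser interface), and the Request, Response, and Item types referenced here are introduced in "Basic Data Structures".

// Illustrative component interfaces; not used verbatim by the code below.
type Scheduler interface {
    AddRequest(req *Request) bool // enqueue with deduplication
    GetRequest() (*Request, bool) // next pending request, if any
}

type Fetcher interface {
    Download(ctx context.Context, req *Request) (*Response, error)
}

type PageParser interface {
    Parse(resp *Response) (items []Item, follow []*Request, err error)
}

type Storage interface {
    Save(item Item) error // persist an extracted item
}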

System Architecture Diagram #

┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│  URL Queue   │───▶│ Downloaders  │───▶│   Parsers    │
└──────────────┘    └──────────────┘    └──────────────┘
        ▲                                       │
        │                                       ▼
┌──────────────┐                        ┌──────────────┐
│ URL Manager  │◀───────────────────────│  Extraction  │
└──────────────┘                        └──────────────┘
        │                                       │
        ▼                                       ▼
┌──────────────┐                        ┌──────────────┐
│ Dedup Filter │                        │ Data Storage │
└──────────────┘                        └──────────────┘

Basic Data Structures #

First, let's define the crawler system's basic data structures:

package crawler

import (
    // These imports cover all of the crawler package code shown in this section.
    "bytes"
    "context"
    "fmt"
    "io"
    "net"
    "net/http"
    "net/url"
    "regexp"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/PuerkitoBio/goquery"
)

// Request represents a single crawl request.
type Request struct {
    URL     string
    Method  string
    Headers map[string]string
    Depth   int
    Meta    map[string]interface{}
}

// Response represents the result of a crawl request.
type Response struct {
    Request    *Request
    StatusCode int
    Headers    map[string][]string
    Body       []byte
    URL        string
}

// Item represents an extracted data item.
type Item struct {
    URL  string
    Data map[string]interface{}
}

// Spider holds the crawler configuration.
type Spider struct {
    Name           string
    StartURLs      []string
    AllowedDomains []string
    MaxDepth       int
    Concurrent     int
    Delay          time.Duration
    UserAgent      string
}

URL Manager Implementation #

The URL manager is the core component of the crawler system, responsible for scheduling URLs and for deduplication:

// URLManager schedules URLs and deduplicates them.
type URLManager struct {
    mu          sync.RWMutex
    pending     chan *Request
    visited     map[string]bool
    processing  map[string]bool
    maxDepth    int
    closed      bool
}

// NewURLManager creates a URL manager with the given queue buffer size and depth limit.
func NewURLManager(bufferSize, maxDepth int) *URLManager {
    return &URLManager{
        pending:    make(chan *Request, bufferSize),
        visited:    make(map[string]bool),
        processing: make(map[string]bool),
        maxDepth:   maxDepth,
    }
}

// AddRequest enqueues a request. It returns false if the URL is a duplicate,
// exceeds the depth limit, or the queue is full.
func (um *URLManager) AddRequest(req *Request) bool {
    um.mu.Lock()
    defer um.mu.Unlock()

    // Skip URLs that have already been visited or are currently being processed.
    if um.visited[req.URL] || um.processing[req.URL] {
        return false
    }

    // Enforce the depth limit.
    if req.Depth > um.maxDepth {
        return false
    }

    // Mark the URL as in flight.
    um.processing[req.URL] = true

    select {
    case um.pending <- req:
        return true
    default:
        // Queue is full; remove the in-flight marker.
        delete(um.processing, req.URL)
        return false
    }
}

// GetRequest returns a pending request, if any (non-blocking).
func (um *URLManager) GetRequest() (*Request, bool) {
    select {
    case req := <-um.pending:
        return req, true
    default:
        return nil, false
    }
}

// MarkVisited marks a URL as visited and clears its in-flight marker.
func (um *URLManager) MarkVisited(url string) {
    um.mu.Lock()
    defer um.mu.Unlock()

    um.visited[url] = true
    delete(um.processing, url)
}

// HasPending reports whether any requests are queued or in flight.
func (um *URLManager) HasPending() bool {
    um.mu.RLock()
    defer um.mu.RUnlock()

    return len(um.pending) > 0 || len(um.processing) > 0
}

// Close shuts down the URL manager by closing the pending queue.
func (um *URLManager) Close() {
    um.mu.Lock()
    defer um.mu.Unlock()

    if !um.closed {
        close(um.pending)
        um.closed = true
    }
}
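
To make the deduplication contract concrete, here is a small hedged usage sketch; the expected results are shown as comments, not captured program output.

um := NewURLManager(10, 3)

req := &Request{URL: "https://example.com/page", Method: "GET", Depth: 0}
fmt.Println(um.AddRequest(req)) // true: first time this URL is seen

// The same URL is rejected while it is still in flight...
fmt.Println(um.AddRequest(&Request{URL: "https://example.com/page"})) // false

// ...and also after it has been marked as visited.
um.MarkVisited("https://example.com/page")
fmt.Println(um.AddRequest(&Request{URL: "https://example.com/page"})) // false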

Downloader Implementation #

The downloader executes the HTTP requests and supports concurrency control and a retry mechanism:

// Downloader performs HTTP requests with concurrency control and retries.
type Downloader struct {
    client      *http.Client
    userAgent   string
    maxRetries  int
    retryDelay  time.Duration
    rateLimiter chan struct{}
}

// NewDownloader creates a downloader with the given concurrency limit and request timeout.
func NewDownloader(concurrent int, timeout time.Duration) *Downloader {
    return &Downloader{
        client: &http.Client{
            Timeout: timeout,
            Transport: &http.Transport{
                MaxIdleConns:        100,
                MaxIdleConnsPerHost: 10,
                IdleConnTimeout:     90 * time.Second,
            },
        },
        userAgent:   "Go-Crawler/1.0",
        maxRetries:  3,
        retryDelay:  time.Second,
        rateLimiter: make(chan struct{}, concurrent),
    }
}

// Download fetches a page, retrying transient failures.
func (d *Downloader) Download(ctx context.Context, req *Request) (*Response, error) {
    // Acquire a concurrency token (the channel acts as a semaphore).
    select {
    case d.rateLimiter <- struct{}{}:
        defer func() { <-d.rateLimiter }()
    case <-ctx.Done():
        return nil, ctx.Err()
    }

    var lastErr error
    for i := 0; i <= d.maxRetries; i++ {
        if i > 0 {
            // Back off before retrying.
            select {
            case <-time.After(d.retryDelay * time.Duration(i)):
            case <-ctx.Done():
                return nil, ctx.Err()
            }
        }

        resp, err := d.doRequest(ctx, req)
        if err == nil {
            return resp, nil
        }

        lastErr = err

        // Stop early if the error is not retryable.
        if !d.isRetryableError(err) {
            break
        }
    }

    return nil, lastErr
}

// doRequest performs a single HTTP request.
func (d *Downloader) doRequest(ctx context.Context, req *Request) (*Response, error) {
    httpReq, err := http.NewRequestWithContext(ctx, req.Method, req.URL, nil)
    if err != nil {
        return nil, err
    }

    // Set request headers.
    httpReq.Header.Set("User-Agent", d.userAgent)
    for key, value := range req.Headers {
        httpReq.Header.Set(key, value)
    }

    httpResp, err := d.client.Do(httpReq)
    if err != nil {
        return nil, err
    }
    defer httpResp.Body.Close()

    body, err := io.ReadAll(httpResp.Body)
    if err != nil {
        return nil, err
    }

    return &Response{
        Request:    req,
        StatusCode: httpResp.StatusCode,
        Headers:    httpResp.Header,
        Body:       body,
        URL:        httpResp.Request.URL.String(),
    }, nil
}

// isRetryableError reports whether an error is worth retrying.
func (d *Downloader) isRetryableError(err error) bool {
    // Network timeouts and temporary network errors can be retried
    // (net.Error's Temporary is deprecated in recent Go releases; Timeout covers the common case).
    if netErr, ok := err.(net.Error); ok {
        return netErr.Timeout() || netErr.Temporary()
    }
    return false
}
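
The rateLimiter channel above bounds how many downloads run at once, but not how many requests are issued per second. If a time-based limit is also needed, one option (an assumption on my part, not something the rest of this section relies on) is a token bucket from golang.org/x/time/rate:

// RateLimitedDownloader adds a token-bucket rate limit on top of the
// Downloader's concurrency semaphore. Requires golang.org/x/time/rate.
type RateLimitedDownloader struct {
    *Downloader
    limiter *rate.Limiter
}

// NewRateLimitedDownloader allows roughly rps requests per second with the given burst.
func NewRateLimitedDownloader(d *Downloader, rps float64, burst int) *RateLimitedDownloader {
    return &RateLimitedDownloader{
        Downloader: d,
        limiter:    rate.NewLimiter(rate.Limit(rps), burst),
    }
}

// Download waits for a token (or for context cancellation) before delegating.
func (r *RateLimitedDownloader) Download(ctx context.Context, req *Request) (*Response, error) {
    if err := r.limiter.Wait(ctx); err != nil {
        return nil, err
    }
    return r.Downloader.Download(ctx, req)
}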

Parser Implementation #

The parser is responsible for extracting data and new URLs from a response:

// Parser extracts data items and follow-up requests from a response.
type Parser interface {
    Parse(resp *Response) ([]Item, []*Request, error)
}

// HTMLParser parses HTML documents using goquery selectors.
type HTMLParser struct {
    extractors []DataExtractor
    linkRules  []LinkRule
}

// DataExtractor describes how to extract one named field from a page.
type DataExtractor struct {
    Name     string
    Selector string
    Attr     string
    Regex    *regexp.Regexp
}

// LinkRule describes how to extract and filter follow-up links.
type LinkRule struct {
    Selector    string
    Attr        string
    AllowRegex  *regexp.Regexp
    DenyRegex   *regexp.Regexp
    FollowDepth int
}

// NewHTMLParser creates an empty HTML parser.
func NewHTMLParser() *HTMLParser {
    return &HTMLParser{
        extractors: make([]DataExtractor, 0),
        linkRules:  make([]LinkRule, 0),
    }
}

// AddExtractor registers a data extractor.
func (p *HTMLParser) AddExtractor(extractor DataExtractor) {
    p.extractors = append(p.extractors, extractor)
}

// AddLinkRule registers a link extraction rule.
func (p *HTMLParser) AddLinkRule(rule LinkRule) {
    p.linkRules = append(p.linkRules, rule)
}

// Parse extracts data items and new requests from a response.
func (p *HTMLParser) Parse(resp *Response) ([]Item, []*Request, error) {
    doc, err := goquery.NewDocumentFromReader(bytes.NewReader(resp.Body))
    if err != nil {
        return nil, nil, err
    }

    // Extract data items.
    items := p.extractData(doc, resp.URL)

    // Extract follow-up links.
    requests := p.extractLinks(doc, resp.Request, resp.URL)

    return items, requests, nil
}

// extractData runs every registered extractor against the document.
func (p *HTMLParser) extractData(doc *goquery.Document, url string) []Item {
    var items []Item

    if len(p.extractors) == 0 {
        return items
    }

    data := make(map[string]interface{})

    for _, extractor := range p.extractors {
        values := make([]string, 0)

        doc.Find(extractor.Selector).Each(func(i int, s *goquery.Selection) {
            var text string
            if extractor.Attr == "" {
                text = strings.TrimSpace(s.Text())
            } else {
                text, _ = s.Attr(extractor.Attr)
            }

            if extractor.Regex != nil {
                matches := extractor.Regex.FindStringSubmatch(text)
                if len(matches) > 1 {
                    text = matches[1]
                }
            }

            if text != "" {
                values = append(values, text)
            }
        })

        if len(values) > 0 {
            if len(values) == 1 {
                data[extractor.Name] = values[0]
            } else {
                data[extractor.Name] = values
            }
        }
    }

    if len(data) > 0 {
        items = append(items, Item{
            URL:  url,
            Data: data,
        })
    }

    return items
}

// extractLinks applies every link rule and builds follow-up requests.
func (p *HTMLParser) extractLinks(doc *goquery.Document, parentReq *Request, baseURL string) []*Request {
    var requests []*Request

    base, err := url.Parse(baseURL)
    if err != nil {
        return requests
    }

    for _, rule := range p.linkRules {
        doc.Find(rule.Selector).Each(func(i int, s *goquery.Selection) {
            href, exists := s.Attr(rule.Attr)
            if !exists {
                return
            }

            // Resolve the link against the base URL.
            linkURL, err := base.Parse(href)
            if err != nil {
                return
            }

            urlStr := linkURL.String()

            // Apply the allow/deny filters.
            if rule.AllowRegex != nil && !rule.AllowRegex.MatchString(urlStr) {
                return
            }

            if rule.DenyRegex != nil && rule.DenyRegex.MatchString(urlStr) {
                return
            }

            // Build the follow-up request.
            newReq := &Request{
                URL:     urlStr,
                Method:  "GET",
                Headers: parentReq.Headers,
                Depth:   parentReq.Depth + 1,
                Meta:    make(map[string]interface{}),
            }

            // Copy the parent request's metadata.
            for k, v := range parentReq.Meta {
                newReq.Meta[k] = v
            }

            requests = append(requests, newReq)
        })
    }

    return requests
}
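
Note that the Spider configuration includes AllowedDomains, but nothing in this section applies it yet. A hedged sketch of a domain filter that the engine (or a link rule) could apply before enqueuing follow-up requests:

// allowedDomain reports whether rawURL belongs to one of the spider's allowed
// domains (or to any domain if the list is empty); subdomains are accepted.
func (s *Spider) allowedDomain(rawURL string) bool {
    if len(s.AllowedDomains) == 0 {
        return true
    }
    u, err := url.Parse(rawURL)
    if err != nil {
        return false
    }
    host := u.Hostname()
    for _, d := range s.AllowedDomains {
        if host == d || strings.HasSuffix(host, "."+d) {
            return true
        }
    }
    return false
}

In Engine.processRequest, the loop that enqueues new requests could then skip any request for which e.spider.allowedDomain(newReq.URL) returns false.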

Crawler Engine Implementation #

The crawler engine is the core of the whole system and coordinates the work of the individual components:

// Engine coordinates the URL manager, downloader, and parser.
type Engine struct {
    spider      *Spider
    urlManager  *URLManager
    downloader  *Downloader
    parser      Parser
    itemChan    chan Item
    stats       *Stats
    ctx         context.Context
    cancel      context.CancelFunc
    wg          sync.WaitGroup
}

// Stats holds runtime statistics; the counters are updated with atomic operations.
type Stats struct {
    mu              sync.RWMutex
    StartTime       time.Time
    RequestsTotal   int64
    RequestsSuccess int64
    RequestsFailed  int64
    ItemsScraped    int64
    BytesDownloaded int64
}

// NewEngine creates a crawler engine for the given spider and parser.
func NewEngine(spider *Spider, parser Parser) *Engine {
    ctx, cancel := context.WithCancel(context.Background())

    return &Engine{
        spider:     spider,
        urlManager: NewURLManager(1000, spider.MaxDepth),
        downloader: NewDownloader(spider.Concurrent, 30*time.Second),
        parser:     parser,
        itemChan:   make(chan Item, 100),
        stats:      &Stats{StartTime: time.Now()},
        ctx:        ctx,
        cancel:     cancel,
    }
}

// Start seeds the queue with the start URLs and launches the workers.
func (e *Engine) Start() error {
    // Enqueue the start URLs.
    for _, startURL := range e.spider.StartURLs {
        req := &Request{
            URL:     startURL,
            Method:  "GET",
            Headers: make(map[string]string),
            Depth:   0,
            Meta:    make(map[string]interface{}),
        }
        e.urlManager.AddRequest(req)
    }

    // Launch the worker goroutines.
    for i := 0; i < e.spider.Concurrent; i++ {
        e.wg.Add(1)
        go e.worker(i)
    }

    // Launch the monitor goroutine.
    e.wg.Add(1)
    go e.monitor()

    return nil
}

// worker repeatedly pulls requests from the queue, pacing itself with the
// spider's Delay (which must be greater than zero for time.NewTicker).
func (e *Engine) worker(id int) {
    defer e.wg.Done()

    ticker := time.NewTicker(e.spider.Delay)
    defer ticker.Stop()

    for {
        select {
        case <-e.ctx.Done():
            return
        case <-ticker.C:
            // Try to fetch a request.
            req, ok := e.urlManager.GetRequest()
            if !ok {
                // Exit once nothing is queued or in flight.
                if !e.urlManager.HasPending() {
                    return
                }
                continue
            }

            e.processRequest(req)
        }
    }
}

// processRequest downloads a page, parses it, and enqueues follow-up requests.
func (e *Engine) processRequest(req *Request) {
    atomic.AddInt64(&e.stats.RequestsTotal, 1)

    // Download.
    resp, err := e.downloader.Download(e.ctx, req)
    if err != nil {
        atomic.AddInt64(&e.stats.RequestsFailed, 1)
        e.urlManager.MarkVisited(req.URL)
        return
    }

    atomic.AddInt64(&e.stats.RequestsSuccess, 1)
    atomic.AddInt64(&e.stats.BytesDownloaded, int64(len(resp.Body)))

    // Parse.
    items, requests, err := e.parser.Parse(resp)
    if err != nil {
        e.urlManager.MarkVisited(req.URL)
        return
    }

    // Forward the extracted items.
    for _, item := range items {
        select {
        case e.itemChan <- item:
            atomic.AddInt64(&e.stats.ItemsScraped, 1)
        case <-e.ctx.Done():
            return
        }
    }

    // Enqueue newly discovered requests.
    for _, newReq := range requests {
        e.urlManager.AddRequest(newReq)
    }

    e.urlManager.MarkVisited(req.URL)
}

// monitor periodically prints statistics and stops the engine once the queue has drained.
func (e *Engine) monitor() {
    defer e.wg.Done()

    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-e.ctx.Done():
            return
        case <-ticker.C:
            e.printStats()

            // Stop once there is no more work.
            if !e.urlManager.HasPending() {
                e.cancel()
                return
            }
        }
    }
}

// printStats prints runtime statistics. The counters are read with atomic
// loads because the workers update them with atomic.AddInt64.
func (e *Engine) printStats() {
    duration := time.Since(e.stats.StartTime)

    fmt.Printf("[%s] Requests: %d (Success: %d, Failed: %d), Items: %d, Bytes: %s\n",
        duration.Truncate(time.Second),
        atomic.LoadInt64(&e.stats.RequestsTotal),
        atomic.LoadInt64(&e.stats.RequestsSuccess),
        atomic.LoadInt64(&e.stats.RequestsFailed),
        atomic.LoadInt64(&e.stats.ItemsScraped),
        humanizeBytes(atomic.LoadInt64(&e.stats.BytesDownloaded)),
    )
}

// Wait blocks until all workers have finished, then closes the item channel and the URL manager.
func (e *Engine) Wait() {
    e.wg.Wait()
    close(e.itemChan)
    e.urlManager.Close()
}

// Items returns the read-only channel of scraped items.
func (e *Engine) Items() <-chan Item {
    return e.itemChan
}

// Stop cancels the crawl.
func (e *Engine) Stop() {
    e.cancel()
}

// humanizeBytes formats a byte count in human-readable units.
func humanizeBytes(bytes int64) string {
    const unit = 1024
    if bytes < unit {
        return fmt.Sprintf("%d B", bytes)
    }
    div, exp := int64(unit), 0
    for n := bytes / unit; n >= unit; n /= unit {
        div *= unit
        exp++
    }
    return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
}

Usage Example #

Below is a complete example that demonstrates how to use the crawler system we just built:

package main

import (
    "fmt"
    "log"
    "regexp"
    "time"
)

// Note: this example assumes the crawler types defined above (Spider, DataExtractor,
// LinkRule, NewHTMLParser, NewEngine, ...) are available, either because the example
// lives in the same package or because they are imported from your own module path.

func main() {
    // Crawler configuration.
    spider := &Spider{
        Name:           "example-spider",
        StartURLs:      []string{"https://example.com"},
        AllowedDomains: []string{"example.com"},
        MaxDepth:       3,
        Concurrent:     5,
        Delay:          time.Second,
        UserAgent:      "Example-Crawler/1.0",
    }

    // Create the HTML parser.
    parser := NewHTMLParser()

    // Register data extraction rules.
    parser.AddExtractor(DataExtractor{
        Name:     "title",
        Selector: "title",
    })

    parser.AddExtractor(DataExtractor{
        Name:     "description",
        Selector: "meta[name='description']",
        Attr:     "content",
    })

    // Register link extraction rules.
    parser.AddLinkRule(LinkRule{
        Selector:   "a",
        Attr:       "href",
        AllowRegex: regexp.MustCompile(`^https://example\.com/.*`),
    })

    // Create the crawler engine.
    engine := NewEngine(spider, parser)

    // Start the crawler.
    if err := engine.Start(); err != nil {
        log.Fatal(err)
    }

    // Consume the scraped items.
    go func() {
        for item := range engine.Items() {
            fmt.Printf("URL: %s\n", item.URL)
            for key, value := range item.Data {
                fmt.Printf("  %s: %v\n", key, value)
            }
            fmt.Println()
        }
    }()

    // Wait for completion.
    engine.Wait()

    fmt.Println("Crawl finished")
}
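
The example runs until the queue drains. To also stop cleanly on Ctrl-C, a hedged sketch using os/signal could be added before engine.Wait() (it needs "os", "os/signal", and "syscall" in the import block):

// Stop the engine when the process receives SIGINT or SIGTERM.
go func() {
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    <-sigCh
    fmt.Println("signal received, stopping crawler...")
    engine.Stop() // cancels the engine context; workers exit on ctx.Done()
}()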

Performance Optimization Tips #

1. Connection Pool Tuning #

// NewOptimizedDownloader tunes the HTTP client for high-concurrency crawling.
func NewOptimizedDownloader(concurrent int) *Downloader {
    transport := &http.Transport{
        MaxIdleConns:        100,
        MaxIdleConnsPerHost: concurrent,
        IdleConnTimeout:     90 * time.Second,
        TLSHandshakeTimeout: 10 * time.Second,
        // http.Transport has no DialTimeout field; the dial timeout is set on a net.Dialer.
        DialContext: (&net.Dialer{
            Timeout: 10 * time.Second,
        }).DialContext,
    }

    return &Downloader{
        client: &http.Client{
            Transport: transport,
            Timeout:   30 * time.Second,
        },
        rateLimiter: make(chan struct{}, concurrent),
    }
}

2. Memory Optimization #

// Use an object pool (sync.Pool) to reduce allocations.
var requestPool = sync.Pool{
    New: func() interface{} {
        return &Request{
            Headers: make(map[string]string),
            Meta:    make(map[string]interface{}),
        }
    },
}

func GetRequest() *Request {
    return requestPool.Get().(*Request)
}

func PutRequest(req *Request) {
    // Reset the request before returning it to the pool.
    req.URL = ""
    req.Method = ""
    for k := range req.Headers {
        delete(req.Headers, k)
    }
    for k := range req.Meta {
        delete(req.Meta, k)
    }
    req.Depth = 0

    requestPool.Put(req)
}
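
A brief usage note: the natural place for the pool is wherever new requests are allocated (extractLinks, for example), returning a request as soon as the URL manager rejects it, or once it has been fully processed. A sketch:

newReq := GetRequest()
newReq.URL = urlStr
newReq.Method = "GET"
newReq.Depth = parentReq.Depth + 1

if !urlManager.AddRequest(newReq) {
    PutRequest(newReq) // duplicate, too deep, or queue full: recycle immediately
}
// Otherwise call PutRequest(newReq) after the request has been downloaded and parsed.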

3. Deduplication Optimization #

// Use a Bloom filter for fast, memory-efficient URL deduplication.
// Note: this sketch is not safe for concurrent use because the hash state is
// shared; guard it with a mutex in a real crawler.
type BloomFilter struct {
    bitSet []uint64
    size   uint64
    hash1  hash.Hash64
    hash2  hash.Hash64
}

// hashValues feeds data through both hash functions.
func (bf *BloomFilter) hashValues(data []byte) (uint64, uint64) {
    bf.hash1.Reset()
    bf.hash1.Write(data)
    bf.hash2.Reset()
    bf.hash2.Write(data)
    return bf.hash1.Sum64(), bf.hash2.Sum64()
}

func (bf *BloomFilter) Add(data []byte) {
    h1, h2 := bf.hashValues(data)

    for i := uint64(0); i < 3; i++ {
        pos := (h1 + i*h2) % bf.size
        bf.bitSet[pos/64] |= 1 << (pos % 64)
    }
}

func (bf *BloomFilter) Contains(data []byte) bool {
    h1, h2 := bf.hashValues(data)

    for i := uint64(0); i < 3; i++ {
        pos := (h1 + i*h2) % bf.size
        if bf.bitSet[pos/64]&(1<<(pos%64)) == 0 {
            return false
        }
    }
    return true
}
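
The snippet above does not show how a BloomFilter is constructed. A minimal constructor sketch, assuming the standard library's "hash" and "hash/fnv" packages for the two hash functions and a size given in bits:

// NewBloomFilter creates a Bloom filter with the given size in bits,
// using two FNV hashes from hash/fnv (an illustrative choice).
func NewBloomFilter(sizeBits uint64) *BloomFilter {
    return &BloomFilter{
        bitSet: make([]uint64, (sizeBits+63)/64),
        size:   sizeBits,
        hash1:  fnv.New64(),
        hash2:  fnv.New64a(),
    }
}

// Example: filter URLs before they reach the URL manager.
//  bf := NewBloomFilter(1 << 24)
//  if !bf.Contains([]byte(req.URL)) {
//      bf.Add([]byte(req.URL))
//      urlManager.AddRequest(req)
//  }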

Monitoring and Debugging #

1. Adding Detailed Logging #

// Logger is a minimal structured logging interface.
type Logger interface {
    Debug(msg string, fields ...interface{})
    Info(msg string, fields ...interface{})
    Warn(msg string, fields ...interface{})
    Error(msg string, fields ...interface{})
}

// Adding logging to the engine: a modified processRequest
// (this assumes Engine gains an extra `logger Logger` field).
func (e *Engine) processRequest(req *Request) {
    e.logger.Debug("Processing request", "url", req.URL, "depth", req.Depth)

    start := time.Now()
    resp, err := e.downloader.Download(e.ctx, req)
    duration := time.Since(start)

    if err != nil {
        e.logger.Error("Download failed", "url", req.URL, "error", err, "duration", duration)
        return
    }

    e.logger.Info("Download success", "url", req.URL, "status", resp.StatusCode, "size", len(resp.Body), "duration", duration)
}
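
The section does not prescribe a concrete Logger implementation; one hedged option is a thin adapter over the standard library's log/slog (Go 1.21+), passing fields through as alternating key/value pairs:

// slogLogger adapts *slog.Logger to the Logger interface above.
// Requires the "log/slog" import.
type slogLogger struct {
    l *slog.Logger
}

func (s slogLogger) Debug(msg string, fields ...interface{}) { s.l.Debug(msg, fields...) }
func (s slogLogger) Info(msg string, fields ...interface{})  { s.l.Info(msg, fields...) }
func (s slogLogger) Warn(msg string, fields ...interface{})  { s.l.Warn(msg, fields...) }
func (s slogLogger) Error(msg string, fields ...interface{}) { s.l.Error(msg, fields...) }

// Usage, assuming Engine gains a logger field:
//  engine.logger = slogLogger{l: slog.Default()}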

2. Collecting Performance Metrics #

// Metrics holds crawler performance metrics
// (requires github.com/prometheus/client_golang/prometheus).
type Metrics struct {
    RequestDuration   *prometheus.HistogramVec
    RequestsTotal     *prometheus.CounterVec
    ItemsScraped      prometheus.Counter
    ActiveWorkers     prometheus.Gauge
    QueueSize         prometheus.Gauge
}

func NewMetrics() *Metrics {
    return &Metrics{
        RequestDuration: prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name: "crawler_request_duration_seconds",
                Help: "Request duration in seconds",
            },
            []string{"status"},
        ),
        RequestsTotal: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "crawler_requests_total",
                Help: "Total number of requests",
            },
            []string{"status"},
        ),
        ItemsScraped: prometheus.NewCounter(
            prometheus.CounterOpts{
                Name: "crawler_items_scraped_total",
                Help: "Total number of items scraped",
            },
        ),
    }
}
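
The metrics still have to be registered and exposed; a hedged sketch using the standard client_golang packages (the port and label values are illustrative, and only the collectors that NewMetrics actually initializes are registered):

// MustRegister registers the initialized collectors with the default registry.
func (m *Metrics) MustRegister() {
    prometheus.MustRegister(m.RequestDuration, m.RequestsTotal, m.ItemsScraped)
}

// Exposing /metrics and recording one request, e.g. inside processRequest:
//  m := NewMetrics()
//  m.MustRegister()
//  go http.ListenAndServe(":9090", promhttp.Handler())
//
//  status := strconv.Itoa(resp.StatusCode)
//  m.RequestsTotal.WithLabelValues(status).Inc()
//  m.RequestDuration.WithLabelValues(status).Observe(duration.Seconds())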

Summary #

In this section we built a fully functional concurrent crawler system, covering the following key techniques:

  1. Modular design: split the system into independent components that are easy to maintain and extend
  2. Concurrency control: use channels and goroutines for efficient concurrent processing
  3. Resource management: manage memory and network connections carefully to avoid leaks
  4. Error handling: implement retries and graceful error handling
  5. Performance optimization: use object pools, Bloom filters, and similar techniques to improve performance

This crawler system showcases Go's strengths in concurrent programming and lays the groundwork for the high-concurrency systems developed in later sections.