2.8.1 A Concurrent Crawler System #
Web crawling is a classic use case for concurrent programming. An efficient crawler has to issue a large number of network requests at the same time, keep the level of concurrency under control, and avoid putting excessive load on the target servers. In this section we build a fully functional concurrent crawler system from scratch.
System Architecture #
Core Components #
A complete concurrent crawler system usually consists of the following core components:
- URL manager: stores, deduplicates, and schedules URLs
- Downloader: performs HTTP requests and fetches page content
- Parser: parses page content and extracts data and new URLs
- Data storage: persists the scraped data (a minimal sketch follows this list)
- Concurrency controller: limits the number of in-flight requests and the request rate
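Everything except data storage is implemented later in this section. As a placeholder, here is a minimal sketch of what a storage component might look like; the Storage interface and MemoryStorage type are illustrative assumptions (they use the Item type defined under "Basic Data Structures" below), not a fixed API.
// Storage persists scraped items; any database or file writer can sit behind it.
type Storage interface {
    Save(item Item) error
    Close() error
}

// MemoryStorage is a toy implementation that keeps items in memory,
// handy for examples and tests.
type MemoryStorage struct {
    mu    sync.Mutex
    items []Item
}

func (s *MemoryStorage) Save(item Item) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.items = append(s.items, item)
    return nil
}

func (s *MemoryStorage) Close() error { return nil }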
Architecture Diagram #
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  URL Queue  │───▶│ Downloaders │───▶│   Parsers   │
└─────────────┘    └─────────────┘    └─────────────┘
       ▲                                      │
       │                                      ▼
┌─────────────┐                        ┌─────────────┐
│ URL Manager │◀──────────────────────│ Extraction  │
└─────────────┘                        └─────────────┘
       │                                      │
       ▼                                      ▼
┌─────────────┐                        ┌─────────────┐
│Dedup Filter │                        │ Data Storage│
└─────────────┘                        └─────────────┘
Basic Data Structures #
First, define the crawler system's basic data structures:
package crawler
// The imports below cover all of the crawler package code shown in this section.
import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/PuerkitoBio/goquery"
)
// Request represents a single crawl request
type Request struct {
URL string
Method string
Headers map[string]string
Depth int
Meta map[string]interface{}
}
// Response represents the result of a crawl request
type Response struct {
Request *Request
StatusCode int
Headers map[string][]string
Body []byte
URL string
}
// Item represents an extracted data item
type Item struct {
URL string
Data map[string]interface{}
}
// Spider holds the crawler configuration
type Spider struct {
Name string
StartURLs []string
AllowedDomains []string
MaxDepth int
Concurrent int
Delay time.Duration
UserAgent string
}
URL Manager Implementation #
The URL manager is a core component of the crawler; it schedules URLs and filters out duplicates:
// URLManager schedules and deduplicates URLs
type URLManager struct {
mu sync.RWMutex
pending chan *Request
visited map[string]bool
processing map[string]bool
maxDepth int
closed bool
}
// NewURLManager creates a URL manager
func NewURLManager(bufferSize, maxDepth int) *URLManager {
return &URLManager{
pending: make(chan *Request, bufferSize),
visited: make(map[string]bool),
processing: make(map[string]bool),
maxDepth: maxDepth,
}
}
// AddRequest adds a request to the queue
func (um *URLManager) AddRequest(req *Request) bool {
um.mu.Lock()
defer um.mu.Unlock()
// Skip URLs that have already been visited or are being processed
if um.visited[req.URL] || um.processing[req.URL] {
return false
}
// Enforce the depth limit
if req.Depth > um.maxDepth {
return false
}
// Mark as in progress
um.processing[req.URL] = true
select {
case um.pending <- req:
return true
default:
// Queue is full; remove the in-progress mark
delete(um.processing, req.URL)
return false
}
}
// GetRequest fetches a pending request, if any
func (um *URLManager) GetRequest() (*Request, bool) {
select {
case req := <-um.pending:
return req, true
default:
return nil, false
}
}
// MarkVisited marks a URL as visited
func (um *URLManager) MarkVisited(url string) {
um.mu.Lock()
defer um.mu.Unlock()
um.visited[url] = true
delete(um.processing, url)
}
// HasPending reports whether any requests are queued or in flight
func (um *URLManager) HasPending() bool {
um.mu.RLock()
defer um.mu.RUnlock()
return len(um.pending) > 0 || len(um.processing) > 0
}
// Close closes the URL manager
func (um *URLManager) Close() {
um.mu.Lock()
defer um.mu.Unlock()
if !um.closed {
close(um.pending)
um.closed = true
}
}
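Note that the manager deduplicates on the raw URL string, so https://example.com/a and https://example.com/a#top count as different pages. A small normalization step before the dedup check avoids this; the normalizeURL helper below is a suggested addition under that assumption, not part of the URLManager above.
// normalizeURL canonicalizes a URL before it is used as a dedup key.
func normalizeURL(raw string) (string, error) {
    u, err := url.Parse(raw)
    if err != nil {
        return "", err
    }
    u.Fragment = ""                  // fragments never change the fetched page
    u.Host = strings.ToLower(u.Host) // host names are case-insensitive
    if u.Path == "" {
        u.Path = "/" // treat "http://a.com" and "http://a.com/" as the same page
    }
    return u.String(), nil
}
AddRequest would call this once on req.URL before consulting the visited and processing maps.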
Downloader Implementation #
The downloader performs the HTTP requests; it supports a concurrency limit and a retry mechanism:
// Downloader is the HTTP downloader
type Downloader struct {
client *http.Client
userAgent string
maxRetries int
retryDelay time.Duration
rateLimiter chan struct{}
}
// NewDownloader creates a downloader
func NewDownloader(concurrent int, timeout time.Duration) *Downloader {
return &Downloader{
client: &http.Client{
Timeout: timeout,
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
},
},
userAgent: "Go-Crawler/1.0",
maxRetries: 3,
retryDelay: time.Second,
rateLimiter: make(chan struct{}, concurrent),
}
}
// Download fetches a page
func (d *Downloader) Download(ctx context.Context, req *Request) (*Response, error) {
// Acquire a slot from the concurrency limiter (the channel acts as a semaphore)
select {
case d.rateLimiter <- struct{}{}:
defer func() { <-d.rateLimiter }()
case <-ctx.Done():
return nil, ctx.Err()
}
var lastErr error
for i := 0; i <= d.maxRetries; i++ {
if i > 0 {
// Back off before retrying
select {
case <-time.After(d.retryDelay * time.Duration(i)):
case <-ctx.Done():
return nil, ctx.Err()
}
}
resp, err := d.doRequest(ctx, req)
if err == nil {
return resp, nil
}
lastErr = err
// Give up early if the error is not retryable
if !d.isRetryableError(err) {
break
}
}
return nil, lastErr
}
// doRequest performs a single HTTP request
func (d *Downloader) doRequest(ctx context.Context, req *Request) (*Response, error) {
httpReq, err := http.NewRequestWithContext(ctx, req.Method, req.URL, nil)
if err != nil {
return nil, err
}
// Set the request headers
httpReq.Header.Set("User-Agent", d.userAgent)
for key, value := range req.Headers {
httpReq.Header.Set(key, value)
}
httpResp, err := d.client.Do(httpReq)
if err != nil {
return nil, err
}
defer httpResp.Body.Close()
body, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, err
}
return &Response{
Request: req,
StatusCode: httpResp.StatusCode,
Headers: httpResp.Header,
Body: body,
URL: httpResp.Request.URL.String(),
}, nil
}
// isRetryableError reports whether an error is worth retrying
func (d *Downloader) isRetryableError(err error) bool {
// Network timeouts and temporary connection errors can be retried
// (net.Error.Temporary is deprecated in recent Go releases, but still works here)
if netErr, ok := err.(net.Error); ok {
return netErr.Timeout() || netErr.Temporary()
}
return false
}
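The retry loop above only retries transport-level failures; a 429 or 5xx response is returned to the caller as if it were a success. If you also want to retry on those responses, a predicate like the one below can be checked inside the loop; this is a suggested extension, not part of the Downloader shown above.
// isRetryableStatus reports whether a status code usually indicates a
// transient server-side condition that is worth retrying.
func isRetryableStatus(code int) bool {
    switch code {
    case http.StatusTooManyRequests,    // 429
        http.StatusInternalServerError, // 500
        http.StatusBadGateway,          // 502
        http.StatusServiceUnavailable,  // 503
        http.StatusGatewayTimeout:      // 504
        return true
    }
    return false
}
Inside Download, a response whose status code matches this predicate would then be treated like a retryable error instead of being returned immediately.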
Parser Implementation #
The parser extracts data items and new URLs from a response; the HTML parser below uses the github.com/PuerkitoBio/goquery package:
// Parser is the interface every parser implements
type Parser interface {
Parse(resp *Response) ([]Item, []*Request, error)
}
// HTMLParser parses HTML documents
type HTMLParser struct {
extractors []DataExtractor
linkRules []LinkRule
}
// DataExtractor describes one piece of data to extract
type DataExtractor struct {
Name string
Selector string
Attr string
Regex *regexp.Regexp
}
// LinkRule describes how to extract follow-up links
type LinkRule struct {
Selector string
Attr string
AllowRegex *regexp.Regexp
DenyRegex *regexp.Regexp
FollowDepth int
}
// NewHTMLParser creates an HTML parser
func NewHTMLParser() *HTMLParser {
return &HTMLParser{
extractors: make([]DataExtractor, 0),
linkRules: make([]LinkRule, 0),
}
}
// AddExtractor adds a data extraction rule
func (p *HTMLParser) AddExtractor(extractor DataExtractor) {
p.extractors = append(p.extractors, extractor)
}
// AddLinkRule adds a link extraction rule
func (p *HTMLParser) AddLinkRule(rule LinkRule) {
p.linkRules = append(p.linkRules, rule)
}
// Parse parses a response
func (p *HTMLParser) Parse(resp *Response) ([]Item, []*Request, error) {
doc, err := goquery.NewDocumentFromReader(bytes.NewReader(resp.Body))
if err != nil {
return nil, nil, err
}
// Extract data
items := p.extractData(doc, resp.URL)
// Extract links
requests := p.extractLinks(doc, resp.Request, resp.URL)
return items, requests, nil
}
// extractData extracts data items
func (p *HTMLParser) extractData(doc *goquery.Document, url string) []Item {
var items []Item
if len(p.extractors) == 0 {
return items
}
data := make(map[string]interface{})
for _, extractor := range p.extractors {
values := make([]string, 0)
doc.Find(extractor.Selector).Each(func(i int, s *goquery.Selection) {
var text string
if extractor.Attr == "" {
text = strings.TrimSpace(s.Text())
} else {
text, _ = s.Attr(extractor.Attr)
}
if extractor.Regex != nil {
matches := extractor.Regex.FindStringSubmatch(text)
if len(matches) > 1 {
text = matches[1]
}
}
if text != "" {
values = append(values, text)
}
})
if len(values) > 0 {
if len(values) == 1 {
data[extractor.Name] = values[0]
} else {
data[extractor.Name] = values
}
}
}
if len(data) > 0 {
items = append(items, Item{
URL: url,
Data: data,
})
}
return items
}
// extractLinks extracts follow-up links
func (p *HTMLParser) extractLinks(doc *goquery.Document, parentReq *Request, baseURL string) []*Request {
var requests []*Request
base, err := url.Parse(baseURL)
if err != nil {
return requests
}
for _, rule := range p.linkRules {
doc.Find(rule.Selector).Each(func(i int, s *goquery.Selection) {
href, exists := s.Attr(rule.Attr)
if !exists {
return
}
// Resolve the link against the page's base URL
linkURL, err := base.Parse(href)
if err != nil {
return
}
urlStr := linkURL.String()
// Apply the allow/deny filters
if rule.AllowRegex != nil && !rule.AllowRegex.MatchString(urlStr) {
return
}
if rule.DenyRegex != nil && rule.DenyRegex.MatchString(urlStr) {
return
}
// Build the new request
newReq := &Request{
URL: urlStr,
Method: "GET",
Headers: parentReq.Headers,
Depth: parentReq.Depth + 1,
Meta: make(map[string]interface{}),
}
// Copy the parent request's metadata
for k, v := range parentReq.Meta {
newReq.Meta[k] = v
}
requests = append(requests, newReq)
})
}
return requests
}
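The Spider configuration declares an AllowedDomains field, but neither the parser nor the engine shown here enforces it. A helper along the following lines could be called before a newly extracted link is enqueued; treating subdomains as allowed is an assumption you may want to change.
// allowedDomain reports whether rawURL belongs to one of the allowed domains
// or their subdomains. An empty allow list means no restriction.
func allowedDomain(rawURL string, allowed []string) bool {
    if len(allowed) == 0 {
        return true
    }
    u, err := url.Parse(rawURL)
    if err != nil {
        return false
    }
    host := strings.ToLower(u.Hostname())
    for _, d := range allowed {
        d = strings.ToLower(d)
        if host == d || strings.HasSuffix(host, "."+d) {
            return true
        }
    }
    return false
}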
Crawler Engine Implementation #
The engine is the heart of the system; it coordinates all of the other components:
// Engine is the crawler engine
type Engine struct {
spider *Spider
urlManager *URLManager
downloader *Downloader
parser Parser
itemChan chan Item
stats *Stats
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// Stats holds runtime statistics
type Stats struct {
mu sync.RWMutex
StartTime time.Time
RequestsTotal int64
RequestsSuccess int64
RequestsFailed int64
ItemsScraped int64
BytesDownloaded int64
}
// NewEngine creates a crawler engine
func NewEngine(spider *Spider, parser Parser) *Engine {
ctx, cancel := context.WithCancel(context.Background())
downloader := NewDownloader(spider.Concurrent, 30*time.Second)
if spider.UserAgent != "" {
downloader.userAgent = spider.UserAgent // honor the spider's configured User-Agent
}
return &Engine{
spider: spider,
urlManager: NewURLManager(1000, spider.MaxDepth),
downloader: downloader,
parser: parser,
itemChan: make(chan Item, 100),
stats: &Stats{StartTime: time.Now()},
ctx: ctx,
cancel: cancel,
}
}
// Start starts the crawler
func (e *Engine) Start() error {
// Seed the queue with the start URLs
for _, startURL := range e.spider.StartURLs {
req := &Request{
URL: startURL,
Method: "GET",
Headers: make(map[string]string),
Depth: 0,
Meta: make(map[string]interface{}),
}
e.urlManager.AddRequest(req)
}
// Start the worker goroutines
for i := 0; i < e.spider.Concurrent; i++ {
e.wg.Add(1)
go e.worker(i)
}
// Start the monitoring goroutine
e.wg.Add(1)
go e.monitor()
return nil
}
// worker is the main loop of one worker goroutine
func (e *Engine) worker(id int) {
defer e.wg.Done()
ticker := time.NewTicker(e.spider.Delay)
defer ticker.Stop()
for {
select {
case <-e.ctx.Done():
return
case <-ticker.C:
// Fetch the next request
req, ok := e.urlManager.GetRequest()
if !ok {
// Exit only if nothing is queued or in flight
if !e.urlManager.HasPending() {
return
}
continue
}
e.processRequest(req)
}
}
}
// processRequest handles a single request
func (e *Engine) processRequest(req *Request) {
atomic.AddInt64(&e.stats.RequestsTotal, 1)
// Download
resp, err := e.downloader.Download(e.ctx, req)
if err != nil {
atomic.AddInt64(&e.stats.RequestsFailed, 1)
e.urlManager.MarkVisited(req.URL)
return
}
atomic.AddInt64(&e.stats.RequestsSuccess, 1)
atomic.AddInt64(&e.stats.BytesDownloaded, int64(len(resp.Body)))
// Parse
items, requests, err := e.parser.Parse(resp)
if err != nil {
e.urlManager.MarkVisited(req.URL)
return
}
// Emit the extracted items
for _, item := range items {
select {
case e.itemChan <- item:
atomic.AddInt64(&e.stats.ItemsScraped, 1)
case <-e.ctx.Done():
return
}
}
// Enqueue the newly discovered requests
for _, newReq := range requests {
e.urlManager.AddRequest(newReq)
}
e.urlManager.MarkVisited(req.URL)
}
// monitor periodically reports progress and detects completion
func (e *Engine) monitor() {
defer e.wg.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-e.ctx.Done():
return
case <-ticker.C:
e.printStats()
// Stop once nothing is queued or in flight
if !e.urlManager.HasPending() {
e.cancel()
return
}
}
}
}
// printStats prints progress statistics
func (e *Engine) printStats() {
// The counters are updated with atomic operations, so read them the same way.
duration := time.Since(e.stats.StartTime)
fmt.Printf("[%s] Requests: %d (Success: %d, Failed: %d), Items: %d, Bytes: %s\n",
duration.Truncate(time.Second),
atomic.LoadInt64(&e.stats.RequestsTotal),
atomic.LoadInt64(&e.stats.RequestsSuccess),
atomic.LoadInt64(&e.stats.RequestsFailed),
atomic.LoadInt64(&e.stats.ItemsScraped),
humanizeBytes(atomic.LoadInt64(&e.stats.BytesDownloaded)),
)
}
// Wait blocks until the crawl has finished
func (e *Engine) Wait() {
e.wg.Wait()
close(e.itemChan)
e.urlManager.Close()
}
// Items returns the channel of scraped items
func (e *Engine) Items() <-chan Item {
return e.itemChan
}
// Stop stops the crawler
func (e *Engine) Stop() {
e.cancel()
}
// humanizeBytes formats a byte count for display
func humanizeBytes(bytes int64) string {
const unit = 1024
if bytes < unit {
return fmt.Sprintf("%d B", bytes)
}
div, exp := int64(unit), 0
for n := bytes / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
}
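For long-running crawls you usually also want Ctrl-C to trigger a clean shutdown rather than killing in-flight requests. The sketch below wires OS signals to Engine.Stop using the standard library's signal.NotifyContext (Go 1.16+); the runWithSignals helper is an illustrative addition, not part of the engine above.
// Additional imports: "os", "os/signal", "syscall".

// runWithSignals runs the engine until it finishes or a termination signal arrives.
func runWithSignals(e *Engine) {
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    go func() {
        <-ctx.Done() // SIGINT/SIGTERM received, or stop() called
        e.Stop()     // cancel the engine's context; workers drain and exit
    }()

    e.Wait()
}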
Usage Example #
Below is a complete example that shows how to drive the crawler system we just built. For brevity it assumes the crawler types defined above are available in the same package as main; in a real project you would import them from your crawler package:
package main
import (
"fmt"
"log"
"regexp"
"time"
)
func main() {
// Build the crawler configuration
spider := &Spider{
Name: "example-spider",
StartURLs: []string{"https://example.com"},
AllowedDomains: []string{"example.com"},
MaxDepth: 3,
Concurrent: 5,
Delay: time.Second,
UserAgent: "Example-Crawler/1.0",
}
// Create the HTML parser
parser := NewHTMLParser()
// Add data extraction rules
parser.AddExtractor(DataExtractor{
Name: "title",
Selector: "title",
})
parser.AddExtractor(DataExtractor{
Name: "description",
Selector: "meta[name='description']",
Attr: "content",
})
// Add a link extraction rule
parser.AddLinkRule(LinkRule{
Selector: "a",
Attr: "href",
AllowRegex: regexp.MustCompile(`^https://example\.com/.*`),
})
// Create the crawler engine
engine := NewEngine(spider, parser)
// Start the crawler
if err := engine.Start(); err != nil {
log.Fatal(err)
}
// Consume the scraped data
go func() {
for item := range engine.Items() {
fmt.Printf("URL: %s\n", item.URL)
for key, value := range item.Data {
fmt.Printf(" %s: %v\n", key, value)
}
fmt.Println()
}
}()
// Wait for completion
engine.Wait()
fmt.Println("Crawl finished")
}
Performance Tuning Tips #
1. Connection Pool Tuning #
// Tune the HTTP client configuration
func NewOptimizedDownloader(concurrent int) *Downloader {
transport := &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: concurrent,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
// http.Transport has no DialTimeout field; the dial timeout belongs on net.Dialer
DialContext: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
}
return &Downloader{
client: &http.Client{
Transport: transport,
Timeout: 30 * time.Second,
},
userAgent: "Go-Crawler/1.0",
maxRetries: 3,
retryDelay: time.Second,
rateLimiter: make(chan struct{}, concurrent),
}
}
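The channel-based semaphore in Downloader caps how many requests are in flight, but not how fast new ones start. When a hard requests-per-second limit is needed, a token-bucket limiter from golang.org/x/time/rate can be layered on top. The wrapper below is a sketch of that idea, assuming it is acceptable to embed Downloader; it is not part of the downloader above.
// Additional import: "golang.org/x/time/rate".

// RateLimitedDownloader wraps a Downloader with a token-bucket rate limit.
type RateLimitedDownloader struct {
    *Downloader
    limiter *rate.Limiter
}

func NewRateLimitedDownloader(d *Downloader, rps float64, burst int) *RateLimitedDownloader {
    return &RateLimitedDownloader{
        Downloader: d,
        limiter:    rate.NewLimiter(rate.Limit(rps), burst),
    }
}

func (d *RateLimitedDownloader) Download(ctx context.Context, req *Request) (*Response, error) {
    // Block until a token is available or the context is cancelled.
    if err := d.limiter.Wait(ctx); err != nil {
        return nil, err
    }
    return d.Downloader.Download(ctx, req)
}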
2. Memory Optimization #
// Use an object pool to reduce allocations
var requestPool = sync.Pool{
New: func() interface{} {
return &Request{
Headers: make(map[string]string),
Meta: make(map[string]interface{}),
}
},
}
func GetRequest() *Request {
return requestPool.Get().(*Request)
}
func PutRequest(req *Request) {
// Reset all fields before returning the request to the pool
req.URL = ""
req.Method = ""
for k := range req.Headers {
delete(req.Headers, k)
}
for k := range req.Meta {
delete(req.Meta, k)
}
req.Depth = 0
requestPool.Put(req)
}
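As a usage sketch, a hypothetical helper like newPooledRequest could replace the direct struct literal in extractLinks; the caller must hand the request back with PutRequest once it has been fully processed, otherwise the pool gains nothing.
// newPooledRequest builds a follow-up request from the pool.
func newPooledRequest(urlStr string, parent *Request) *Request {
    req := GetRequest()
    req.URL = urlStr
    req.Method = "GET"
    req.Depth = parent.Depth + 1
    for k, v := range parent.Meta {
        req.Meta[k] = v
    }
    return req
}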
3. Deduplication Optimization #
// Use a Bloom filter for fast, memory-efficient deduplication
type BloomFilter struct {
bitSet []uint64
size uint64
hash1 hash.Hash64
hash2 hash.Hash64
}
// hashValues feeds the data into both hash functions and returns the sums.
func (bf *BloomFilter) hashValues(data []byte) (uint64, uint64) {
bf.hash1.Reset()
bf.hash1.Write(data)
bf.hash2.Reset()
bf.hash2.Write(data)
return bf.hash1.Sum64(), bf.hash2.Sum64()
}
// Add inserts an element into the filter.
func (bf *BloomFilter) Add(data []byte) {
h1, h2 := bf.hashValues(data)
for i := uint64(0); i < 3; i++ {
pos := (h1 + i*h2) % bf.size
bf.bitSet[pos/64] |= 1 << (pos % 64)
}
}
// Contains reports whether an element may be in the filter
// (false positives are possible, false negatives are not).
func (bf *BloomFilter) Contains(data []byte) bool {
h1, h2 := bf.hashValues(data)
for i := uint64(0); i < 3; i++ {
pos := (h1 + i*h2) % bf.size
if bf.bitSet[pos/64]&(1<<(pos%64)) == 0 {
return false
}
}
return true
}
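The struct above leaves the choice of hash functions open and has no constructor. One possible constructor using the standard library's FNV hashes is sketched below; the sizing comment is a rough rule of thumb, and since the filter is not safe for concurrent use on its own, the URL manager would have to guard it with its existing mutex.
// Additional imports: "hash" (for the struct above) and "hash/fnv".

// NewBloomFilter creates a filter with the given number of bits. Roughly 10
// bits per expected URL keeps the false-positive rate around 1-2% with the
// 3 probes used above.
func NewBloomFilter(sizeBits uint64) *BloomFilter {
    return &BloomFilter{
        bitSet: make([]uint64, (sizeBits+63)/64),
        size:   sizeBits,
        hash1:  fnv.New64(),
        hash2:  fnv.New64a(),
    }
}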
Monitoring and Debugging #
1. Detailed Logging #
// Logger is the logging interface
type Logger interface {
Debug(msg string, fields ...interface{})
Info(msg string, fields ...interface{})
Warn(msg string, fields ...interface{})
Error(msg string, fields ...interface{})
}
// Adding logging to the engine (this variant assumes Engine carries an extra logger Logger field)
func (e *Engine) processRequest(req *Request) {
e.logger.Debug("Processing request", "url", req.URL, "depth", req.Depth)
start := time.Now()
resp, err := e.downloader.Download(e.ctx, req)
duration := time.Since(start)
if err != nil {
e.logger.Error("Download failed", "url", req.URL, "error", err, "duration", duration)
return
}
e.logger.Info("Download success", "url", req.URL, "status", resp.StatusCode, "size", len(resp.Body), "duration", duration)
}
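Any structured logger can satisfy this interface. As one example, here is a minimal adapter over the standard library's log/slog package (Go 1.21+); the SlogLogger type is an illustrative assumption, not something the engine requires.
// Additional import: "log/slog".

// SlogLogger adapts *slog.Logger to the Logger interface above.
type SlogLogger struct {
    l *slog.Logger
}

func (s *SlogLogger) Debug(msg string, fields ...interface{}) { s.l.Debug(msg, fields...) }
func (s *SlogLogger) Info(msg string, fields ...interface{})  { s.l.Info(msg, fields...) }
func (s *SlogLogger) Warn(msg string, fields ...interface{})  { s.l.Warn(msg, fields...) }
func (s *SlogLogger) Error(msg string, fields ...interface{}) { s.l.Error(msg, fields...) }
Setting the engine's logger field to &SlogLogger{l: slog.Default()} would then give it key/value logging with no further changes.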
2. Collecting Performance Metrics #
// Metrics holds the crawler's Prometheus metrics (github.com/prometheus/client_golang)
type Metrics struct {
RequestDuration *prometheus.HistogramVec
RequestsTotal *prometheus.CounterVec
ItemsScraped prometheus.Counter
ActiveWorkers prometheus.Gauge
QueueSize prometheus.Gauge
}
func NewMetrics() *Metrics {
return &Metrics{
RequestDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "crawler_request_duration_seconds",
Help: "Request duration in seconds",
},
[]string{"status"},
),
RequestsTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "crawler_requests_total",
Help: "Total number of requests",
},
[]string{"status"},
),
ItemsScraped: prometheus.NewCounter(
prometheus.CounterOpts{
Name: "crawler_items_scraped_total",
Help: "Total number of items scraped",
},
),
ActiveWorkers: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "crawler_active_workers",
Help: "Number of active worker goroutines",
}),
QueueSize: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "crawler_queue_size",
Help: "Number of requests waiting in the queue",
}),
}
}
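The collectors still need to be registered and updated somewhere; exactly how they are wired into the engine is left open here. A minimal sketch, assuming the default registry and the promhttp handler from github.com/prometheus/client_golang/prometheus/promhttp for the /metrics endpoint:
// Register makes the collectors visible to the default Prometheus registry.
func (m *Metrics) Register() {
    prometheus.MustRegister(
        m.RequestDuration, m.RequestsTotal, m.ItemsScraped,
        m.ActiveWorkers, m.QueueSize,
    )
}

// ObserveRequest records one finished request under its status label.
func (m *Metrics) ObserveRequest(status string, d time.Duration) {
    m.RequestDuration.WithLabelValues(status).Observe(d.Seconds())
    m.RequestsTotal.WithLabelValues(status).Inc()
}
Exposing the endpoint is then a matter of http.Handle("/metrics", promhttp.Handler()) plus an http.ListenAndServe call in main.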
Summary #
In this section we built a fully functional concurrent crawler system, covering the following key techniques:
- Modular design: the system is split into independent components that are easy to maintain and extend
- Concurrency control: channels and goroutines provide efficient concurrent processing
- Resource management: memory and network connections are managed carefully to avoid leaks
- Error handling: a retry mechanism and graceful handling of failures
- Performance optimization: object pools, Bloom filters, and similar techniques improve throughput
This crawler system shows Go's strengths in concurrent programming and lays the groundwork for the high-concurrency systems we develop later.