
08 - Web Scraping in Practice

Use browser automation to scrape dynamically rendered pages, cope with anti-bot mechanisms, and implement proxy rotation and data extraction.


8.1 Why Browser-Based Scraping

Traditional HTTP scrapers (e.g. requests + BeautifulSoup) only see the raw HTML the server returns. Modern sites render much of their content with JavaScript, so a traditional scraper never sees the rendered result. A quick check for this is sketched after the comparison below.

Traditional HTTP scraper:
  HTTP request → raw HTML → parse
  ✅ Fast, low resource usage
  ❌ Cannot see JS-rendered content

Browser-automation scraper:
  Launch browser → load page → run JS → read rendered DOM → parse
  ✅ Captures dynamic content
  ✅ Can simulate user interaction
  ❌ Heavy resource usage, slower
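
A quick way to tell which camp a page falls into is to compare what a plain HTTP request sees with what a real browser sees after JavaScript runs. A minimal sketch; the URL and the .article-item selector are placeholders:

import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options

URL = "https://example.com/news"  # hypothetical JS-rendered page

# Plain HTTP request: only the server-side HTML shell
raw_html = requests.get(URL, timeout=10).text
print("raw HTML contains article-item:", "article-item" in raw_html)

# Real browser: the DOM after JavaScript has executed
options = Options()
options.add_argument("--headless=new")
driver = webdriver.Chrome(options=options)
driver.get(URL)
print("rendered article count:", len(driver.find_elements(By.CSS_SELECTOR, ".article-item")))
driver.quit()

If the marker is absent from the raw HTML but the rendered count is non-zero, the content is injected by JavaScript and a browser scraper (or the page's underlying API) is the right tool.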

When to Use a Browser Scraper

Scenario                     Recommendation
Static HTML pages            ❌ Use requests/httpx
REST API available           ❌ Call the API directly
SPA (React/Vue/Angular)      ✅ Browser scraper
Login / interaction needed   ✅ Browser scraper
Infinite scroll              ✅ Browser scraper
Canvas/WebGL rendering       ✅ Browser scraper
Screenshot evidence needed   ✅ Browser scraper

8.2 Basic Scraper Architecture

┌─────────────────────────────────────────────────────────┐
│                    Scraper Scheduler                     │
│  ┌───────────────┐ ┌───────────────┐ ┌───────────────┐  │
│  │ URL queue     │→│ Browser pool  │→│ Data pipeline │  │
│  └───────────────┘ └───────────────┘ └───────────────┘  │
│          ↑                 │                 │          │
│  ┌───────────────┐ ┌───────────────┐ ┌───────────────┐  │
│  │ Link extractor│ │ Anti-detection│ │ Data storage  │  │
│  └───────────────┘ └───────────────┘ └───────────────┘  │
└─────────────────────────────────────────────────────────┘
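
A minimal sketch of how these components can map onto code. The class and method names are illustrative, not from any particular framework, and the browser pool here is a naive round-robin list:

import queue
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options

def _new_browser():
    opts = Options()
    opts.add_argument("--headless=new")
    return webdriver.Chrome(options=opts)

class ScraperScheduler:
    """Illustrative skeleton of the components in the diagram above."""

    def __init__(self, start_urls, pool_size=2):
        self.url_queue = queue.Queue()                           # URL queue
        for url in start_urls:
            self.url_queue.put(url)
        self.pool = [_new_browser() for _ in range(pool_size)]   # browser pool
        self.seen = set(start_urls)                              # dedup for link extraction

    def pipeline(self, record):
        """Data pipeline: clean/validate, then hand off to storage."""
        print("store:", record)                                  # swap in a DB/file writer here

    def run(self):
        i = 0
        while not self.url_queue.empty():
            url = self.url_queue.get()
            driver = self.pool[i % len(self.pool)]               # naive round-robin over the pool
            i += 1
            driver.get(url)
            self.pipeline({"url": url, "title": driver.title})
            # Link extraction: queue newly discovered URLs
            for a in driver.find_elements(By.CSS_SELECTOR, "a[href]"):
                href = a.get_attribute("href")
                if href and href not in self.seen:
                    self.seen.add(href)
                    # self.url_queue.put(href)                   # enable to actually crawl outward
        for d in self.pool:
            d.quit()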

8.3 Scraping with Selenium

Basic Example — Scraping a News List

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import json
import time

def create_driver():
    options = Options()
    options.add_argument("--headless=new")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--disable-blink-features=AutomationControlled")
    options.add_argument("--window-size=1920,1080")
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)
    return webdriver.Chrome(options=options)

def scrape_news(url, max_pages=5):
    driver = create_driver()
    all_articles = []

    try:
        for page_num in range(1, max_pages + 1):
            driver.get(f"{url}?page={page_num}")
            wait = WebDriverWait(driver, 15)

            # Wait for the article list to load
            articles = wait.until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".article-item"))
            )

            for article in articles:
                try:
                    title = article.find_element(By.CSS_SELECTOR, ".title").text
                    link = article.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
                    summary = article.find_element(By.CSS_SELECTOR, ".summary").text
                    date = article.find_element(By.CSS_SELECTOR, ".date").text

                    all_articles.append({
                        "title": title,
                        "link": link,
                        "summary": summary,
                        "date": date,
                    })
                except Exception as e:
                    print(f"解析文章失败: {e}")
                    continue

            print(f"第 {page_num} 页: 获取 {len(articles)} 篇文章")
            time.sleep(2)  # 礼貌延迟

    finally:
        driver.quit()

    return all_articles

if __name__ == "__main__":
    articles = scrape_news("https://example.com/news", max_pages=3)
    print(f"\n共获取 {len(articles)} 篇文章")
    with open("/tmp/articles.json", "w", encoding="utf-8") as f:
        json.dump(articles, f, ensure_ascii=False, indent=2)

Infinite Scroll

def scrape_infinite_scroll(url, max_scrolls=20):
    """抓取无限滚动页面"""
    driver = create_driver()
    items = set()

    try:
        driver.get(url)
        wait = WebDriverWait(driver, 10)
        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".item")))

        for i in range(max_scrolls):
            # Record the current number of items
            current_count = len(driver.find_elements(By.CSS_SELECTOR, ".item"))

            # Scroll to the bottom
            driver.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);"
            )

            # Wait for new content to load
            try:
                WebDriverWait(driver, 5).until(
                    lambda d: len(d.find_elements(By.CSS_SELECTOR, ".item")) > current_count
                )
            except Exception:
                print(f"No new content after scroll {i+1}, stopping")
                break

            print(f"滚动 {i+1}: 当前 {len(driver.find_elements(By.CSS_SELECTOR, '.item'))} 条")

        # Extract all data
        elements = driver.find_elements(By.CSS_SELECTOR, ".item")
        for el in elements:
            items.add(el.text)

    finally:
        driver.quit()

    return list(items)

Click-Through Pagination

def scrape_with_clicks(url):
    """通过点击按钮翻页"""
    driver = create_driver()
    results = []

    try:
        driver.get(url)

        while True:
            # Extract data on the current page
            items = driver.find_elements(By.CSS_SELECTOR, ".product")
            for item in items:
                results.append({
                    "name": item.find_element(By.CSS_SELECTOR, ".name").text,
                    "price": item.find_element(By.CSS_SELECTOR, ".price").text,
                })

            # Find the next-page button
            try:
                next_btn = driver.find_element(By.CSS_SELECTOR, ".pagination .next:not(.disabled)")
                next_btn.click()

                # Wait for the page to update
                WebDriverWait(driver, 10).until(
                    EC.staleness_of(items[0])
                )
                time.sleep(1)
            except Exception:
                print("No more pages")
                break

    finally:
        driver.quit()

    return results

8.4 Scraping with Playwright

from playwright.sync_api import sync_playwright
import json

def scrape_with_playwright(url):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context(
            viewport={"width": 1920, "height": 1080},
            locale="zh-CN",
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        )
        page = context.new_page()

        # Route interception (speed-up): drop image and font requests
        page.route("**/*.{png,jpg,jpeg,gif,webp,svg,woff,woff2}", lambda route: route.abort())

        page.goto(url, wait_until="networkidle")

        # Auto-waiting + data extraction
        page.wait_for_selector(".article-list")
        articles = page.locator(".article-item").all()

        results = []
        for article in articles:
            results.append({
                "title": article.locator(".title").text_content(),
                "link": article.locator("a").get_attribute("href"),
                "summary": article.locator(".summary").text_content(),
            })

        browser.close()
        return results

# Async version (note the separate async import)
from playwright.async_api import async_playwright

async def scrape_async(url):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto(url)
        # ... async operations
        await browser.close()

Infinite Scroll (Playwright)

def scrape_infinite_scroll_pw(url):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto(url)

        while True:
            # Record the current count
            count = page.locator(".item").count()

            # Scroll to the bottom
            page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            page.wait_for_timeout(2000)

            # Check whether new content appeared
            new_count = page.locator(".item").count()
            if new_count == count:
                break

        # Extract all data
        items = page.locator(".item").all()
        results = [item.text_content() for item in items]

        browser.close()
        return results

8.5 Anti-Detection Strategies

8.5.1 Basic Anti-Detection

def create_stealth_driver():
    """创建不易被检测的浏览器"""
    options = Options()
    options.add_argument("--headless=new")  # 新版无头,指纹与有头一致
    options.add_argument("--disable-blink-features=AutomationControlled")
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)

    # Custom User-Agent
    options.add_argument(
        "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )

    driver = webdriver.Chrome(options=options)

    # Remove the webdriver flag and patch common fingerprint probes
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
        "source": """
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
            Object.defineProperty(navigator, 'languages', {
                get: () => ['zh-CN', 'zh', 'en-US', 'en']
            });
            window.chrome = { runtime: {} };
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications'
                    ? Promise.resolve({ state: Notification.permission })
                    : originalQuery(parameters)
            );
        """
    })

    return driver

8.5.2 Using undetected-chromedriver

pip install undetected-chromedriver
import undetected_chromedriver as uc

def scrape_with_uc(url):
    options = uc.ChromeOptions()
    options.add_argument("--headless=new")

    driver = uc.Chrome(options=options)
    driver.get(url)

    # Handles Cloudflare-style bot checks automatically
    title = driver.title
    print(f"标题: {title}")

    driver.quit()
    return title

8.5.3 Playwright Stealth

npm install playwright-extra puppeteer-extra-plugin-stealth
const { chromium } = require('playwright-extra');
const stealth = require('puppeteer-extra-plugin-stealth')();
chromium.use(stealth);

(async () => {
  const browser = await chromium.launch({ headless: true });
  const page = await browser.newPage();
  await page.goto('https://example.com');
  // ... actions
  await browser.close();
})();

Anti-Detection Strategy Summary

Detection vector        Countermeasure
UserAgent check         Set a realistic UA; remove the HeadlessChrome token
navigator.webdriver     Override with Object.defineProperty
Plugin count            Fake navigator.plugins
Language settings       Set navigator.languages
Canvas fingerprint      Use --headless=new (matches headed rendering)
WebGL fingerprint       Override WEBGL_debug_renderer_info (sketch below)
TLS fingerprint         Use curl_cffi or tls-client
Cloudflare              Use undetected-chromedriver, or wait it out
Rate limiting           Space requests + random delays
IP bans                 Rotate through a proxy pool
CAPTCHAs                Manual intervention or a third-party solving service
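
The "WebGL fingerprint" row above can be acted on from Selenium by patching getParameter before any page script runs, via the same CDP hook used in 8.5.1. A minimal sketch; the vendor/renderer strings are illustrative and should match whatever platform your User-Agent claims:

def add_webgl_spoof(driver, vendor="Intel Inc.", renderer="Intel Iris OpenGL Engine"):
    """Report fake unmasked WebGL vendor/renderer strings on every new document."""
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
        "source": f"""
            const getParameter = WebGLRenderingContext.prototype.getParameter;
            WebGLRenderingContext.prototype.getParameter = function (parameter) {{
                if (parameter === 37445) return "{vendor}";    // UNMASKED_VENDOR_WEBGL
                if (parameter === 37446) return "{renderer}";  // UNMASKED_RENDERER_WEBGL
                return getParameter.call(this, parameter);
            }};
        """
    })

Call it right after create_stealth_driver() from 8.5.1, before the first driver.get().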

8.6 Proxy Configuration

Selenium Proxies

# HTTP proxy
options.add_argument("--proxy-server=http://proxy.example.com:8080")

# SOCKS5 proxy
options.add_argument("--proxy-server=socks5://proxy.example.com:1080")

# Authenticated proxies
# Chrome cannot take proxy credentials as a command-line argument; use an extension (below)

Proxy Authentication Extension

import zipfile
import base64

def create_proxy_extension(host, port, username, password):
    """创建代理认证扩展"""
    manifest_json = """
    {
        "version": "1.0.0",
        "manifest_version": 2,
        "name": "Proxy Auth",
        "permissions": ["proxy", "webRequest", "webRequestBlocking", "<all_urls>"],
        "background": { "scripts": ["background.js"] }
    }
    """
    background_js = f"""
    chrome.webRequest.onAuthRequired.addListener(
        (details, callback) => {{
            callback({{
                authCredentials: {{ username: "{username}", password: "{password}" }}
            }});
        }},
        {{ urls: ["<all_urls>"] }},
        ["blocking"]
    );

    chrome.proxy.settings.set({{
        value: {{
            mode: "fixed_servers",
            rules: {{
                singleProxy: {{
                    scheme: "http",
                    host: "{host}",
                    port: parseInt("{port}")
                }},
                bypassList: ["localhost"]
            }}
        }},
        scope: "regular"
    }});
    """

    plugin_path = "/tmp/proxy_auth_plugin.zip"
    with zipfile.ZipFile(plugin_path, 'w') as zp:
        zp.writestr("manifest.json", manifest_json)
        zp.writestr("background.js", background_js)

    return plugin_path

# Usage
ext_path = create_proxy_extension("proxy.example.com", "8080", "user", "pass")
options.add_extension(ext_path)
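
If you are using Playwright instead of Selenium, the extension workaround is unnecessary: its launch API accepts proxy credentials directly. A short sketch (the proxy host and credentials are placeholders):

from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch(
        headless=True,
        proxy={
            "server": "http://proxy.example.com:8080",  # placeholder proxy
            "username": "user",
            "password": "pass",
        },
    )
    page = browser.new_page()
    page.goto("https://httpbin.org/ip")   # shows the exit IP
    print(page.text_content("body"))
    browser.close()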

Proxy Rotation

import random
import time

PROXY_LIST = [
    "http://proxy1.example.com:8080",
    "http://proxy2.example.com:8080",
    "http://proxy3.example.com:8080",
    "socks5://proxy4.example.com:1080",
]

def get_random_proxy():
    return random.choice(PROXY_LIST)

def scrape_with_proxy_rotation(urls):
    results = {}
    for url in urls:
        proxy = get_random_proxy()
        options = Options()
        options.add_argument(f"--proxy-server={proxy}")
        options.add_argument("--headless=new")
        options.add_argument("--no-sandbox")

        driver = webdriver.Chrome(options=options)
        try:
            driver.get(url)
            results[url] = driver.page_source
            print(f"✅ {url} via {proxy}")
        except Exception as e:
            print(f"❌ {url} via {proxy}: {e}")
        finally:
            driver.quit()

        time.sleep(random.uniform(1, 3))

    return results

8.7 Data Extraction Techniques

8.7.1 Complex Data Extraction

def extract_product_data(driver):
    """提取结构化商品数据"""
    products = []

    items = driver.find_elements(By.CSS_SELECTOR, ".product-card")
    for item in items:
        product = {}

        # Name
        try:
            product["name"] = item.find_element(By.CSS_SELECTOR, ".product-name").text
        except Exception:
            product["name"] = None

        # Price
        try:
            price_text = item.find_element(By.CSS_SELECTOR, ".price").text
            product["price"] = float(price_text.replace("¥", "").replace(",", "").strip())
        except Exception:
            product["price"] = None

        # Image URL
        try:
            product["image"] = item.find_element(By.CSS_SELECTOR, "img").get_attribute("src")
        except Exception:
            product["image"] = None

        # Rating
        try:
            stars = item.find_elements(By.CSS_SELECTOR, ".star.filled")
            product["rating"] = len(stars)
        except Exception:
            product["rating"] = None

        # Link
        try:
            product["url"] = item.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
        except Exception:
            product["url"] = None

        products.append(product)

    return products

8.7.2 Extraction via JavaScript

def extract_with_js(driver):
    """使用 JavaScript 提取数据(更高效)"""
    data = driver.execute_script("""
        return Array.from(document.querySelectorAll('.item')).map(item => ({
            title: item.querySelector('.title')?.textContent?.trim(),
            link: item.querySelector('a')?.href,
            image: item.querySelector('img')?.src,
            price: parseFloat(item.querySelector('.price')?.textContent?.replace(/[^0-9.]/g, '')),
            tags: Array.from(item.querySelectorAll('.tag')).map(t => t.textContent.trim()),
        }));
    """)
    return data

8.7.3 Data Extraction with Playwright

def extract_with_playwright(url):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto(url)

        # Extract via evaluate
        data = page.evaluate("""
            () => Array.from(document.querySelectorAll('.item')).map(item => ({
                title: item.querySelector('.title')?.textContent?.trim(),
                link: item.querySelector('a')?.href,
            }))
        """)

        # Extract via the Locator API
        items = page.locator(".item").all()
        for item in items:
            title = item.locator(".title").text_content()
            link = item.locator("a").get_attribute("href")

        browser.close()
        return data

8.8 Scraping Best Practices

Polite Crawling

import random
import time

def polite_delay(min_sec=1, max_sec=3):
    """随机延迟,模拟人类行为"""
    time.sleep(random.uniform(min_sec, max_sec))

# Delay after each request
for url in url_list:
    driver.get(url)
    # ... extract data
    polite_delay(2, 5)  # random 2-5 second delay

Retry on Error

from functools import wraps
import time

def retry(max_retries=3, delay=2):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    wait = delay * (2 ** attempt)  # exponential backoff
                    print(f"重试 {attempt + 1}/{max_retries}: {e}, 等待 {wait}s")
                    time.sleep(wait)
            return None
        return wrapper
    return decorator

@retry(max_retries=3, delay=2)
def fetch_page(url):
    driver.get(url)
    # ... extract data

Deduplication

import hashlib

def deduplicate(items, key="url"):
    """根据指定字段去重"""
    seen = set()
    unique = []
    for item in items:
        val = item.get(key, "")
        hash_val = hashlib.md5(val.encode()).hexdigest()
        if hash_val not in seen:
            seen.add(hash_val)
            unique.append(item)
    return unique

8.9 Performance Optimization

Strategy              Description                          Effect
Disable images        Block image requests (sketch below)  30-60% less load time
Disable CSS           Block CSS requests                   10-30% less load time
Disable fonts         Block font requests                  5-10% less load time
Eager page load       Don't wait for images/CSS            20-40% less waiting
Context reuse         Reuse the BrowserContext             Avoids repeated startup cost
Concurrency control   Parallel tabs                        N× throughput
API first             Find the API and call it directly    10×+ speed-up
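
A Selenium-side sketch covering the first rows of this table: block images through Chrome preferences and stop waiting for subresources with the eager page-load strategy. The target URL is a placeholder:

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

options = Options()
options.add_argument("--headless=new")
options.page_load_strategy = "eager"   # return after DOMContentLoaded; don't wait for images/CSS
options.add_experimental_option("prefs", {
    "profile.managed_default_content_settings.images": 2,   # 2 = block images
})

driver = webdriver.Chrome(options=options)
driver.get("https://example.com")
print(driver.title)
driver.quit()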

Concurrent Scraping (Playwright)

import asyncio
from playwright.async_api import async_playwright

async def scrape_page(page, url):
    await page.goto(url, wait_until="domcontentloaded")
    title = await page.title()
    return {"url": url, "title": title}

async def scrape_concurrent(urls, concurrency=5):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()

        # Limit concurrency with a semaphore
        semaphore = asyncio.Semaphore(concurrency)
        results = []

        async def limited_scrape(url):
            async with semaphore:
                page = await context.new_page()
                try:
                    result = await scrape_page(page, url)
                    results.append(result)
                finally:
                    await page.close()

        tasks = [limited_scrape(url) for url in urls]
        await asyncio.gather(*tasks, return_exceptions=True)

        await browser.close()
        return results

# Run
urls = [f"https://example.com/page/{i}" for i in range(100)]
results = asyncio.run(scrape_concurrent(urls, concurrency=5))

8.10 Key Takeaways

Takeaway               Notes
API first              If an API is available, skip the browser scraper
Anti-detection basics  Set a real UA, remove the webdriver flag, randomize delays
Proxy rotation         Avoids IP bans
Random delays          Mimic human behavior; crawl politely
Retry on error         Exponential backoff + capped retry count
Deduplication          Hash URLs to drop duplicates
Resource blocking      Disable images/CSS/fonts to speed up loads

8.11 Caveats

⚠️ Legal compliance: Respect each site's robots.txt and terms of service, and avoid scraping sensitive personal information (a robots.txt pre-flight check is sketched after this list).

⚠️ Server load: Don't hammer the target site; set sensible delays so your crawl does not put pressure on it.

⚠️ Data storage: Plan storage for large crawls; JSON/CSV is fine for small jobs, use a database at scale.

⚠️ Headless fingerprint: Although --headless=new closely matches headed mode, some advanced checks (e.g., BrowserLeaks) may still tell them apart.
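
For the legal-compliance point above, Python's standard library can check robots.txt before you crawl. A small pre-flight helper might look like this (the user-agent string is illustrative):

from urllib.parse import urljoin
from urllib.robotparser import RobotFileParser

def allowed_by_robots(url, user_agent="MyScraper/1.0"):
    """Return True if robots.txt permits fetching this URL for our user agent."""
    rp = RobotFileParser()
    rp.set_url(urljoin(url, "/robots.txt"))
    rp.read()
    return rp.can_fetch(user_agent, url)

if not allowed_by_robots("https://example.com/news"):
    print("Disallowed by robots.txt, skipping")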


8.12 Further Reading

Resource                     Link
robots.txt specification     https://www.robotstxt.org/
undetected-chromedriver      https://github.com/ultrafunkamsterdam/undetected-chromedriver
playwright-extra             https://github.com/berstend/puppeteer-extra/tree/master/packages/playwright-extra
Scrapy + Playwright          https://github.com/scrapy-plugins/scrapy-playwright
Browser fingerprint testing  https://browserleaks.com/
Bot detection test           https://bot.sannysoft.com/