抓取单个页面很容易。在一次运行中抓取分散在众多不同网站上的数千个页面,才是大多数方案崩溃的地方:普通循环在每次请求时都会阻塞,IP 在几百次调用后就被封禁,重复的 URL 白白浪费请求额度,而一个加载缓慢的页面会拖慢整个任务。问题不在于解析,而在于规模化时的吞吐量、反封锁和记账管理。
本指南介绍如何以生产级任务实际运行的方式,在 Python 中同时抓取多个网站。你将使用 Crawling API 在真实浏览器和可信 IP 后获取并渲染每个页面,并使用异步 Crawler 将数千个 URL 推送到一个托管队列中并发抓取,并将结果推送到 webhook。我们将涵盖多 URL 排队、并发控制、去重,以及将结果汇总到一处。
为什么单一循环无法扩展
第一个爬虫几乎总是一个循环:读取 URL,获取页面,解析,保存,重复。这对十个页面有效,对一万个则会崩溃。每次获取都阻塞下一次,所以总时间是所有请求时间之和。从单一 IP 发送这些请求,网站在几百次调用后就会开始返回 403 和 CAPTCHA。而且循环中什么都不会注意到你昨天已经抓取了列表的一半,所以你还要为已有的页面重复付费。
同时跨多个网站扩展意味着解决三个独立的问题。你需要并发性,使慢速页面不阻塞快速页面。你需要反封锁,通过轮换 IP 和真实浏览器渲染使你远离封禁名单。还需要记账管理,使重复 URL 被跳过,而完成的结果汇总到一处存储。本指南的其余部分将每个问题映射到一个工具并展示代码。
保持边界清晰。Crawling API 每次调用获取并渲染一个页面:它运行 JavaScript,轮换 IP,并返回完整 HTML。异步 Crawler 是其上层的队列:你推送多个 URL,它并发抓取,自动重试失败项,并将每个结果 POST 到你托管的 webhook。对于你等待完成的有限批次使用 API,对于大规模的推送收集任务使用 Crawler。
你将构建什么
针对跨不同网站的 URL 列表,构建两种可运行的模式。第一种是并发批次:将去重后的 URL 集通过 Crawling API 循环处理,并将每个结果写入 JSON 文件,适合你希望在运行完成后拿到结果的数百到数千个页面。第二种是将大量任务异步推送到 Crawler,适用于单次获取已不可行的数万个 URL 规模。两者都使用官方 crawlbase Python 客户端。
搭建环境
你需要 Python 3.8 或更高版本。确认你的版本,创建虚拟环境以隔离依赖,然后安装客户端。
python --version python -m venv scrape_env source scrape_env/bin/activate pip install crawlbase
在 Windows 上,将激活命令中的 source 行替换为 scrape_env\Scripts\activate。crawlbase 包是官方客户端,同时封装了 Crawling API 和异步 Crawler,无需手动构建 HTTP 调用。注册后,从你的 Crawlbase 控制台获取两个令牌:普通令牌用于静态页面,JavaScript(JS)令牌用于客户端渲染的页面。从环境变量读取它们,而非硬编码。
export CRAWLBASE_TOKEN=your_normal_token_here export CRAWLBASE_JS_TOKEN=your_js_token_here
普通令牌获取静态 HTML,更便宜且更快。JS 令牌先在真实浏览器中渲染页面,适用于客户端加载内容的网站。当你同时抓取许多不同网站时,两种情况都会遇到,因此常见模式是默认使用 JS 令牌,对已知是静态的目标降级使用普通令牌。
构建去重 URL 集
在任何获取之前,先清理输入。从站点地图、分类页面和之前运行拼凑而来的真实目标列表,充满了重复项和陈旧条目。提前去重是你能做的最廉价的单一优化,因为从未发送的请求不花任何成本。对每个 URL 进行规范化,并记录已抓取的内容。
import json import os from urllib.parse import urlparse, urlunparse def normalize(url): parts = urlparse(url.strip()) # Drop fragments and trailing slashes so near-duplicates collapse. path = parts.path.rstrip("/") or "/" return urlunparse((parts.scheme, parts.netloc, path, "", parts.query, "")) raw_urls = [ "https://books.toscrape.com/catalogue/a-light-in-the-attic_1000/index.html", "https://books.toscrape.com/catalogue/tipping-the-velvet_999/index.html", "https://quotes.toscrape.com/page/1/", "https://quotes.toscrape.com/page/1/#top", # duplicate after normalizing ] targets = sorted({normalize(u) for u in raw_urls}) print(f"{len(targets)} unique URLs to crawl")
集合推导式用一行代码消除完全相同的和近似重复的 URL。对于跨多天运行的任务,将已抓取集合持久化到磁盘,并在每次运行开始时从 targets 中减去它,这样你就永远不会重新获取已有的页面。这是记账层在你花费任何一次请求额度之前就发挥作用的方式。
使用 Crawling API 并发抓取批次
现在开始获取。朴素版本是串行循环,但串行恰恰是无法扩展的方式,因此通过线程池运行请求。每次对 Crawling API 的调用都是 I/O 密集型的,等待网络响应,这正是线程池擅长处理的场景。适度的工作线程数量可以保持多个请求同时在途,同时不会对任何单一网站造成过大压力。
from concurrent.futures import ThreadPoolExecutor, as_completed from crawlbase import CrawlingAPI api = CrawlingAPI({"token": os.environ["CRAWLBASE_JS_TOKEN"]}) def fetch(url): options = {"ajax_wait": "true", "page_wait": 2000} response = api.get(url, options) status = response["headers"].get("pc_status") return { "url": url, "status": status, "html": response["body"].decode("utf-8", "ignore"), } results = [] with ThreadPoolExecutor(max_workers=10) as pool: futures = {pool.submit(fetch, u): u for u in targets} for future in as_completed(futures): url = futures[future] try: results.append(future.result()) except Exception as err: print(f"Failed {url}: {err}") print(f"Collected {len(results)} pages")
两个细节使这段代码更加健壮。pc_status 响应头携带目标返回的原始状态码,让你能够区分真正的 200 和软失败,并决定是否保留该行。将 future.result() 包裹在 try/except 中意味着一个失败的 URL 会记录日志并继续运行,而不是终止整个批次。Crawling API 负责每次调用的渲染和 IP 轮换,因此你的代码只需管理并发。
一次调用在真实浏览器和轮换住宅 IP 后获取并渲染一个页面,使跨多个不同网站的批次保持不被封锁,而你无需自行运行无头浏览器集群或代理池。先从免费套餐的公开页面开始,然后将同一循环扩展到数千个 URL。
解析并保存收集的结果
你现在拥有了 results 中每个页面的原始 HTML。每个网站的解析方式各异,但收集步骤相同:提取你需要的字段,并为每个页面写入一条结构化记录。在每行记录上保留 URL 和抓取时间戳,使输出同时成为运行内容和时间的审计轨迹。
from datetime import datetime, timezone from bs4 import BeautifulSoup def parse_title(html): soup = BeautifulSoup(html, "html.parser") title = soup.find("title") return title.text.strip() if title else None rows = [] for item in results: if item["status"] != "200": continue rows.append({ "url": item["url"], "title": parse_title(item["html"]), "captured_at": datetime.now(timezone.utc).isoformat(), }) with open("scraped.json", "w") as f: json.dump(rows, f, indent=2) print(f"Wrote {len(rows)} rows to scraped.json")
这需要在客户端旁安装 beautifulsoup4(pip install beautifulsoup4)。跳过未返回干净 200 的行,可以将软失败(空 body 或挑战页面)排除在数据集之外,这种静默损坏会在大规模抓取中悄悄污染数据。对于知名目标(如大型零售商或电商平台),可以完全跳过手写解析,改用 Crawling API 直接返回预先解析好的 JSON。
通过异步 Crawler 突破批次限制
线程池批次是处理几千个你想等待结果的 URL 的正确工具。超过这个规模,在数万个页面抓取时阻塞你的进程就不再实际,这时异步 Crawler 接手。它是一个基于推送的托管队列:你通过同一客户端提交 URL,每个都获得一个请求 ID,系统以自己的并发方式抓取队列并在失败时重试,然后将每个完成的页面 POST 到你服务器上的 webhook。
from crawlbase import CrawlingAPI crawler = CrawlingAPI({"token": os.environ["CRAWLBASE_JS_TOKEN"]}) # Push each URL to the async Crawler; results arrive at your webhook. for url in targets: response = crawler.post(url, { "callback": "https://your-app.example.com/webhook", "callback_headers": "X-Job-Id:bulk-run-01", }) body = json.loads(response["body"]) print(f"Queued {url} as request {body['rid']}")
每次 post 返回一个请求 ID(rid),你可以记录它来追踪任务。Crawler 在后台以自己的并发和重试逻辑抓取队列,所以你的脚本在所有 URL 提交完毕后立即结束,无需等待抓取过程。当一个页面完成时,系统将结果 POST 到你的回调 URL,而 callback_headers 字段让你给一次运行打标签,使接收处理程序知道一次推送属于哪个任务。
收集推送结果
异步模型反转了收集方式:不再是主动拉取页面,而是被动接收。你的 webhook 运行与批次版本相同的解析和保存逻辑,只是触发方式不同。Flask 中的一个最简处理程序如下所示。
from flask import Flask, request app = Flask(__name__) @app.route("/webhook", methods=["POST"]) def webhook(): rid = request.headers.get("rid") original_url = request.headers.get("original_url") html = request.get_data(as_text=True) row = { "url": original_url, "title": parse_title(html), "captured_at": datetime.now(timezone.utc).isoformat(), } with open("bulk.jsonl", "a") as f: f.write(json.dumps(row) + "\n") return "", 200
追加到 JSON Lines 文件意味着每次推送都是一次独立写入,并发回调不会像重新序列化单个 JSON 数组那样互相覆盖。Crawler 在响应头中传递原始 URL 和请求 ID,因此批次版本的 parse_title 和行结构可以直接沿用。这使流水线能够从数千个页面增长到数十万个,而你的进程永远不需要坐在那里等待。
在大规模情况下你无法逐一检查抓取结果,因此依赖内置监控。Crawlbase 控制台追踪请求量、成功率和失败率以及已用额度,实时监控显示当前队列深度。失败率持续上升通常意味着某个目标开始挑战流量,你希望在几分钟内就发现这一点,而不是在运行结束后才发现一半数据缺失。
并发、速率限制与保持不被封锁
更多工作线程并不总是更快。并发过高,你要么耗尽计划的请求速率,要么对单个域名的请求量过大足以触发其防御,反而因重试而拖慢整个运行。解决方案是按域控制并发,而非全局控制:十个在途请求分散在十个不同网站上是温和的,而十个针对同一网站则是激进的。按主机对 URL 集分组,并限制针对任何一个主机同时在途的请求数。
由于 Crawling API 和 Crawler 都在服务端轮换住宅 IP 并在真实浏览器后渲染,保持不被封锁最繁重的部分已为你处理好。如果你更倾向于通过自己的客户端路由到轮换池,Smart AI Proxy 以直接代理接口的形式提供相同的住宅 IP 轮换。无论哪种方式,都要控制请求节奏、变换目标,并关注状态码,以便在网站开始反抗时及时退让。完整策略见如何在不被封锁的情况下抓取网站。
负责任地抓取
规模化抓取是责任,不仅仅是能力。坚守公开可获取的数据;不要抓取登录后的内容、付费专区内容,或任何未经明确授权的个人数据或受版权保护的内容。阅读每个网站的 robots.txt 和服务条款,并遵守其中规定的访问规则。同时要对自己进行速率限制:在请求之间添加间隔并按域限制并发,既能让你远离封禁名单,也能减轻网站服务器的压力。克制不仅是道德选择,也是运营选择,因为尊重限制的任务比不尊重的任务运行时间长得多。
核心要点
- 分解问题。同时抓取多个网站是三个问题,不是一个:并发、反封锁和记账管理。将每个问题映射到一个工具,而非将它们全部塞进一个循环。
- 在获取前去重。规范化 URL 并跳过已抓取的,因为最廉价的请求就是从未发送的那个。
- 有限批次使用线程池。Crawling API 调用是 I/O 密集型的,因此适度的工作线程池抓取数百到数千个页面的速度远快于串行循环。
- 规模化时推送到异步 Crawler。对于数万个 URL,将其提交到队列并在 webhook 处接收结果,并发、重试和监控均免费提供。
- 按域控制并发。将负载分散到各个主机,并限制对任何单一网站的在途请求数,以保持不被封锁,而非触发防御。
- 负责任地抓取。仅限公开数据,遵守 robots.txt 和服务条款,并对自己进行速率限制以保持任务持续运行。
常见问题
如何在 Python 中同时抓取多个网站?
构建一个去重的 URL 集,然后并发获取而非串行循环。对于有限批次,通过 ThreadPoolExecutor 运行 Crawling API,使慢速页面不阻塞快速页面,并将每个结果收集到写入磁盘的列表中。对于非常大的任务,改为将 URL 推送到异步 Crawler,它在后台排队并抓取,将每个完成的页面推送到你托管的 webhook。
Crawling API 和异步 Crawler 有什么区别?
Crawling API 是同步的:你发送一个 URL 并在响应中等待渲染后的页面,适用于单次抓取或小型并发批次。异步 Crawler 为规模而生:你推送多个 URL,它以自己的并发方式在后台抓取,并将每个结果 POST 到你的 webhook。两者共享相同的渲染和反封锁基础设施,因此根据你的吞吐量需求选择合适的一个。
抓取多个网站时如何避免被封锁?
轮换 IP 并在真实浏览器后渲染页面,控制请求节奏以免对任何单一域名造成过大压力。Crawling API 和 Crawler 在服务端处理 IP 轮换和渲染,因此大部分封锁问题已得到解决。如果你自行路由客户端,请使用像 Smart AI Proxy 这样的轮换接口,按域控制并发,并关注状态码以便在网站开始挑战流量时及时退让。
如何处理数千个页面中的重复 URL?
通过去除片段和尾部斜杠规范化每个 URL,然后将它们存入集合,使完全相同的和近似重复的 URL 自动合并。对于跨时间恢复的运行,将已抓取 URL 的集合持久化到磁盘,并在每次运行开始时从目标列表中减去它。这种记账管理使你不必为已有的页面重复付费。
应该运行多少个并发请求?
从适度的十个工作线程开始,然后根据你计划的请求速率和目标的响应情况进行调整。重要的数字是每个域的并发量,而非全局总量:十个请求分散在十个网站上是温和的,而十个针对同一网站则是激进的。按主机对 URL 分组,并限制对任何单一网站同时在途的请求数以保持不被封锁。
抓取数千个网站是否合法?
抓取公开可获取的数据通常被接受,但合法性取决于每个网站的服务条款、版权以及 GDPR 和 CCPA 等数据保护法律。坚守公开数据,避免登录后内容、付费专区内容以及任何个人或受版权保护的内容,遵守 robots.txt,并对自己进行速率限制。如果对特定目标有疑问,请在对其运行大规模任务之前查阅其条款并寻求法律建议。
大规模爬取任何站点,无需与基础设施对抗。
Crawlbase 负责处理代理、指纹和 CAPTCHA,让你的团队专注于交付数据流水线,而非维护爬取管道。1,000 次请求免费,无需信用卡。
