一条网络数据管道的可靠性,只取决于它的采集层。当仪表盘数据过时或数字开始对不上时,原因几乎从来不是分析代码,而是管道的前端:一个在网站改版后崩掉的爬虫、开始被封禁的请求,或者那些在浏览器中能加载、但对一个普通 HTTP 抓取只返回一个空壳的页面。把采集当作脆弱的来对待,整条管道就会继承那份脆弱。

本指南将向你展示如何构建一条可扩展的网络数据管道,由 Crawlbase 负责采集,由标准的 ETL 工具负责其余部分。你会用 Crawling API 处理按需工作、用异步 Crawler 处理大批量任务来采集页面,转换并验证原始 HTML,把干净的数据行加载进存储,并连同监控一起调度整件事。每一步都有你可以改造的可运行代码。

一条可扩展的网络数据管道长什么样

这个模式就是经典的 ETL 形态,外加一个重要的关注点分离。Crawlbase 作为一个采集层坐镇前端,处理所有让爬取变得不稳定的事情:JavaScript 渲染、IP 轮换、请求路由和封禁缓解。你的系统处理解析、验证、存储和分析。流程从左读到右:

bash
Web  ->  Crawlbase (collect)  ->  Transform + Validate  ->  Storage  ->  BI / ML

在这里划出边界的理由是耐久性。外部网站不是稳定的依赖;它们会毫无预警地推出布局变更、运行实验、部署反机器人防御。通过在前面放一个托管的采集层,一次网站变更就变成了一个配置上的事情,而不是一次管道宕机。Crawlbase 为两种工作负载形态给你两个采集工具,而一条生产管道通常两者都用。

  • Crawling API 用于对已知 URL 进行实时、按需的获取。你发送一个 URL,它返回页面。
  • Async Crawler 用于大规模、发出即不管的采集。你推送 URL,它异步地抓取它们,并把结果 POST 到你的 webhook。

这正是任何认真的电商网络爬取操作最终都会落到的同一种分离:一条用于定向查询的快路径,和一条用于覆盖面的批量路径。如果你对底层的代理机制还很陌生,什么是代理服务器是有用的背景知识,不过一个托管 API 的意义恰恰在于你不必去管理其中任何一项。

第 1 步:用 Crawling API 采集

Crawling API 接收一个 URL 加上你的令牌,并返回渲染好的页面。你发送一个 HTTP GET;它把请求通过一个轮换的 IP 池来路由,在你传入一个 JS 令牌时可选地渲染 JavaScript,并把 HTML(或已解析的 JSON)交回给你。最简单的调用就是一条 curl:

bash
curl 'https://api.crawlbase.com/?token=YOUR_TOKEN&url=https%3A%2F%2Fexample.com%2Fproducts'

在一条管道里,你想要的是一个小巧、可复用的采集器,而不是裸的 curl。安装官方客户端并把调用包起来,这样管道的其余部分拿到的就是干净的 HTML,永远不用去想令牌或重试。对客户端渲染的页面使用 JS 令牌,对静态 HTML 使用普通令牌。

bash
python3 -m venv venv && source venv/bin/activate
pip install crawlbase
python
from crawlbase import CrawlingAPI

api = CrawlingAPI({'token': 'YOUR_TOKEN'})

def collect(url, render=False):
    options = {'ajax_wait': True, 'page_wait': 3000} if render else {}
    response = api.get(url, options)
    status = response['status_code']
    if status != 200:
        raise RuntimeError(f'collect failed for {url}: {status}')
    return response['body'].decode('utf-8')

html = collect('https://example.com/products', render=True)
print(len(html), 'bytes')

有两个细节让它达到管道级别而不是一个玩具。第一,它检查 status_code,并在任何不是一次干净抓取的情况上抛出异常,这样一个糟糕的页面会响亮地暴露出来,而不是用空行毒害你的数据仓库。第二,render 标志让你的调用点对哪些页面需要 JavaScript 保持诚实:只在内容确实需要的地方才支付渲染成本。这个采集器就是你的调度器将为每个已知 URL 调用的那个单元。

普通令牌对比 JS 令牌

Crawlbase 给你两个令牌。普通令牌又快又便宜地返回静态 HTML;JS 令牌先在一个真实浏览器中渲染页面,对于客户端渲染的网站你会需要它。只有当一个页面对普通抓取返回一个空壳时才动用 JS 令牌,并把它与 ajax_waitpage_wait 搭配,这样迟加载的内容才有时间出现。

第 2 步:用异步 Crawler 扩展规模

Crawling API 是同步的:一个请求,一个响应,而你的代码在等待。对于几百个已知 URL 这恰好合适。对于几万个,在每次调用上阻塞就无法扩展了。异步 Crawler 翻转了这个模型。你把 URL 推送进一个命名的爬虫,请求立即返回一个 Request ID,Crawlbase 在后台抓取页面,当它完成时把结果 POST 到你的回调端点。你的代码里没有任何东西在阻塞着等待页面。

你通过给同一个端点添加两个参数来选择进入异步模式:callback=truecrawler=YourCrawlerName(你在仪表盘里创建一次这个爬虫,并把它指向你的 webhook URL)。推送一个 URL 看起来是这样的:

bash
curl 'https://api.crawlbase.com/?token=YOUR_TOKEN&callback=true&crawler=my-pipeline&url=https%3A%2F%2Fexample.com%2Fp%2F123'

你拿回来的不是页面正文,而是一个 Request ID,这意味着这个 URL 已经排队了:

json
{ "rid": "1e92e8bf4618772871c14d4" }

从你这一侧看,推送一个大批量就是一个紧凑的循环。重点是吞吐量:你把所有 URL 一股脑发出去,不等任何一个完成,由队列来吸收这些工作。

python
from crawlbase import CrawlingAPI

api = CrawlingAPI({'token': 'YOUR_TOKEN'})

def push_batch(urls):
    options = {'callback': True, 'crawler': 'my-pipeline'}
    for url in urls:
        response = api.get(url, options)
        rid = response['body']['rid']
        print(f'queued {url} -> {rid}')

push_batch([
    'https://example.com/p/123',
    'https://example.com/p/124',
    'https://example.com/p/125',
])

异步的另一半是回调处理器。Crawlbase 把爬取到的页面 POST 到你随爬虫注册的 webhook,在请求正文里发送 HTML,在请求头里发送元数据(Request ID、原始 URL 和状态)。你的处理器应该做最低限度的事:用一个 200 快速确认,并把载荷交给你的转换步骤。在内联里做繁重的解析会冒着投递超时并被重试的风险。

javascript
const express = require('express')
const app = express()

// Crawlbase POSTs raw HTML; capture the body as text
app.use(express.text({ type: '*/*', limit: '10mb' }))

app.post('/crawlbase/callback', (req, res) => {
  const rid = req.headers['rid']
  const url = req.headers['url']
  const status = req.headers['original_status']

  // ack immediately, process out of band
  res.sendStatus(200)

  enqueueForTransform({ rid, url, status, html: req.body })
})

app.listen(8080, () => console.log('callback listening on :8080'))

如果你宁愿根本不运行一个 webhook,就把爬虫指向 Crawlbase Cloud Storage 并改为轮询它;代价是一点点延迟换来零基础设施。无论哪种方式,异步模型都让你能采集数百万个页面,而你的应用永远不会在一次抓取上阻塞。

Crawlbase Crawling API + Crawler

一个令牌覆盖采集的两半:对已知 URL 的同步调用和应对量的异步推送,渲染、轮换 IP 和封禁缓解都在服务器端处理。从免费套餐开始,把回调接到一个用完即弃的端点,在你构建管道其余部分之前就看到结果到达。

第 3 步:转换并验证原始 HTML

采集给你 HTML。转换步骤把那些 HTML 变成干净、带类型的记录,并把任何过不了质量关的东西扔掉。这正是很多管道悄悄腐烂的地方:一个任务报告成功,但它写入的行是空的,因为一个选择器漂移了。要显式地验证,这样一次解析失败看起来才像一次失败。

用任何适合你技术栈的东西来解析;这个示例用的是 BeautifulSoup。这个函数提取字段,把它们规范化为原生类型,并拒绝发出一条缺少名称或价格无法解析的记录。

python
import re
from bs4 import BeautifulSoup

def transform(html, source_url):
    soup = BeautifulSoup(html, 'html.parser')
    records = []

    for card in soup.select('.product-card'):
        name = card.select_one('.title')
        price = card.select_one('.price')
        if not name or not price:
            continue  # skip incomplete cards, do not emit junk

        digits = re.sub(r'[^\d.]', '', price.get_text())
        if not digits:
            continue

        records.append({
            'name': name.get_text(strip=True),
            'price': float(digits),
            'source_url': source_url,
        })

    if not records:
        raise ValueError(f'no records parsed from {source_url} (selectors may have drifted)')

    return records

真正重要的那个形态:把每个字段清洗成一个原生类型(一个浮点价格、一个去除空白的字符串),丢掉不完整的记录而不是写入空白,并在一整个页面什么都没产出时抛出异常,这样一个漂移的选择器在它崩掉的当天就被抓到,而不是几周后在一份报告里才被发现。如果你想对受支持的网站完全跳过解析,Crawling API 会直接返回结构化的 JSON,这一步就变成了一个直通。

第 4 步:加载进存储

手里有了经过验证的记录,就把它们写到某个可查询的地方。目的地取决于规模和用途:一个像 PostgreSQL 这样用于事务性访问的关系型数据库、一个像 BigQuery 这样用于分析的数据仓库、一个搜索存储,或者下游的一个流式平台。SQLite 足够用来展示这个模式,而能推广开来的正是这个模式:在一个稳定的键上做 upsert,这样重新运行管道时会更新已有的行而不是把它们重复一遍。

python
import sqlite3
from datetime import datetime, timezone

def load(records, db_path='pipeline.db'):
    conn = sqlite3.connect(db_path)
    conn.execute('''
        CREATE TABLE IF NOT EXISTS products (
            source_url TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            price REAL NOT NULL,
            collected_at TEXT NOT NULL
        )''')

    now = datetime.now(timezone.utc).isoformat()
    for r in records:
        conn.execute('''
            INSERT INTO products (source_url, name, price, collected_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(source_url) DO UPDATE SET
                name=excluded.name,
                price=excluded.price,
                collected_at=excluded.collected_at
        ''', (r['source_url'], r['name'], r['price'], now))

    conn.commit()
    conn.close()

upsert 正是让加载步骤具有幂等性的东西:把同一个批次跑两遍会让表停在同样的状态,这恰好是当一个调度器重试一次失败的运行时你想要的。collected_at 时间戳给你一个新鲜度信号,你会在下一步的监控里用到它。把那些 SQLite 调用换成你的数据仓库客户端,逻辑原封不动地照搬过去。

第 5 步:自动化、调度和监控

这些部件组合成一个管道函数,而这个函数就是你的调度器所调用的。把采集、转换和加载用一个逐 URL 的 try/except 连起来,能让一个糟糕的页面不至于杀死整次运行。

python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger('pipeline')

def run_pipeline(urls):
    ok, failed = 0, 0
    for url in urls:
        try:
            html = collect(url, render=True)
            records = transform(html, url)
            load(records)
            ok += 1
        except Exception as err:
            failed += 1
            log.error('pipeline failed for %s: %s', url, err)

    log.info('run complete: %d ok, %d failed', ok, failed)
    if failed > ok:
        raise RuntimeError('majority of URLs failed, check upstream')

要让它按计划运行,最简单的选项是 cron。这条条目每六小时运行一次管道,并把输出追加到一个你可以 tail 或运送到你监控栈的日志里:

bash
# run the pipeline every 6 hours
0 */6 * * * /path/to/venv/bin/python /path/to/run.py >> /var/log/pipeline.log 2>&1

cron 对于寥寥几个任务是没问题的。一旦你在各步骤之间有了依赖、重试和回填,就升级到一个像 Apache Airflow 或 Prefect 这样的工作流编排器,它们给你 DAG、自动重试,以及一个查看运行历史的界面。对于异步 Crawler,在采集这一侧根本没有调度器要运行:你推送 URL,结果在完成时流进你的回调。

监控正是一条你信得过的管道和一条你要时刻照看的管道之间的区别。至少追踪三样东西。:每次运行的行数,这样一次骤降就标记出一个采集问题。新鲜度:你存储的 collected_at 时间戳,这样当数据变陈旧时你能告警。失败率:每次运行的成功对失败的统计,这样一个悄悄爬升的数字会在一切崩溃之前就警告你一个目标网站正在变化。把那个和明智的爬取卫生搭配起来;如何爬取网站而不被封禁涵盖了那些让采集层在规模上保持健康的做法。

回顾

核心要点

  • 采集是薄弱环节。在前面放一个托管的采集层,这样一次网站变更就是一次配置微调,而不是一次管道宕机。
  • 两种采集模式。Crawling API 服务于同步的、已知 URL 的查询;异步 Crawler 推送大批量,并不阻塞地把结果 POST 到你的 webhook。
  • 在转换里验证。把字段清洗成原生类型,丢掉不完整的记录,并在一个页面什么都没产出时抛出异常,这样漂移的选择器会响亮地失败。
  • 让加载具有幂等性。在一个稳定的键上做 upsert,这样重试和重新运行会更新行而不是把它们重复一遍。
  • 调度并监控。cron 或一个编排器驱动运行;追踪量、新鲜度和失败率以便及早抓住问题。

常见问题

我该如何用 Crawlbase 构建一条可扩展的网络数据管道?

把 Crawlbase 用作采集层,其余部分用标准的 ETL 工具。对已知 URL 用 Crawling API、对大批量用异步 Crawler 来采集页面,把返回的 HTML 转换成经过验证的带类型记录,用一个幂等的 upsert 把它们加载进存储,并用 cron 或一个编排器调度运行,同时监控量、新鲜度和失败率。Crawlbase 处理渲染、IP 轮换和封禁缓解,这样你的代码只跟干净的数据打交道。

我什么时候该用 Crawling API 而不是异步 Crawler?

当你有一份已知的 URL 列表并想立即拿回页面时使用 Crawling API,它适合后端服务、监控任务和实时查询。当你在大批量采集,或者想要发出即不管的投递时使用异步 Crawler:你推送 URL,立刻拿到一个 Request ID,Crawlbase 在每个结果完成时把它 POST 到你的回调。许多管道两者都跑,API 用于定向获取,Crawler 用于广泛覆盖。

异步 Crawler 的回调是如何工作的?

你在仪表盘里创建一个命名的爬虫并把它指向你的 webhook URL,然后用 callback=truecrawler=YourCrawlerName 推送 URL。每次推送都立即返回一个 Request ID。当 Crawlbase 完成抓取一个页面时,它向你的 webhook 发送一个 HTTP POST,HTML 在正文里、元数据在请求头里。你的处理器应该快速返回一个 200,并在带外处理载荷,这样投递就不会超时。

我还需要管理代理或处理反机器人防御吗?

不需要。Crawling API 和 Crawler 把请求通过一个轮换的 IP 池来路由,在你传入一个 JS 令牌时渲染 JavaScript,并在服务器端施加封禁缓解。你发送一个 URL 并拿回一个页面,所以你省去了自己运行一池代理和一支无头浏览器队伍。如果你只是给自己的技术栈需要原始的轮换 IP,Smart AI Proxy 把同一个网络暴露成一个标准的代理端点。

我该如何让我的管道不写入空数据或坏数据?

在转换步骤里验证。在采集时检查响应状态,并在任何不是一次干净抓取的情况上抛出异常,然后在解析里丢掉缺少必需字段的记录,并在一整个页面产出零条记录时抛出异常,因为那通常意味着一个选择器漂移了。用一个 upsert 让加载具有幂等性,这样重试不会重复行,并存储一个采集时间戳,这样你能监控新鲜度,并在数据变陈旧时告警。

这条管道能处理数百万个页面吗?

能。一个朴素设计里的瓶颈是在每次同步抓取上阻塞,而异步 Crawler 通过把工作排队并经由回调投递结果来消除它。不等待地推送大批量,让队列吸收负载,并在结果到达时处理它们。对于非常大或持续进行的项目,一个企业版套餐增添了大批量采集所需的吞吐量和支持。

开始构建

大规模爬取任何站点,无需与基础设施对抗。

Crawlbase 负责处理代理、指纹和 CAPTCHA,让你的团队专注于交付数据流水线,而非维护爬取管道。1,000 次请求免费,无需信用卡。

自助开通 · 无需销售通话 · 提供企业级爬取量