在公司做数据产品时,我们常常遇到三个痛点:脚本跑不稳、页面渲染抓不到数据、以及规模化后调度和重试逻辑变得难以维护。最开始通常是一两个 Python 脚本在开发机上跑通,但到了生产环境,问题就会接踵而来:单机负载、被目标站点限速、以及不同页面结构带来的脆弱性。
本项目的目标是把一个可验证的单机 Playwright 爬虫,逐步演进成能在多台机器上并行工作的集群化方案,并把关键实现、思路与示例代码记录下来,方便求职/展示或直接在企业内复用。
本文以招聘网站为示例,目标字段很常见也很实用,便于后续做分析:
这些字段既能做趋势分析,也能用于构建岗位搜索引擎或行业报告。
这种组合能在保证可维护性的同时兼顾扩展与性能。
这是最直接的版本,用来确认页面结构、JS 渲染及提取逻辑是否稳健。代码示例(异步 Playwright):
# single_playwright.py
import asyncio
from playwright.async_api import async_playwright
# ====== 代理配置(示例:亿牛云) ======
proxy_host = "proxy.16yun.cn"
proxy_port = "3100"
proxy_user = "16YUN"
proxy_pass = "16IP"
async def crawl_page(keyword: str, page_num: int = 1):
"""
单页抓取示例函数:访问并解析职位卡片,返回字典列表
"""
async with async_playwright() as pw:
# 启动浏览器(无头/有头可根据调试需要切换)
browser = await pw.chromium.launch(headless=True,
proxy={
"server": f"http://{proxy_host}:{proxy_port}",
"username": proxy_user,
"password": proxy_pass
})
page = await browser.new_page()
url = f"https://example-job-site.com/search?kw={keyword}&p={page_num}"
await page.goto(url, timeout=30000)
# 等待职位列表出现(根据实际页面调整选择器)
await page.wait_for_selector('.job-card', timeout=15000)
cards = await page.query_selector_all('.job-card')
results = []
for c in cards:
# 下面的选择器只是示例,实际按目标站点调整
title = await c.query_selector_eval('.job-title', 'el => el.textContent.trim()')
company = await c.query_selector_eval('.company', 'el => el.textContent.trim()')
salary = await c.query_selector_eval('.salary', 'el => el.textContent.trim()')
city = await c.query_selector_eval('.city', 'el => el.textContent.trim()')
results.append({
"title": title,
"company": company,
"salary": salary,
"city": city
})
await browser.close()
return results
if __name__ == "__main__":
rows = asyncio.run(crawl_page("Python 爬虫", page_num=1))
for r in rows:
print(r)
说明:先把单页、单关键词跑通,再处理翻页、异常与重试逻辑。单机阶段以稳定为先,别急着并发。
当单页逻辑稳定后,我们会把单页函数包装成任务池,利用 asyncio 并发多个标签页(或多个浏览器实例)来提高吞吐。示例要点:
(此处略去并发包装的完整代码,实际项目中可用 asyncio.Semaphore
控制并发数。)
把任务放到 Redis 列表或流(stream)里,多台机器从队列里取任务并执行。示例消费端伪代码:
# worker.py (伪代码,需加异常处理与日志)
import asyncio
import aioredis
from single_playwright import crawl_page # 引入前面实现的单页函数
REDIS_URL = "redis://127.0.0.1:6379/0"
TASK_QUEUE = "job_tasks"
async def worker_loop():
redis = await aioredis.from_url(REDIS_URL)
while True:
task = await redis.lpop(TASK_QUEUE)
if not task:
await asyncio.sleep(1)
continue
# 假设任务是 JSON 字符串,包含 keyword 和 page_num
task = task.decode("utf-8")
# 这里简单演示,生产环境请解析 JSON 并做失败重试、幂等处理
keyword = task
print("取到任务:", keyword)
results = await crawl_page(keyword)
# 结果入库/发送到下游(例如 MongoDB/Elasticsearch)
# save_results(results)
关键点:
route.abort()
拦截图片、视频、广告等资源,显著减轻网络压力。落库示例(MongoDB 文档结构):
{
"title": "Python 爬虫工程师",
"company": "某某科技",
"salary": "20k-35k",
"city": "上海",
"publish_time": "2025-09-01",
"source": "example-job-site",
"crawl_at": "2025-09-23T10:00:00"
}
落库后可以做的展示(列举思路):
这些可视化既能作为产品端仪表盘,也可以作为简历/作品集中突出的数据展示部分。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。