做过舆情监控或数据分析的人大多会遇到类似需求:
一开始很多人用单机脚本就能跑通,但随着监控范围扩大,话题数和评论量成倍增加,往往就得考虑分布式架构。
最简单的尝试就是写一个多线程脚本,把微博热搜前几十个话题抓下来:
import requests
from concurrent.futures import ThreadPoolExecutor
urls = [f"https://s.weibo.com/top/summary?cate=realtimehot&page={i}" for i in range(1, 6)]
def fetch(url):
resp = requests.get(url, timeout=5)
return resp.text
with ThreadPoolExecutor(max_workers=20) as executor:
results = list(executor.map(fetch, urls))
print("采集结果:", len(results))
这种方式在测试阶段没问题,能快速验证数据可用性。但问题也很明显:
如果把目标换成小红书上的某个热门话题,比如“旅游攻略”,情况就不一样了:
更合适的做法是分布式队列:不同节点并发消费任务,搭配爬虫代理IP,抗风险能力和处理速度都能上一个台阶。
import requests
import redis
import random
# ===== 代理IP配置(亿牛云示例) =====
PROXY_HOST = "proxy.16yun.cn"
PROXY_PORT = "3100"
PROXY_USER = "16YUN"
PROXY_PASS = "16IP"
proxy_url = f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"
# ===== Redis 队列 =====
r = redis.StrictRedis(host="localhost", port=6379, db=0)
# ===== UA池 =====
user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64)...",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)...",
"Mozilla/5.0 (X11; Linux x86_64)..."
]
def fetch(url):
headers = {"User-Agent": random.choice(user_agents)}
proxies = {"http": proxy_url, "https": proxy_url}
try:
resp = requests.get(url, headers=headers, proxies=proxies, timeout=8)
if resp.status_code == 200:
print(f"[成功] {url}")
return resp.text
else:
print(f"[失败] {url}, 状态码 {resp.status_code}")
except Exception as e:
print(f"[异常] {url}, {e}")
return None
def worker():
while True:
url = r.lpop("task_queue")
if not url:
break
url = url.decode("utf-8")
html = fetch(url)
if html:
# TODO: 在这里解析帖子和评论
pass
if __name__ == "__main__":
for i in range(1, 11):
r.rpush("task_queue", f"https://www.xiaohongshu.com/explore?topic=旅游攻略&page={i}")
worker()
简单来说:数据规模和时效性决定了架构选择。
换句话说,可以先从简单方案入手,但要给架构留好扩展的空间。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。