
一个完整的爬虫监控系统,不应只简单地判断进程是否存在,而应覆盖以下几个维度:
我们的监控系统将围绕这些维度,通过日志分析、心跳检测和指标上报等方式,并在异常发生时通过多种渠道(如邮件、钉钉、微信)发出警报。
smtplib和email库,通用性强。requests调用Webhook,适合团队协作。logging模块,进行结构化日志记录。我们将以一个Scrapy爬虫为例,演示如何逐步为其注入监控能力。
步骤一:创建Scrapy爬虫项目
我们首先创建一个基础的Scrapy项目,用于爬取前程无忧的Python职位列表。
scrapy startproject zhilian_monitor
cd zhilian_monitor
scrapy genspider zhilian_python jobs.51job.com步骤二:定制结构化的日志记录
在settings.py中配置日志,确保能记录不同级别(INFO, WARNING, ERROR)的信息,并输出到文件。
# settings.py
import logging
from datetime import datetime
LOG_LEVEL = 'INFO'
# 日志文件按日期分割
LOG_FILE = f'logs/zhilian_spider_{datetime.now().strftime("%Y%m%d")}.log'在爬虫文件zhilian_python.py中,我们通过logger记录关键业务指标。
# spiders/zhilian_python.py
import scrapy
import logging
class ZhilianPythonSpider(scrapy.Spider):
name = 'zhilian_python'
allowed_domains = ['jobs.51job.com']
start_urls = ['https://jobs.51job.com/beijing/py/p1/']
def parse(self, response):
# 记录开始爬取一个页面的日志
self.logger.info(f"开始解析页面: {response.url}")
job_list = response.css('.e.e2 .el')
# 记录本页解析到的职位数量
job_count = len(job_list)
self.logger.info(f"页面 {response.url} 解析到 {job_count} 个职位")
if job_count == 0:
# 这是一个危险信号!可能页面结构变了
self.logger.error(f"警告!页面 {response.url} 未解析到任何职位,可能页面结构已更新!")
# 此处可以触发一个报警(后续实现)
for job in job_list:
try:
item = {
'title': job.css('.t1 a::text').get('').strip(),
'company': job.css('.cname a::text').get('').strip(),
'salary': job.css('.sal::text').get('').strip(),
'link': job.css('.t1 a::attr(href)').get(),
}
# 检查关键字段是否为空
if not item['title']:
self.logger.warning(f"一个职位标题为空,链接: {item['link']}")
yield item
except Exception as e:
self.logger.error(f"解析职位条目时发生错误: {e}", exc_info=True)
# 模拟翻页...步骤三:构建核心监控与报警类
我们在项目根目录创建一个monitor.py文件,这是我们监控系统的大脑。
# monitor.py
import smtplib
import requests
import logging
from email.mime.text import MIMEText
from email.header import Header
from scrapy import signals
from scrapy.exceptions import NotConfigured
class SpiderMonitor:
def __init__(self, stats, mail_settings, dingding_webhook=None):
self.stats = stats
self.mail_settings = mail_settings
self.dingding_webhook = dingding_webhook
# 初始化计数器
self.error_count = 0
self.item_scraped_count = 0
@classmethod
def from_crawler(cls, crawler):
# 从settings.py中读取配置
mail_settings = crawler.settings.getdict('MAIL_SETTINGS')
dingding_webhook = crawler.settings.get('DINGDING_WEBHOOK')
if not mail_settings and not dingding_webhook:
raise NotConfigured("未配置任何报警通道(邮件或钉钉)")
# 实例化监控器
monitor = cls(crawler.stats, mail_settings, dingding_webhook)
# 连接Scrapy信号
crawler.signals.connect(monitor.spider_closed, signal=signals.spider_closed)
crawler.signals.connect(monitor.item_scraped, signal=signals.item_scraped)
crawler.signals.connect(monitor.spider_error, signal=signals.spider_error)
return monitor
def spider_error(self, failure, response, spider):
"""处理爬虫错误信号"""
self.error_count += 1
error_msg = f"爬虫错误: {failure.value}, 发生在URL: {response.url}"
spider.logger.error(error_msg)
# 如果错误数量达到阈值,立即报警
if self.error_count >= 10:
self.send_alert(f"【爬虫异常报警】错误数已达{self.error_count}次", error_msg)
def item_scraped(self, item, response, spider):
"""处理item爬取成功信号"""
self.item_scraped_count += 1
# 每爬取100个item,记录一次心率
if self.item_scraped_count % 100 == 0:
spider.logger.info(f"爬虫心率正常,已成功爬取 {self.item_scraped_count} 个条目。")
def spider_closed(self, spider, reason):
"""爬虫关闭时发送统计报告"""
stats = self.stats.get_stats()
spider.logger.info(f"爬虫关闭,原因: {reason}")
# 准备报告内容
total_items = stats.get('item_scraped_count', 0)
total_errors = self.error_count
finish_reason = stats.get('finish_reason', 'Unknown')
start_time = stats.get('start_time')
end_time = stats.get('finish_time')
duration = (end_time - start_time).total_seconds() if start_time and end_time else 0
report_content = f"""
前程无忧Python爬虫运行结束报告:
- 运行状态: {reason}
- 开始时间: {start_time}
- 结束时间: {end_time}
- 总运行时长: {duration:.2f} 秒
- 成功爬取条目: {total_items}
- 累计错误数: {total_errors}
- 请求总数: {stats.get('downloader/request_count', 0)}
- 200响应数: {stats.get('downloader/response_status_count/200', 0)}
详细数据请查看日志文件。
"""
# 如果是因为异常关闭,或者爬取数量为0,则发送警报级别的报告
if reason != 'finished' or total_items == 0:
self.send_alert("【爬虫异常结束报告】", report_content)
else:
# 正常结束,发送通知级别的报告
self.send_notification("【爬虫正常结束报告】", report_content)
def send_alert(self, subject, content):
"""发送警报(高优先级)"""
self._send_mail(subject, content)
self._send_dingding(f"{subject}\n{content}")
def send_notification(self, subject, content):
"""发送通知(低优先级)"""
# 通知可以只发一个通道,比如只发钉钉
self._send_dingding(f"{subject}\n{content}")
def _send_mail(self, subject, content):
"""通过邮件发送报警"""
try:
msg = MIMEText(content, 'plain', 'utf-8')
msg['From'] = Header(self.mail_settings['from'])
msg['To'] = Header(self.mail_settings['to'])
msg['Subject'] = Header(subject, 'utf-8')
smtp = smtplib.SMTP_SSL(self.mail_settings['host'], self.mail_settings['port'])
smtp.login(self.mail_settings['from'], self.mail_settings['password'])
smtp.sendmail(self.mail_settings['from'], [self.mail_settings['to']], msg.as_string())
smtp.quit()
self.stats.inc_value('monitor/email_sent')
except Exception as e:
logging.error(f"发送邮件失败: {e}")
def _send_dingding(self, content):
"""通过钉钉机器人发送报警"""
if not self.dingding_webhook:
return
try:
headers = {'Content-Type': 'application/json'}
data = {
"msgtype": "text",
"text": {
"content": content
}
}
response = requests.post(self.dingding_webhook, json=data, headers=headers)
response.raise_for_status()
self.stats.inc_value('monitor/dingding_sent')
except Exception as e:
logging.error(f"发送钉钉消息失败: {e}")步骤四:配置Scrapy启用监控
在settings.py中启用我们编写的监控扩展,并配置报警通道。
# settings.py
# ... 其他配置 ...
# 扩展配置
EXTENSIONS = {
'zhilian_monitor.monitor.SpiderMonitor': 500,
}
# 邮件配置
MAIL_SETTINGS = {
'host': 'smtp.qq.com', # 例如QQ邮箱SMTP服务器
'port': 465,
'from': 'your_email@qq.com',
'password': 'your_authorization_code', # 注意是授权码,不是密码
'to': 'admin@yourcompany.com',
}
# 钉钉机器人Webhook地址
DINGDING_WEBHOOK = 'https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN_HERE'
# 启用Scrapy状态收集
STATS_DUMP = True完成以上配置后,运行你的Scrapy爬虫。当发生以下情况时,你将收到相应的报警:
本文实现的监控系统是一个强大的起点,但在生产环境中还可以进一步优化:
stats)上报到Prometheus + Grafana,构建实时数据大屏。“工欲善其事,必先利其器”。为爬虫构建监控报警系统,就如同为数据采集这艘航船安装了雷达和声呐。它不能避免所有风浪,但能让你在触礁前及时转向,在迷航时找到方向。通过本文介绍的Python技术方案,你可以以较低的成本,为你的“前程无忧”爬虫乃至任何Scrapy爬虫项目,赋予强大的状态感知能力和快速的问题响应能力,从而确保数据生产的稳定与高效。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。