前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >实战 | 如何利用 Scrapy 编写一个完整的爬虫!

实战 | 如何利用 Scrapy 编写一个完整的爬虫!

作者头像
AirPython
发布2022-12-29 14:49:35
6440
发布2022-12-29 14:49:35
举报
文章被收录于专栏:Python 自动化

大家好,我是安果!

提到爬虫框架,这里不得不提 Scrapy,它是一款非常强大的分布式异步爬虫框架,更加适用于企业级的爬虫!

项目地址:

https://github.com/scrapy/scrapy

本篇文章将借助一个简单实例来聊聊使用 Scrapy 编写爬虫的完整流程

1. 实战

目标对象:

aHR0cHMlM0EvL2dvLmNxbW1nby5jb20vZm9ydW0tMjMzLTEuaHRtbA==

我们需要爬取目标网站下帖子的基本信息

2-1 安装依赖

代码语言:javascript
复制
# 安装依赖
pip3 install Scrapy

# Mysql
pip3 install mysqlclient

2-2 创建项目及爬虫

分析目前地址,获取网站 HOST 及爬取地址,在某个文件夹下利用命令创建一个爬虫项目及一个爬虫

代码语言:javascript
复制
# 创建一个爬虫项目
scrapy startproject cqmmgo

# 打开文件夹
cd cqmmgo

# 创建一个爬虫
scrapy genspider talk 网站HOST

2-3 定义 Item 实体对象

在 items.py 文件中,将需要爬取的数据定义为 Item

比如,这里就需要爬取帖子标题、作者、阅读数、评论数、贴子 URL、发布时间

代码语言:javascript
复制
# items.py

import scrapy

# 杂谈
class CqTalkItem(scrapy.Item):
    # 标题
    title = scrapy.Field()

    # 作者
    author = scrapy.Field()

    # 查看次数
    watch_num = scrapy.Field()

    # 评论次数
    comment_num = scrapy.Field()

    # 地址
    address_url = scrapy.Field()

    # 发布时间
    create_time = scrapy.Field()

2-4 编写爬虫

在 spiders 文件夹下的爬虫文件中编写具体的爬虫逻辑

通过分析发现,帖子数据是通过模板直接渲染,非动态加载,因此我们直接对 response 进行数据解析

PS:解析方式这里推荐使用 Xpath

解析完成的数据组成上面定义的 Item 实体添加到生成器中

代码语言:javascript
复制
# spiders/talk.py

import scrapy
from cqmmgo.items import CqTalkItem
from cqmmgo.settings import talk_hour_before
from cqmmgo.utils import calc_interval_hour

class TalkSpider(scrapy.Spider):
    name = 'talk'
    allowed_domains = ['HOST']

    # 第1-5页数据
    start_urls = ['https://HOST/forum-233-{}.html'.format(i + 1) for i in range(5)]

    def parse(self, response):
        # 直接Xpath解析
        elements = response.xpath('//div[contains(@class,"list-data-item")]')

        for element in elements:
            item = CqTalkItem()
            # title = element.xpath('.//span[@class="has-businessTag"]/text()').extract_first()
            # title = element.xpath('.//span[@class="has-businessTag"]/text()').extract()
            # title = element.xpath('.//*[@class="subject"]/a').extract_first()
            title = element.xpath('.//*[@class="subject"]/a/@title').extract_first()
            author = element.xpath(".//span[@itemprop='帖子作者']/text()").extract_first()
            watch_num = element.xpath(".//span[@class='num-read']/text()").extract_first()
            comment_num = element.xpath(".//span[@itemprop='回复数']/text()").extract_first()
            address_url = "https:" + element.xpath('.//*[@class="subject"]/a/@href').extract_first()
            create_time = element.xpath('.//span[@class="author-time"]/text()').extract_first().strip()

            # 过滤超过设定小时之前的数据
            if calc_interval_hour(create_time) > talk_hour_before:
                continue

            print(
                f"标题:{title},作者:{author},观看:{watch_num},评论:{comment_num},地址:{address_url},发布时间:{create_time}")

            item['title'] = title
            item['author'] = author
            item['watch_num'] = watch_num
            item['comment_num'] = comment_num
            item['address_url'] = address_url
            item['create_time'] = create_time

            yield item

2-5 自定义随机 UA 下载中间件

在 middlewares.py 文件中自定义随机 User Agent 下载中间件

代码语言:javascript
复制
# middlewares.py
import random  # 导入随机模块

class RandomUADownloaderMiddleware(object):
    def process_request(self, request, spider):
        # UA列表
        USER_AGENT_LIST = [
            'Opera/9.20 (Macintosh; Intel Mac OS X; U; en)',
            'Opera/9.0 (Macintosh; PPC Mac OS X; U; en)',
            'iTunes/9.0.3 (Macintosh; U; Intel Mac OS X 10_6_2; en-ca)',
            'Mozilla/4.76 [en_jp] (X11; U; SunOS 5.8 sun4u)',
            'iTunes/4.2 (Macintosh; U; PPC Mac OS X 10.2)',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.8; rv:16.0) Gecko/20120813 Firefox/16.0',
            'Mozilla/4.77 [en] (X11; I; IRIX;64 6.5 IP30)',
            'Mozilla/4.8 [en] (X11; U; SunOS; 5.7 sun4u)'
        ]

        # 随机生成一个UA
        agent = random.choice(USER_AGENT_LIST)

        # 设置到请求头中
        request.headers['User_Agent'] = agent

2-6 自定义下载管道 Pipline

在 piplines.py 文件中,自定义两个下载管道,分别将数据写入到本地 CSV 文件和 Mysql 数据中

PS:为了演示方便,这里仅展示同步写入 Mysql 数据库的方式

代码语言:javascript
复制
# piplines.py

from scrapy.exporters import CsvItemExporter
from cqmmgo.items import CqTalkItem
import MySQLdb  # 导入数据库模块

class TalkPipeline(object):
    """杂谈"""

    def __init__(self):
        self.file = open("./result/talk.csv", 'wb')
        self.exporter = CsvItemExporter(self.file, fields_to_export=[
            'title', 'author', 'watch_num', 'comment_num', 'create_time', 'address_url'
        ])
        self.exporter.start_exporting()

    def process_item(self, item, spider):
        if isinstance(item, CqTalkItem):
            self.exporter.export_item(item)
        return item

    # 关闭资源
    def close_spider(self, spider):
        self.exporter.finish_exporting()
        self.file.close()

# 数据存入到数据库(同步)
class MysqlPipeline(object):
    def __init__(self):
        # 链接mysql数据库
        self.conn = MySQLdb.connect("host", "root", "pwd", "cq", charset="utf8", use_unicode=True)
        self.cursor = self.conn.cursor()

    def process_item(self, item, spider):
        table_name = 'talk'

        # sql语句
        insert_sql = """
            insert into  {}(title,author,watch_num,comment_num,address_url,create_time,insert_time) values(%s,%s,%s,%s,%s,%s,%s)  
        """.format(table_name)

        # 从item获得数据,保存为元祖,插入数据库
        params = list()
        params.append(item.get("title", ""))
        params.append(item.get("author", ""))
        params.append(item.get("watch_num", 0))
        params.append(item.get("comment_num", 0))
        params.append(item.get("address_url", ""))
        params.append(item.get("create_time", ""))
        params.append(current_date())

        # 执行插入数据到数据库操作
        self.cursor.execute(insert_sql, tuple(params))

        # 提交,保存到数据库
        self.conn.commit()

        return item

    def close_spider(self, spider):
        """释放数据库资源"""
        self.cursor.close()
        self.conn.close()

当然,这里也可以定义一个数据去重的数据管道,通过帖子标题,对重复的数据不进行处理即可

代码语言:javascript
复制
# piplines.py

from scrapy.exceptions import DropItem

class DuplicatesPipeline(object):
    """
    Pipline去重
    """

    def __init__(self):
        self.talk_set = set()

    def process_item(self, item, spider):
        name = item['title']
        if name in self.talk_set:
            raise DropItem("重复数据,抛弃:%s" % item)

        self.talk_set.add(name)
        return item

2-7 配置爬虫配置文件

打开 settings.py 文件,对下载延迟时间、默认请求头、下载中间件、数据管道进行编辑

代码语言:javascript
复制
# settings.py

# Obey robots.txt rules
ROBOTSTXT_OBEY = False

DOWNLOAD_DELAY = 3

# Override the default request headers:
DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'Accept-Language': 'zh-CN,zh;q=0.9',
    'Host': 'HOST',
    'Referer': 'https://HOST/forum-233-1.html',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
}

DOWNLOADER_MIDDLEWARES = {
    'cqmmgo.middlewares.RandomUADownloaderMiddleware': 543,
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
}

ITEM_PIPELINES = {
    'cqmmgo.pipelines.TalkPipeline': 1,
    'cqmmgo.pipelines.MysqlPipeline': 6,
    'cqmmgo.pipelines.DuplicatesPipeline': 200,
    'cqmmgo.pipelines.CqmmgoPipeline': 300,
}

# 爬取时间限制
talk_hour_before = 24

2-8 爬虫主入口

在爬虫项目根目录下创建一个文件,通过下面的方式运行单个爬虫

代码语言:javascript
复制
# main.py

from scrapy.cmdline import execute
import sys, os

def start():
    sys.path.append(os.path.dirname(__file__))
    # 运行单个爬虫
    execute(["scrapy", "crawl", "talk"])

if __name__ == '__main__':
    start()

2. 最后

如果 Scrapy 项目中包含多个爬虫,我们可以利用 CrawlerProcess 类并发执行多个爬虫

代码语言:javascript
复制
# main.py

from scrapy.utils.project import get_project_settings
from scrapy.crawler import CrawlerProcess

# 同时运行项目下的多个爬虫
def start():
    setting = get_project_settings()
    process = CrawlerProcess(setting)

    # 不运行的爬虫
    spider_besides = ['other']

    # 所有爬虫
    for spider_name in process.spiders.list():
        if spider_name in spider_besides:
            continue
        print("现在执行爬虫:%s" % (spider_name))
        process.crawl(spider_name)
    process.start()

if __name__ == '__main__':
    start()

当然,除了 Scrapy 外,我们也可以考虑另外一款爬虫框架 Feapder

使用方法可以参考之前写的一篇文章

介绍一款能取代 Scrapy 的爬虫框架 - feapder

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-11-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 AirPython 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 实战
  • 2. 最后
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档