Scrapy中如何提高数据的插入速度

速度问题

最近工作中遇到这么一个问题,全站抓取时采用分布式:爬虫A与爬虫B,爬虫A给爬虫B喂饼,爬虫B由于各种原因运行的比较慢,达不到预期效果,所以必须对爬虫B进行优化。

提升Scrapy运行速度有很多方法,国外有大佬说过

Speed up web scraper

Here's a collection of things to try:

  1. use latest scrapy version (if not using already)
  2. check if non-standard middlewares are used
  3. try to increase CONCURRENT_REQUESTS_PER_DOMAIN, CONCURRENT_REQUESTS settings (docs) turn off logging LOG_ENABLED = False (docs)
  4. try yielding an item in a loop instead of collecting items into the items list and returning them use local cache DNS (see this thread)
  5. check if this site is using download threshold and limits your download speed (see this thread) log cpu and memory usage during the spider run - see if there are any problems there
  6. try run the same spider under scrapyd service
  7. see if grequests + lxml will perform better (ask if you need any help with implementing this solution)
  8. try running Scrapy on pypy, see Running Scrapy on PyPy

大致看了下,确实可以提高爬虫运行速度,但是对于海量数据(这里说的是百万级)还需要考虑一点的就是数据插入问题,这里我们使用的是 Mongo。

官方示例

让我们先从官方文档开始 Write items to MongoDB

import pymongo

class MongoPipeline(object):

    collection_name = 'scrapy_items'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        self.db[self.collection_name].insert_one(dict(item))
        return item

比较简单,这里插入使用的方法是 insert_one,继续文档:

insert_one(document, bypass_document_validation=False, session=None)

Insert a single document.

>>> db.test.count({'x': 1})
0
>>> result = db.test.insert_one({'x': 1})
>>> result.inserted_id
ObjectId('54f112defba522406c9cc208')
>>> db.test.find_one({'x': 1})
{u'x': 1, u'_id': ObjectId('54f112defba522406c9cc208')}

以前经常使用的 insert 方法,已经不被赞同

insert(doc_or_docs, manipulate=True, check_keys=True, continue_on_error=False, **kwargs)

Insert a document(s) into this collection.

DEPRECATED - Use insert_one() or insert_many() instead.

Changed in version 3.0: Removed the safe parameter. Pass w=0 for unacknowledged write operations.

insert 简单理解就是插入,把我们采集到的 item 插入到数据库,这样存在一个很严重的问题,就是去重

去重

晚上有一种很流行的写法,使用 update命令,如:

self.db[self.collection_name].update({'id': item['id']}, {'$set': dict(item)}, True)

解释为:

比较重要的一点就在于process_item,在这里使用了update方法,第一个参数传入查询条件,这里使用的是id,第二个参数传入字典类型的对象,就是我们的item,第三个参数传入True,这样就可以保证,如果查询数据存在的话就更新,不存在的话就插入。这样就可以保证去重了。

这确实是一种很简单的方法,其实原理很简单,就是在每次插入数据前,对数据库中查询,是否有该 ID,如果没有就插入,如果有就放弃。

对于数据量比较少的项目,这确实是一种很简单的方法,很简单就完成了目标。

但是,我们现在说的是百万级数据,如果每一条数据在插入前,都需要去查询该数据是否在数据库,那会多么耗时,效率会大大较低,那么还有什么好办法呢?

索引

MongoDB 索引

索引能够实现高效地查询。没有索引,MongoDB 就必须扫描集合中的所有文档,才能找到匹配查询语句的文档。这种扫描毫无效率可言,需要处理大量的数据。

索引是一种特殊的数据结构,将一小块数据集保存为容易遍历的形式。索引能够存储某种特殊字段或字段集的值,并按照索引指定的方式将字段值进行排序。

我们可以借助索引,使用 insert_one方法提高效率。代码实现:

class MongoDBPipeline(object):
    def open_spider(self, spider):
        self.client = mongodb_client
        self.db = self.client.get_database()
        self.collection = self.db['test']
        # 添加唯一索引
        self.collection.create_index('id', unique=True)

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        try:
            self.collection.insert_one(dict(item))
            return item
        except DuplicateKeyError:
            spider.logger.debug(' duplicate key error collection')
            return item

其实很简单,就是在 open_spider先创建唯一索引,然后再插入数据。注意需要在process_item中使用异常处理,因为很有可能插入重复数据,到时候就会输出日志。

其他方法

mongo 除了 insert_one方法还有一种,insert_many

insert_many(documents, ordered=True, bypass_document_validation=False, session=None)

Insert an iterable of documents.

>>> db.test.count()
0
>>> result = db.test.insert_many([{'x': i} for i in range(2)])
>>> result.inserted_ids
[ObjectId('54f113fffba522406c9cc20e'), ObjectId('54f113fffba522406c9cc20f')]
>>> db.test.count()
2

这样插入的数据不再是一条,而是很多,

What's the difference between insert(), insertOne() and insertMany() methods on MongoDB

大佬有写到,可以去看看。

同时插入多条数据,减轻数据库压力。但是这个“多”到底还是多少,目前不得而知。

结语

除了更多机器和更多节点,还有很多方法可以提升 Scrapy运行速度。

今天说到的是管道阻塞问题,还有其他地方也可以优化,还需要努力。

原文发布于微信公众号 - Python爬虫与算法进阶(zhangslob)

原文发表时间:2018-03-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏深度学习之tensorflow实战篇

mongodb11天之屠龙宝刀(四)高级查询:MongoDB内嵌字段查询

mongodb11天之获取屠龙宝刀(四)高级查询:MongoDB内嵌字段查询 实战环境 IDE:nosql manager for mongodb 表...

3044
来自专栏决胜机器学习

设计模式专题(二十) ——职责链模式

设计模式专题(二十)——职责链模式 (原创内容,转载请注明来源,谢谢) 一、概述 职责链模式(Chainof Responsibility),是使多个对象都有...

3479
来自专栏更流畅、简洁的软件开发方式

分页解决方案 之 分页算法——Pager_SQL的思路和使用方法

      分页算法(也就是分页读取数据的时候使用的select 语句)面临两大难题:一个是不同的数据库使用的分页算法是不一样的(比如SQL Server 20...

2568
来自专栏扎心了老铁

spark-streaming集成Kafka处理实时数据

在这篇文章里,我们模拟了一个场景,实时分析订单数据,统计实时收益。 场景模拟 我试图覆盖工程上最为常用的一个场景: 1)首先,向Kafka里实时的写入订单数据,...

7175
来自专栏码洞

Shell文本处理编写单行指令的诀窍

小编编程资质一般,刚出道的时候使用的是windows来做程序开发,平时linux命令的知识仅限于在学校里玩ubuntu的时候学到的那丁点。在一次偶然看见项目的主...

742
来自专栏JavaQ

MySQL中如何选择VARCHAR和CHAR类型

首先,VARCHAR和CHAR是两种最主要的字符串类型。在设计用于存储字符串的表字段时,可能会对到底选哪个类型有所犹豫,确实如果不了解它们之间的区别,选择上不会...

3276
来自专栏牛肉圆粉不加葱

Spark 内存管理的前世今生(下)

在《Spark 内存管理的前世今生(上)》中,我们介绍了 UnifiedMemoryManager 是如何管理内存的。然而,UnifiedMemoryManag...

1022
来自专栏挖掘大数据

整合Kafka到spark-streaming实例

在这篇文章里,我们模拟了一个场景,实时分析订单数据,统计实时收益。

2K9
来自专栏web编程技术分享

【手把手】JavaWeb 入门级项目实战 -- 文章发布系统 (第五节)

45211
来自专栏沃趣科技

MySQL的一个表最多可以有多少个字段

问题由来 引用我们客户的原话: *创建如下表,提示我:* ? *如果我将下面表中的varchar(200),修改成text(或blob):报错变为另一个:* ?...

6909

扫码关注云+社区