爬虫课堂(二十七)|使用scrapy-redis框架实现分布式爬虫(2)源码分析

我们在说Scrapy之所以不支持分布式,主要是因为有三大问题没有解决:

  1. requests队列不能集中管理。
  2. 去重逻辑不能集中管理。
  3. 保持数据逻辑不能集中管理。

scrapy-redis结合了分布式数据库Redis,重写了Scrapy一些比较关键的代码,将Scrapy变成一个可以在多个主机上同时运行的分布式爬虫。 scrapy-redis是github上的一个开源项目,可以直接下载到它的源代码 。 但是scrapy-redis的官方文档写的比较简洁,没有提及其运行原理。如果想全面的理解分布式爬虫的运行原理,还是得看scrapy-redis的源代码才行,当然在这之前需要先理解Scrapy的运行原理,不然看scrapy-redis会很费劲。把Redis加入到Scrapy之后的一个运行流程图参考下图27-1所示:

图27-1

scrapy-redis的源代码很少,也比较好懂,scrapy-redis工程的主体还是是Redis和Scrapy两个库,工程本身实现的东西不是很多,这个工程就像胶水一样,把这两个插件粘结了起来。 接下来通过分析scrapy-redis源码来理解它是如何实现分布式的爬虫系统,它的源码文件如下所示。

scrapy_redis
    __init__.py
    connection.py
    defaults.py
    dupefilter.py
    picklecompat.py
    pipelines.py
    queue.py
    scheduler.py
    spiders.py
    utils.py

connection.py和defaults.py

负责根据setting中配置实例化redis连接。被dupefilter和scheduler调用,总之涉及到redis存取的都要使用到这个模块。 connection.py源码如下:

import six

from scrapy.utils.misc import load_object

from . import defaults


# Shortcut maps 'setting name' -> 'parmater name'.
SETTINGS_PARAMS_MAP = {
    'REDIS_URL': 'url',
    'REDIS_HOST': 'host',
    'REDIS_PORT': 'port',
    'REDIS_ENCODING': 'encoding',
}


def get_redis_from_settings(settings):
    """Returns a redis client instance from given Scrapy settings object.

    This function uses ``get_client`` to instantiate the client and uses
    ``defaults.REDIS_PARAMS`` global as defaults values for the parameters. You
    can override them using the ``REDIS_PARAMS`` setting.

    Parameters
    ----------
    settings : Settings
        A scrapy settings object. See the supported settings below.

    Returns
    -------
    server
        Redis client instance.

    Other Parameters
    ----------------
    REDIS_URL : str, optional
        Server connection URL.
    REDIS_HOST : str, optional
        Server host.
    REDIS_PORT : str, optional
        Server port.
    REDIS_ENCODING : str, optional
        Data encoding.
    REDIS_PARAMS : dict, optional
        Additional client parameters.

    """
    params = defaults.REDIS_PARAMS.copy()
    params.update(settings.getdict('REDIS_PARAMS'))
    # XXX: Deprecate REDIS_* settings.
    for source, dest in SETTINGS_PARAMS_MAP.items():
        val = settings.get(source)
        if val:
            params[dest] = val

    # Allow ``redis_cls`` to be a path to a class.
    if isinstance(params.get('redis_cls'), six.string_types):
        params['redis_cls'] = load_object(params['redis_cls'])

    return get_redis(**params)


# Backwards compatible alias.
from_settings = get_redis_from_settings


def get_redis(**kwargs):
    """Returns a redis client instance.

    Parameters
    ----------
    redis_cls : class, optional
        Defaults to ``redis.StrictRedis``.
    url : str, optional
        If given, ``redis_cls.from_url`` is used to instantiate the class.
    **kwargs
        Extra parameters to be passed to the ``redis_cls`` class.

    Returns
    -------
    server
        Redis client instance.

    """
    redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
    url = kwargs.pop('url', None)
    if url:
        return redis_cls.from_url(url, **kwargs)
    else:
        return redis_cls(**kwargs)

defaults.py源码如下:

import redis


# For standalone use.
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'

PIPELINE_KEY = '%(spider)s:items'

REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
# Sane connection defaults.
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}

SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

START_URLS_KEY = '%(name)s:start_urls'
START_URLS_AS_SET = False

去重逻辑集中管理:dupefilter.py

Scrapy中的去重实现是利用集合这个数据结构,在 Scrapy 分布式中去重就需要利用一个共享的集合,那么在这里使用的就是 Redis 中的集合数据结构,我们来看下它的去重类是怎样实现的,其内实现了一个 RFPDupeFilter 类,实现如下:

import logging
import time

from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint

from . import defaults
from .connection import get_redis_from_settings


logger = logging.getLogger(__name__)


# TODO: Rename class to RedisDupeFilter.
class RFPDupeFilter(BaseDupeFilter):
    """Redis-based request duplicates filter.

    This class can also be used with default Scrapy's scheduler.

    """

    logger = logger

    def __init__(self, server, key, debug=False):
        """Initialize the duplicates filter.

        Parameters
        ----------
        server : redis.StrictRedis
            The redis server instance.
        key : str
            Redis key Where to store fingerprints.
        debug : bool, optional
            Whether to log filtered requests.

        """
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True

    @classmethod
    def from_settings(cls, settings):
        """Returns an instance from given settings.

        This uses by default the key ``dupefilter:<timestamp>``. When using the
        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
        it needs to pass the spider name in the key.

        Parameters
        ----------
        settings : scrapy.settings.Settings

        Returns
        -------
        RFPDupeFilter
            A RFPDupeFilter instance.


        """
        server = get_redis_from_settings(settings)
        # XXX: This creates one-time key. needed to support to use this
        # class as standalone dupefilter with scrapy's default scheduler
        # if scrapy passes spider on open() method this wouldn't be needed
        # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
        key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    @classmethod
    def from_crawler(cls, crawler):
        """Returns instance from crawler.

        Parameters
        ----------
        crawler : scrapy.crawler.Crawler

        Returns
        -------
        RFPDupeFilter
            Instance of RFPDupeFilter.

        """
        return cls.from_settings(crawler.settings)

    def request_seen(self, request):
        """Returns True if request was already seen.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        bool

        """
        fp = self.request_fingerprint(request)
        # This returns the number of values added, zero if already exists.
        added = self.server.sadd(self.key, fp)
        return added == 0

    def request_fingerprint(self, request):
        """Returns a fingerprint for a given request.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        str

        """
        return request_fingerprint(request)

    @classmethod
    def from_spider(cls, spider):
        settings = spider.settings
        server = get_redis_from_settings(settings)
        dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY)
        key = dupefilter_key % {'spider': spider.name}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.

        Parameters
        ----------
        reason : str, optional

        """
        self.clear()

    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)

    def log(self, request, spider):
        """Logs given request.

        Parameters
        ----------
        request : scrapy.http.Request
        spider : scrapy.spiders.Spider

        """
        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False

在这里我们注意到同样实现了一个 request_seen() 方法,和 Scrapy 中的 request_seen() 方法实现极其类似,不过在这里集合使用的是 server 对象的 sadd() 操作,也就是集合不再是简单的一个简单数据结构了,在这里直接换成了数据库的存储方式。

鉴别重复的方式还是使用指纹,而指纹的获取同样是使用 request_fingerprint() 方法完成的。获取指纹之后就直接尝试向集合中添加这个指纹,如果添加成功,那么就代表这个指纹原本不存在于集合中,返回值就是 1,而最后的返回结果是判定添加结果是否为 0,如果为 1,那这个判定结果就是 False,也就是不重复,否则判定为重复。

这样我们就成功利用 Redis 的集合完成了指纹的记录和重复的验证。

requests队列集中管理:queue.py

有三个队列的实现,首先它实现了一个父类 Base,提供一些基本方法和属性:

class Base(object):
    """Per-spider base queue class"""

    def __init__(self, server, spider, key, serializer=None):
        """Initialize per-spider redis queue.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        spider : Spider
            Scrapy spider instance.
        key: str
            Redis key where to put and get messages.
        serializer : object
            Serializer object with ``loads`` and ``dumps`` methods.

        """
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
                            % serializer)

        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)

首先看一下_encode_request()_decode_request()方法,因为我们需要把一个Request 对象存储到数据库中,但数据库无法直接存储对象,所以需要将 Request 序列化转成字符串再存储,而这两个方法就分别是序列化和反序列化的操作,利用 pickle 库来实现,一般在调用push()Request存入数据库时会调用_encode_request()方法进行序列化,在调用pop()取出Request的时候会调用_decode_request()进行反序列化。

在父类中 len()、push() 和 pop() 方法都是未实现的,会直接抛出 NotImplementedError,因此这个类是不能直接被使用的,所以必须要实现一个子类来重写这三个方法,而不同的子类就会有不同的实现,也就有着不同的功能。

那么接下来就需要定义一些子类来继承 Base 类,并重写这几个方法,那在源码中就有三个子类的实现,它们分别是 FifoQueue、PriorityQueue、LifoQueue,我们分别来看下它们的实现原理。

首先是 FifoQueue:

class FifoQueue(Base):
    """Per-spider FIFO queue"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)

可以看到这个类继承了Base类,并重写了 len()、push()、pop() 这三个方法,在这三个方法中都是对 server 对象的操作,而 server 对象就是一个 Redis 连接对象,我们可以直接调用其操作 Redis 的方法对数据库进行操作,可以看到这里的操作方法有 llen()、lpush()、rpop() 等,那这就代表此爬取队列是使用的 Redis的列表,序列化后的 Request 会被存入列表中,就是列表的其中一个元素,len() 方法是获取列表的长度,push() 方法中调用了 lpush() 操作,这代表从列表左侧存入数据,pop() 方法中调用了 rpop() 操作,这代表从列表右侧取出数据。

所以 Request 在列表中的存取顺序是左侧进、右侧出,所以这是有序的进出,即先进先出,英文叫做 First Input First Output,也被简称作 Fifo,而此类的名称就叫做FifoQueue。

另外还有一个与之相反的实现类,叫做LifoQueue,实现如下:

class LifoQueue(Base):
    """Per-spider LIFO queue."""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)

        if data:
            return self._decode_request(data)

与 FifoQueue 不同的就是它的 pop() 方法,在这里使用的是 lpop() 操作,也就是从左侧出,而 push() 方法依然是使用的 lpush() 操作,是从左侧入。那么这样达到的效果就是先进后出、后进先出,英文叫做 Last In First Out,简称为 Lifo,而此类名称就叫做 LifoQueue。同时这个存取方式类似栈的操作,所以其实也可以称作 StackQueue。

另外在源码中还有一个子类实现,叫做PriorityQueue,顾名思义,它叫做优先级队列,实现如下:

class PriorityQueue(Base):
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        # We don't use zadd method as the order of arguments change depending on
        # whether the class is Redis or StrictRedis, and the option of using
        # kwargs only accepts strings, not bytes.
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """
        Pop a request
        timeout not support in this queue class
        """
        # use atomic range/remove using multi/exec
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])

在这里我们可以看到 len()、push()、pop() 方法中使用了 server 对象的 zcard()、zadd()、zrange() 操作,可以知道这里使用的存储结果是有序集合 Sorted Set,在这个集合中每个元素都可以设置一个分数,那么这个分数就代表优先级。

在 len() 方法里调用了 zcard() 操作,返回的就是有序集合的大小,也就是爬取队列的长度,在 push() 方法中调用了 zadd() 操作,就是向集合中添加元素,这里的分数指定成 Request 的优先级的相反数,因为分数低的会排在集合的前面,所以这里高优先级的 Request 就会存在集合的最前面。pop() 方法是首先调用了 zrange() 操作取出了集合的第一个元素,因为最高优先级的 Request 会存在集合最前面,所以第一个元素就是最高优先级的 Request,然后再调用 zremrangebyrank() 操作将这个元素删除,这样就完成了取出并删除的操作。

此队列是默认使用的队列,也就是爬取队列默认是使用有序集合来存储的。

调度器:scheduler.py

ScrapyRedis 还帮我们实现了一个配合 Queue、 DupeFilter 使用的调度器 Scheduler,源文件名称是 scheduler.py。

在这里指定了一些配置,如 SCHEDULER_FLUSH_ON_START 即是否在爬取开始的时候清空爬取队列,SCHEDULER_PERSIST 即是否在爬取结束后保持爬取队列不清除,我们可以在 settings.py 里面自由配置,而此调度器很好的实现了对接。

接下来我们再看下两个核心的存取方法,实现如下:

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

enqueue_request() 就是调度器向队列中添加 Request,在这里做的核心操作就是调用 Queue 的 push() 操作,同时还有一些统计和日志操作,next_request() 就是从队列中取 Request,核心操作就是调用 Queue 的 pop() 操作,那么此时如果队列中还有 Request,则会直接取出来,接着爬取,否则当队列为空时,则会重新开始爬取。

保持数据逻辑不能集中管理:pipelines.py

首先说明一点,如果我们仅仅是想把分布式的数据都保持到一个数据库中,这里的操作不是必须的,而我们之所以要先把数据集中放到Redis里是为了集中管理。 源代码如下:

from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from . import connection, defaults


default_serialize = ScrapyJSONEncoder().encode


class RedisPipeline(object):
    """Pushes serialized item into a redis list/queue

    Settings
    --------
    REDIS_ITEMS_KEY : str
        Redis key where to store items.
    REDIS_ITEMS_SERIALIZER : str
        Object path to serializer function.

    """

    def __init__(self, server,
                 key=defaults.PIPELINE_KEY,
                 serialize_func=default_serialize):
        """Initialize pipeline.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        key : str
            Redis key where to store items.
        serialize_func : callable
            Items serializer function.

        """
        self.server = server
        self.key = key
        self.serialize = serialize_func

    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )

        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """
        return self.key % {'spider': spider.name}

先看from_settings方法,它是这个功能的入口,是通过from_crawler方法调用的。它首先是通过'server': connection.from_settings(settings) 初始化了一个Redis server,接下来会默认进入process_item方法(这点和Scrapy是一样的),process_item会调用到_process_item方法,_process_item方法会调用item_key方法为每个爬虫的item利用spider.name做区分,最后会在_process_item方法中使用Redis的rpush方法把item放入Redis队列中。

序列化工具类:picklecompat.py

这里实现了loads和dumps两个函数,其实就是实现了一个serializer,因为redis数据库不能存储复杂对象(value部分只能是字符串,字符串列表,字符串集合和hash,key部分只能是字符串),所以在存储之前都需要序列化。这里使用的就是python的pickle模块,一个兼容py2和py3的串行化工具。 源码如下:

"""A pickle wrapper module with protocol=-1 by default."""

try:
    import cPickle as pickle  # PY2
except ImportError:
    import pickle


def loads(s):
    return pickle.loads(s)


def dumps(obj):
    return pickle.dumps(obj, protocol=-1)

爬虫:spider.py

重写这个spider主要是为了从Redis中读取要爬的URL,然后执行爬取,若爬取过程中返回更多的URL,那么继续进行直至所有的Request完成。之后继续从Redis中读取URL,循环这个过程。 在这个spider中通过connect signals.spider_idle信号实现对crawler状态的监视。当idle时,返回新的make_requests_from_url(url)给引擎,进而交给调度器调度。

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        # XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider

总结

到现在,我们把实现分布式的三大问题都解决了,总结如下:

  1. requests队列不能集中管理:在这里提供了三种队列,使用了Redis的列表或有序集合来维护。
  2. 去重逻辑不能集中管理:去重的实现,使用了 Redis 的集合来保存 Request 的指纹来提供重复过滤。
  3. 保持数据逻辑不能集中管理:通过把各个服务器上的Spider返回的Items打个唯一key,并都使用Redis的rpush方法把Items提交到Redis队列中。

参考资料:https://cloud.tencent.com/developer/article/1006246

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏java学习

Java每日一练(2017/6/8)

Java基础 | 数据库 | Android | 学习视频 | 学习资料下载 课前导读 ●回复"每日一练"获取以前的题目! ●答案公布时间:为每期发布题目的第二...

28370
来自专栏技术博客

设计模式之四(抽象工厂模式第三回合)

抽象工厂模式:提供一个创建一系列相关或相互依赖对象的接口,而无需指定它们具体的类。

13320
来自专栏wannshan(javaer,RPC)

dubbo通信消息解析过程分析(1)

由于rpc底层涉及网络编程接口,线程模型,网络数据结构,服务协议,细到字节的处理。牵涉内容较多,今天就先从一个点说起。 说说,dubbo通过netty框架做传...

55960
来自专栏JavaWeb

Mybatis源码-XXXmapper.xml中的resultMap标签解析过程

19130
来自专栏xingoo, 一个梦想做发明家的程序员

Apache Commons CLI命令行启动

今天又看了下Hangout的源码,一般来说一个开源项目有好几种启动方式——比如可以从命令行启动,也可以从web端启动。今天就看看如何设计命令行启动... ...

24570
来自专栏源码之家

word如何自动分割成多个文档

49350
来自专栏MasiMaro 的技术博文

Windows内核中的内存管理

其中PAGED_CODE是一个WDK中提供的一个宏,只在debug版本中生效,用于判断当前的中断请求级别,当级别高于DISPATCH_LEVEL(包含这个级别)...

13920
来自专栏分布式系统进阶

Kafka中Message存储相关类大揭密Kafka源码分析-汇总

24310
来自专栏大内老A

如何解决EnterLib异常处理框架最大的局限——基于异常"类型"的异常处理策略

个人觉得EnterLib的EHAB(Exception Handling Application Block)是一个不错的异常处理框架,借助于EHAB,我们可以...

22550
来自专栏Golang语言社区

Golang中巧用defer进行错误处理

问题引入 毫无疑问,错误处理是程序的重要组成部分,有效且优雅的处理错误是大多数程序员的追求。很多程序员都有C/C++的编程背景,Golang的程序员也不例外,他...

43970

扫码关注云+社区

领取腾讯云代金券