【源码解读】如何充分发挥 Scrapy 的异步能力

作为一个易上手的高性能爬虫框架,Scrapy 使用 Twisted 异步网络框架处理并发请求。

但是,在日常工作和面试过程中,经常发现有些同学会笃定地认为 Scrapy 采用的是多线程并发模型。实际上,虽然 Twisted 框架提供了线程池支持,但是其核心网络部分处理逻辑依赖的是「单线程 IO 多路复用」技术,在 Linux 平台上,是围绕 epoll() 系统调用实现的 Reactor 模式。

为了利用好 Scrapy 的异步任务能力,避免写出 “使用 urllib 和 requests 库完成 HTTP 请求” 这样的错误代码,本文将 Scrapy 各个组件的异步能力及可以使用什么样的异步技术进行一些总结。

可扩展组件:

Spider Middleware - 它是处于 EngineSpider 之间的组件,可以用于处理 Spider 的输入 (response)和输出(itemrequest)。它一般可以用于:处理 Spider 回调函数的输出,可以用于修改、增加和删除 request 或者 item;处理 Spider.start_requests() 函数生成的 request;捕捉 Spider 回调函数抛出的异常等等。用户自己实现的 Spider Middleware 可以定义一个或多个如下方法:

  • process_spider_input(response, spider) - 每个响应 repsonse 进入 Spider 回调函数之前可由该 方法处理。
  • process_spider_output(response, result, spider) - Spider 处理完响应 response 产生的结果 result 可经该方法处理。
  • process_spider_exception(response, exception, spider) - Spider 回调函数、其它 Spider Middlewareprocess_spider_input 方法抛出的异常可由该方法处理。
  • process_start_requests(start_requests, spider) - Spider 启动后由 start_requests() 方法产生 的 Request 可经方法处理。

Downloader Middleware - 它是处于 EngineDownloader 之间的组件,可以用于处理从 Engine 传递 给 Downloaderrequest 和从 Downloader 传递给 Engineresponse。它一般可用于:处理即将发到网络上的请求;修改传递即将给 Spider 的响应数据;丢掉响应数据,然后生成一个新的请求;根据请求凭空构造一个响 应(并不发出实际的请求);丢弃某些请求等等。用户自己实现的 Downloader Middleware 可以定义一个或多个如下 方法:

  • process_request(request, spider) - 这个方法可以处理每一个经过该中件间的 request。它可以返回NoneResponse 实例、Request 实例或者抛出 IgnoreRequest 异常。
  • process_responsee(response, spider) -这个方法可以处理每一个经过该中件间的 response。它可以返回Response 实例、Request 实例或者抛出 IgnoreRequest 异常。
  • process_exception(request, exception, spider) - 这个方法可以处理下载器或者 Downloader Middlewareprocess_request 抛出的包括 IgnoreRequest 在内的所有异常。它可以返回 NoneResponse 实例 或者 Request 实例。

Item Pipeline - 它用于处理 Spider 生成的 item,对其进行清理、验证、持久化等处理。用户自己实现的Item Pipeline 可以定义一个或多个如下方法:

  • process_item(item, spider) - 它用来处理 Spider 生成的 item。它可以返回字段类型的数据、Item 实例、Deferred 实例或者抛出 DropItem 异常。
  • open_spider(spider) - Spider 打开时调用。
  • close_spider(spider) - Spider 关闭时调用。
  • from_crawler(cls, crawler)

Scheduler - Scheduler接收来自engine的请求,并在engine请求它们时将它们排入队列以便稍后(也引导到engine)。

Extension - 提供了向 Scrapy 中插入自定义功能的机制。Extension 是普通的类,它们在 Scrapy 启动时实例化。 通常,Extension 实现向 Scrapy 注册信号处理函数,由信号触发完成相应工作。

Spider - Spiders是由Scrapy用户编写的自定义类,用于解析响应并从中提取items(也称为下载的items)或其他要跟进的requests。

异步手段

Twisted Deferred

我们本节主要汇总一下 Scrapy 中哪些可扩展组件支持返回 Deferred 对象。

Item Pipeline

对于 Item Pipeline,我们从文档中已经得知,用户自定义 Item Pipelineprocess_item 可以返回 Deferred 实例。Itempipeline 的处理本身就是由 Deferred 驱动的,作为其回调函数使用的 process_item 返回的 Deferred便会插入到原始 Deferred 的处理流程中。

# scrapy.core.scraper.Scraper
    def _process_spidermw_output(self, output, request, response, spider):
        """Process each Request/Item (given in the output parameter) returned
        from the given spider
        """
        if isinstance(output, Request):
            self.crawler.engine.crawl(request=output, spider=spider)
        elif isinstance(output, (BaseItem, dict)):
            self.slot.itemproc_size += 1
            dfd = self.itemproc.process_item(output, spider)
            dfd.addBoth(self._itemproc_finished, output, response, spider)
            return dfd
        elif output is None:
            pass
        else:
            ###

Spider Middleware

对于 Spider Middleware,我们从文档得知,process_spider_inputprocess_spider_output 也均不能返回 Deferred 实例,这点我们从代码中也得到了印证:

# scrapy.core.spidermw.SpiderMiddlewareManager
def scrape_response(self, scrape_func, response, request, spider):
    # 此处 scrape_func 实际引用 scrapy.core.scraper.Scrapyer.call_spider 函数
    ...
    def process_spider_input(response):
        for method in self.methods["process_spider_input"]:
            ...
                result = method(response=response, spider=spider)
                assert ###
            ...
        return scrape_func(response, request, spider)

    def process_spider_output(response):
        for method in self.methods["process_spider_output")]:
            result = method(response=response, exception=exception, spider=spider)
            assert ###
        ...

    # scrape_func 也就是 Scraper.call_spider 函数,会将 response 包装成 0.1 秒后触发的 `Deferred`
    # 实例。这个 `Deferred` 实例由下面的 `mustbe_deferred` 函数直接返回。
        dfd = mustbe_deferred(process_spider_input, response)
    dfd.addErrback(process_spider_exception)
    dfd.addCallback(process_spider_output)
    return dfd

# scrapy.core.scraper.Scraper

def _scrape(self, response, request, spider):
    # Engine 将 Downloader 的下载结果 response 交给 Scraper 后,传递到该函数
    assert isinstance(response, (Response, Failure))

    # 此处的 `Deferred` 实例依然是由 `call_spider` 创建的那个
    dfd = self._scrape2(response, request, spider)
    dfd.addErrback(self.handle_spider_error, request, response, spider)
    dfd.addCallback(self.handle_spider_output, request, response, spider)
    return dfd

def _scrape2(self, request_result, request, spider):
    if not isinstance(request_result, Failure):
        return self.spidermw.scrape_response(
            self.call_spider, request_request, request, spider)
    ...

def call_spider(self, result, request, spider):
    result.request = request
    dfd = defer_result(result)
    dfd.addCallbacks(request.callback or spider.parse, request.errback)
    return dfd.addCallback(iterate_spider_output)

上述代码一直使用同一个 Deferred 实例,该实例由 call_spider 创建,延迟 0.1 秒后由 reactor 激活。 _scrape 函数返回后,在该 Deferred 实例上注册的 callbackerrback 有:

    callback                            errback
-------------------------------------------------------
request.callback or spider.parse        request.errback
iterate_spider_output
                                        scrape_repsonse.process_spider_exception
scrape_repsonse.process_spider_output
                                        Scraper.handle_spider_error
Scraper.handle_spider_output

根据上面的代码摘录回调函数链,Spider Middlewareprocess_spider_input 的返回值必须是 None 值 或者抛出异常,这个结论是明确的。同时,它的 process_spider_output 的输出要交由 Scraper.handle_spider_output 函数处理,这个函数的逻辑如下:

# scrapy.core.scraper.Scraper
def handle_spider_output(self, result, request, response, spider):
    ...
    it = iter_errback(result, self.handle_spider_error, request, response, spider)
    dfd = parallel(it, self.concurrent_items,
        self._process_spiderwm_output, request, response, spider)
    return dfd

def _process_spidermw_output(self, output, request, response, spider):
    """Process each Request/Item (given in the output parameter) returned
    from the given spider
    """
    if isinstance(output, Request):
        self.crawler.engine.crawl(request=output, spider=spider)
    elif isinstance(output, (BaseItem, dict)):
        self.slot.itemproc_size += 1
        dfd = self.itemproc.process_item(output, spider)
        dfd.addBoth(self._itemproc_finished, output, response, spider)
        return dfd
    elif output is None:
        pass
    else:
        ###

_process_spidermw_output 函数的逻辑可以看出,process_spider_output 如果返回 Request 实例、 BaseItem 实例 和 dict 实例以外的对象时,Scrapy 都当成错误并打错误日志。

Downloader Middleware

Downloader Middleware 来说,和 Spider Middleware 类似,文档也约定了用户实现的 process_requestprocess_response 函数不能返回 Deferred 实例。它的运行模式也和 Spider Middlerware 类似,但是 实现细节上却存在很大区别。

实际上,Downloader Middlerwareprocess_request 方法和 process_response 方法,是可以返回 Deferred 实例的。Scrapy 提供的一个下载中间件 scrapy.downloadermiddlewares.robotstxt 就利用了这种用 法,在发出实际请求之前,根据需求先去请求了网站的 robots.txt 文件。

接下来,我们从 Scrapy 这部分实现代码的角度证实一下这个结论。

首先,Engine_download 方法调用 Downloader 开始请求下载。这个方法返回 Deferred 实例。

# scrapy.core.engine.ExecutionEngine
def _download(self, request, spider):
    ...
    dwld = self.downloader.fetch(request, spider)
    dwld.addCallbacks(_on_success)
    dwld.addBoth(_on_complete)
    return dwld

然后,Downloaderfetch 方法调用 DownloaderMiddlewareManagerdownload 方法构造用于处理当 前请求的 Deferred 实例及回调函数链。

# scrapy.core.downloader.__init__.Downloader
def fetch(self, request, spider):
    ...
    dfd = self.middleware.download(self._enqueue_request, request, spider)
    return dfd.addBoth(_deactivate)

# scrapy.core.downloader.middleware.DownloaderMiddlewareManager
def download(self, download_func, request, spider):
    @defer.inlineCallbacks
    def process_request(request):
        for method in self.methods['process_request']:
            response = yield method(request=request, spider=spider)
            assert ###
                    (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
            if response:
                defer.returnValue(response)
        defer.returnValue((yield download_func(request=request,spider=spider)))

    @defer.inlineCallbacks
    def process_response(response):
        assert response is not None, 'Received None in process_response'
        if isinstance(response, Request):
            defer.returnValue(response)

        for method in self.methods['process_response']:
            response = yield method(request=request, response=response,
                                    spider=spider)
            assert ###
                (six.get_method_self(method).__class__.__name__, type(response))
            if isinstance(response, Request):
                defer.returnValue(response)
        defer.returnValue(response)

    @defer.inlineCallbacks
    def process_exception(_failure):
        exception = _failure.value
        for method in self.methods['process_exception']:
            response = yield method(request=request, exception=exception,
                                    spider=spider)
            assert ###
            if response:
                defer.returnValue(response)
        defer.returnValue(_failure)

    deferred = mustbe_deferred(process_request, request)
    deferred.addErrback(process_exception)
    deferred.addCallback(process_response)
    return deferred

理清上面代码的关键是理解装饰器 twisted.internet.defer.inlineCallbacks 的用法。inlineCallbacks 装饰 的生成器函数被调用时,会返回一个生成器函数产生返回值时被激活的 Deferred 实例。

Your inlineCallbacks-enabled generator will return a Deferred object, which will result in the return value of the genrator (or will fail with a failure object if your generator raises an unhandled exception).

生成器函数中产生的 Deferred 实例使用 yield 等待求值,也就是说,inlineCallbacks 等待这些 Deferred 被激活后,将它的回调链产生的结果作为 yield 表达式的值返回。

When you call anything that results in a Deferred, you can simply yield it; your generator will automatically be resumed when the Deferred's result is availabe. The generator will be send the result of the Deferred with the send method on generators, or if the result was a failure, "throw".

Deferred 类型的值也可以被 yield 处理,此时,inlineCallbacks 仅仅把它直接作为 yield 表达式的值。

Things that are not Deferreds may also be yielded, and your generator will be resumed with the same object sent back.

回到上面的 download 函数,mustbe_deferred(process_request, request) 返回的 Deferred 实例由装饰器inlineCallbacks 生成,并且在其装饰的生成器 process_request 调用 defer.returnValue 返回值或抛出异常 时被激动,继续执行后面的 callbackerrback 链。而被 inlineCallbacks 装饰的生成器函数里被 yieldDeferred 实例由 inlineCallbacks 等待并求值。

这其中包括由 download_func 函数,也即,scrapy.core.downloader.Downloader._enqueue_request 函数生成 的 Deferred 实例。这个 Deferred 实例在对应请求被 Downloader 真正下载完成后,才被激活。

# scrapy.core.downloader.__init__.Downloader
def _process_queue(self, spider, slot):
    ...
    while slot.queue and slot.free_transfer_slots() > 0:
        ...
        request, deferred = slot.queue.popleft()
        dfd = self._download(slot, request, spider)
        dfd.chainDeferred(deferred)
        ...

综上,虽然 Downloader Middleware 的文档虽然并没有明确说明 process_requestprocess_responseprocess_exception 的返回值可以是 Deferred 类型,但是从上面对代码分析和 Scrapy 提供的一些下载中件间的 代码可以看出,这三个函数返回 Deferred 实例也是完全合法的。但是有点一点需要注意的时,这个 Deferred 实例 的最终返回值类型必须是 NoneRequestResponse 的其中之一。

其它组件

Scrapy 框架上剩下的几个可扩展组件,Scheduler, ExtensionSpider 也均不支持直接使用 Deferred 完成异步操作。

汇总

下面是 Scrapy 可扩展组件的方法返回 Deferred 实例的汇总表:

Twisted ThreadPool

Twisted 的 reactor 提供了线程池,用于执行那些无法使用非阻塞模式(本质上不支持非阻塞或者未能找到适合 Twisted 的非阻塞函数库)的操作。

Therefore, internally, Twisted makes very little use of threads. This is not to say that is makes no use of threads; there are plenty of APIs which have no non-blocking equivalent, so when Twisted needs to call those, it calls them in a thread. One prominent example of this is system host name resolution: unless you have configured Twisted to use its own DNS client in twisted.names, it will have to use your operating system's blocking APIs to map host names to IP addresses, in the reactor's thread pool. …Twisted does most things in one thread.

由上一节对 Twisted 的介绍我们知道,使用 Twisted 框架的程序基本上都是通过 reactor 循环驱动回调函数,完成业务逻辑。reactor 循环一般运行于主线程中,由 reactor.run() 函数启动, reactor.stop() 函数退出循环。

如果某它线程需要在 reactor 循环/线程中执行某函数时,这个线程需要使用 reactor.callFromThread 将此函数转 交给 reactor 线程:

def callFromThread(callable, *args, **kw):

Cause a function to be executed by the reactor thread. Use this method when you want to run a function in the reactor's thread from another thread. … If you want to call a function in the next mainloop iteration, but you're in the same thread, use callLater with a delay of 0.

如果在某个 reactor 循环的回调函数中需要执行某个阻塞操作时,可以使用 reactor.callInThread 函数将此阻塞操 作委托给独立线程:

def callInThread(callable, *args, **kw):

Run the given callable object in a separate thread, with the given arguments and keyword arguments.

如果上面的场景下,需要在回调函数中获取阻塞操作的结果的话,这时可以使用 threads.deferToThread 函数。调用者 可以通过这个函数返回的 Deferred 实例获取阻塞操作的结果:

def deferToThread(f, *args, **kwargs):

Run a function in a thread and return the result as a Deferred. 另外,需要注意的是,这个函数使用 reactor 提供的线程池。


介绍完 Twisted 框架提供的线程接口后,我们回到 Scrapy 代码树。目前版本(1.4.0)的 Scrapy 核心代码中,只有 DNS 解析的功能使用了线程池:

# scrapy.crawler.CrawlProcess

def start(self, stop_after_crawl=True):
    ...
    reactor.installResolver(self._get_dns_resolver())

def _get_dns_resolver(self):
    ...
    return CachingThreadedResolver(
        reactor=reactor,
        cache_size=cache_size,
        timeout=self.settings.getfloat("DNS_TIMEOUT")
    )

# scrapy.resolver
from twisted.internet.base import ThreadedResolver
...
class CachingThreadedResolver(ThreadedResolver):
    ...

Scrapy 代码的非核心部分中,scrapy.pipelines.files 模块中的几个文件存储中间件也大量使用了线程池来处理阻塞 任务。使用线程可以简单地使用阻塞版本的各种客户端库和存储服务通信。


我们在业务中,经常开发 Pipeline 向 MySQL 数据库中写入数据。此时一般会使用 twisted.enterprise.dbapi 提供的非阻塞数据库操作。这个模块底层维护了一个独立于 reactor 线程池的线程池,并通过 threads.deferToThreadPool 将阻塞的数据库操作,也就是 Pipeline 中的数据库操作,委托给这个线程池处理。数据库的操作结果通过 Deferred 实 例告知调用者。

异步 Request

使用 Scrapy 开发针对业务开发爬取逻辑时,我们通过 Spider 向 Scrapy 提供初始的下载 URL 以驱动整个框架开始运转。获取到响应数据后,要从其中分析出新的 URL,然后构造 Request 实例,指定响应回调函数(callbackerrback),并交给 Scrapy 继续爬取。Scrapy 拿到 URL 的响应数据后,会调用回调函数,执行业务逻辑。

在这个过程中,我们不需要了解 Scrapy 的异步原理,就可以通过 Request 完成异步网络请求,使得整个过程非常高效。那么在 Scrapy 提供的可扩展组件中能否利用 Request 发起异步的网络请求呢?

首先,对于约定方法可以返回 Request 实例的扩展组件,我们只需要像开发 Spider 代码一样,为 Request 指定实现了业务逻辑的回调函数,然后将该 Request 作为方法返回值返回给 Scrapy 框架即可。

其次,对于约定方法不支持 Request 类型返回值的扩展组件,比如 Item PipelineDownloader Middleware, 我们可以利用这些约定方法支持 Deferred 类型返回值的特性,将网络请求和 Deferred 结合起来。网络请求完成后, 才激活该 Deferred,这样原来的处理流程就可以继续向下进行了。

从 Scrapy 框架的代码中,我们可以找到这样的用法。

比如,scrapy.downloadermiddleware.robotstxt.RobotsTxtMiddleware 中间件就使用了这种方式。这个中间件的 主要任务是根据网站的 robots.txt 规则,判断当前即将发出的请求是否合法。robots.txt 文件由该中间件创建新 HTTP 请求下载。文件下载完成后,根据其中规则对原始请求进行检查,然后根据规则决定丢弃或继续原始请求的处理流程。

# scrapy.downloadermiddlewares.robotstxt.RobotsTxtMiddleware

def process_request(self, request, spider):
    ...
    d = maybeDeferred(self, robot_parser, request, spider)
    d.addCallback(self.process_request_2, request, spider)
    return d

def robot_parser(self, request, spider)
    ...
    if netloc not in self._parsers:
        # 还未下载 netloc 对应的 robots.txt 文件时,创建请求该文件的 HTTP 请求。该请求使用
        # Engine.download 函数处理,请求完成后,该函数返回的 Deferred 实例被激活。
        self._parsers[netloc] = Deferred()
        robotsurl = "%s://%s/robots.txt" % (url.scheme, url.netloc)
        robotsreq = Request(robotsurl, priority=self.DOWNLOAD_PRIORITY, meta={'dont_obey_robotstxt': True})
        dfd = self.crawler.engine.download(robotsreq, spider)
        dfd.addCallback(self._parse_robots, netloc)
        dfd.addErrback(self._logerror, robotsreq, spider)
        dfd.addErrback(self._robots_error, netloc)

    if isinstance(self._parsers[netloc], Deferred):
        # 在 robots.txt 下载成功之前,Engine 发来的请求都会通过 Deferred 实例暂缓执行。这个
        # Deferred 实例在 robots.txt 下载完成并在 _parse_robots 构建完成 RobotFileParser 对象
        # 后被激活。
        d = Deferred()
        def cb(result):
            d.callback(result)
            return result
        self._parsers[netloc].addCallback(cb)
        return d
    else:
        # netloc 对应的 robots.txt 下载完成后,直接返回对应的 RobotFileParser 对象
        return self._parsers[netloc]

def _parse_robots(self, response, netloc):
    # robots.txt 下载完成后,使用该文件内容构造 RobotFileParser 对象
    ...
    rp_dfd = self._parser[netloc]
    self._parsers[netloc] = rp
    rp_dfd.callback(rp)

def process_request_2(self, rp, request, spider):
    ...
        raise IgnoreRequest()

最后,我们还可以在任何可扩展组件中构造请求 Request 对象,在其回调函数中实现业务逻辑。然后使用scrapy.core.engine.ExecutionEngine.crawl 函数将该请求交给 Scrapy 重新调度处理。Scrapy 使用和普通 Request 相同的逻辑处理该请求。实际上,在 scrapy.core.engine.ExecutionEnginescrapy.core.scraper.Scraper 内部,都是使用该方法调度由 Spider MiddlewareDownloader Middleware 生成的 Request 对象的。


另外,从上面的分析我们可以看到,scrapy.core.engine.ExecutionEngine 提供了两种提交 Request 并异步下载 该请求的方法。我们将其用法描述如下:

  • crawl(request, spider) - 用户通过该方法向 Scrapy 提交请求,该请求和其它普通请求一样,由 Scrapy 框架统 一调度,由 Downloader 控制并发和发起频率等。并由 Downloader MiddlewareSpider Middleware 等组件处 理。该方法无返回值,业务处理需要通过请求的回调函数完成。
  • download(request, spider) - 用户通过该方法向 Scrapy 提交的请求,直接交由 Downloader 处理,由其控制 并发和发起频率。该请求不会被 Spider MiddlewareScraper 处理,也就是说请求的回调函数不会被调用。该 方法返回 Deferred 实例,请求的响应数据需要从该 Deferred 实例中获取。

原文发布于微信公众号 - Python爬虫与算法进阶(zhangslob)

原文发表时间:2019-04-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券