在网上有大量公开的免费代理,或者我们也可以购买付费的代理IP,但是代理不论是免费的还是付费的,都不能保证都是可用的,因为可能此IP被其他人使用来爬取同样的目标站点而被封禁,或者代理服务器突然发生故障或网络繁忙。一旦我们选用了一个不可用的代理,这势必会影响爬虫的工作效率。
所以,我们需要提前做筛选,将不可用的代理剔除掉,保留可用代理。接下来我们就搭建一个高效易用的代理池。
首先需要成功安装Redis数据库并启动服务,另外还需要安装aiohttp、requests、RedisPy、pyquery、Flask库。
我们需要做到下面的几个目标,来实现易用高效的代理池。
基本模块分为4块:存储模块、获取模块、检测模块、接口模块。
以上内容是设计代理的一些基本思路。接下来我们设计整体的架构,然后用代码实现代理池。
根据上文的描述,代理池的架构可以如下图所示。
代理池分为4个模块:存储模块、获取模块、检测模块、接口模块。
接下来,我们用代码分别实现这4个模块。
这里我们使用Redis的有序集合,集合的每一个元素都是不重复的,对于代理池来说,集合的元素就变成了一个个代理,也就是IP加端口的形式,如60.207.237.111:8888,这样的一个代理就是集合的一个元素。另外,有序集合的每一个元素都有一个分数字段,分数是可以重复的,可以是浮点数类型,也可以是整数类型。该集合会根据每一个元素的分数对集合进行排序,数值小的排在前面,数值大的排在后面,这样就可以实现集合元素的排序了。
对于代理池来说,这个分数可以作为判断一个代理是否可用的标志,100为最高分,代表最可用,0为最低分,代表最不可用。如果要获取可用代理,可以从代理池中随机获取分数最高的代理,注意是随机,这样可以保证每个可用代理都会被调用到。
分数是我们判断代理稳定性的重要标准,设置分数规则如下所示。
这只是一种解决方案,当然可能还有更合理的方案。之所以设置此方案有如下几个原因。
上述代理分数的设置思路不一定是最优思路,但据个人实测,它的实用性还是比较强的。
现在我们需要定义一个类来操作数据库的有序集合,定义一些方法来实现分数的设置、代理的获取等。代码实现如下所示:
MAX_SCORE = 100 MIN_SCORE = 0 INITIAL_SCORE = 10 REDIS_HOST = 'localhost' REDIS_PORT = 6379 REDIS_PASSWORD = None REDIS_KEY = 'proxies' import redis from random import choice class RedisClient(object): def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD): """ 初始化 :param host: Redis 地址 :param port: Redis 端口 :param password: Redis密码 """ self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True) def add(self, proxy, score=INITIAL_SCORE): """ 添加代理,设置分数为最高 :param proxy: 代理 :param score: 分数 :return: 添加结果 """ if not self.db.zscore(REDIS_KEY, proxy): return self.db.zadd(REDIS_KEY, score, proxy) def random(self): """ 随机获取有效代理,首先尝试获取最高分数代理,如果最高分数不存在,则按照排名获取,否则异常 :return: 随机代理 """ result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE) if len(result): return choice(result) else: result = self.db.zrevrange(REDIS_KEY, 0, 100) if len(result): return choice(result) else: raise PoolEmptyError def decrease(self, proxy): """ 代理值减一分,分数小于最小值,则代理删除 :param proxy: 代理 :return: 修改后的代理分数 """ score = self.db.zscore(REDIS_KEY, proxy) if score and score > MIN_SCORE: print('代理', proxy, '当前分数', score, '减1') return self.db.zincrby(REDIS_KEY, proxy, -1) else: print('代理', proxy, '当前分数', score, '移除') return self.db.zrem(REDIS_KEY, proxy) def exists(self, proxy): """ 判断是否存在 :param proxy: 代理 :return: 是否存在 """ return not self.db.zscore(REDIS_KEY, proxy) == None def max(self, proxy): """ 将代理设置为MAX_SCORE :param proxy: 代理 :return: 设置结果 """ print('代理', proxy, '可用,设置为', MAX_SCORE) return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy) def count(self): """ 获取数量 :return: 数量 """ return self.db.zcard(REDIS_KEY) def all(self): """ 获取全部代理 :return: 全部代理列表 """ return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)
首先我们定义了一些常量,如MAX_SCORE
、MIN_SCORE
、INITIAL_SCORE
分别代表最大分数、最小分数、初始分数。REDIS_HOST
、REDIS_PORT
、REDIS_PASSWORD
分别代表了Redis的连接信息,即地址、端口、密码。REDIS_KEY
是有序集合的键名,我们可以通过它来获取代理存储所使用的有序集合。
接下来定义了一个RedisClient
类,这个类可以用来操作Redis的有序集合,其中定义了一些方法来对集合中的元素进行处理,它的主要功能如下所示。
__init__()
方法是初始化的方法,其参数是Redis的连接信息,默认的连接信息已经定义为常量,在__init__()
方法中初始化了一个StrictRedis
的类,建立Redis连接。add()
方法向数据库添加代理并设置分数,默认的分数是INITIAL_SCORE
,也就是10,返回结果是添加的结果。random()
方法是随机获取代理的方法,首先获取100分的代理,然后随机选择一个返回。如果不存在100分的代理,则此方法按照排名来获取,选取前100名,然后随机选择一个返回,否则抛出异常。decrease()
方法是在代理检测无效的时候设置分数减1的方法,代理传入后,此方法将代理的分数减1,如果分数达到最低值,那么代理就删除。exists()
方法可判断代理是否存在集合中。max()
方法将代理的分数设置为MAX_SCORE
,即100,也就是当代理有效时的设置。count()
方法返回当前集合的元素个数。all()
方法返回所有的代理列表,以供检测使用。定义好了这些方法,我们可以在后续的模块中调用此类来连接和操作数据库。如想要获取随机可用的代理,只需要调用random()
方法即可,得到的就是随机的可用代理。
获取模块的逻辑相对简单,首先要定义一个Crawler来从各大网站抓取代理,示例如下所示:
import json from .utils import get_page from pyquery import PyQuery as pq class ProxyMetaclass(type): def __new__(cls, name, bases, attrs): count = 0 attrs['__CrawlFunc__'] = [] for k, v in attrs.items(): if 'crawl_' in k: attrs['__CrawlFunc__'].append(k) count += 1 attrs['__CrawlFuncCount__'] = count return type.__new__(cls, name, bases, attrs) class Crawler(object, metaclass=ProxyMetaclass): def get_proxies(self, callback): proxies = [] for proxy in eval("self.{}()".format(callback)): print('成功获取到代理', proxy) proxies.append(proxy) return proxies def crawl_daili66(self, page_count=4): """ 获取代理66 :param page_count: 页码 :return: 代理 """ start_url = 'http://www.66ip.cn/{}.html' urls = [start_url.format(page) for page in range(1, page_count + 1)] for url in urls: print('Crawling', url) html = get_page(url) if html: doc = pq(html) trs = doc('.containerbox table tr:gt(0)').items() for tr in trs: ip = tr.find('td:nth-child(1)').text() port = tr.find('td:nth-child(2)').text() yield ':'.join([ip, port]) def crawl_proxy360(self): """ 获取Proxy360 :return: 代理 """ start_url = 'http://www.proxy360.cn/Region/China' print('Crawling', start_url) html = get_page(start_url) if html: doc = pq(html) lines = doc('div[name="list_proxy_ip"]').items() for line in lines: ip = line.find('.tbBottomLine:nth-child(1)').text() port = line.find('.tbBottomLine:nth-child(2)').text() yield ':'.join([ip, port]) def crawl_goubanjia(self): """ 获取Goubanjia :return: 代理 """ start_url = 'http://www.goubanjia.com/free/gngn/index.shtml' html = get_page(start_url) if html: doc = pq(html) tds = doc('td.ip').items() for td in tds: td.find('p').remove() yield td.text().replace(' ', '')
方便起见,我们将获取代理的每个方法统一定义为以crawl
开头,这样扩展的时候只需要添加crawl
开头的方法即可。
在这里实现了几个示例,如抓取代理66、Proxy360、Goubanjia三个免费代理网站,这些方法都定义成了生成器,通过yield
返回一个个代理。程序首先获取网页,然后用pyquery
解析,解析出IP加端口的形式的代理然后返回。
然后定义了一个get_proxies()
方法,将所有以crawl
开头的方法调用一遍,获取每个方法返回的代理并组合成列表形式返回。
你可能会想知道,如何获取所有以crawl
开头的方法名称呢?其实这里借助了元类来实现。我们定义了一个ProxyMetaclass
,Crawl
类将它设置为元类,元类中实现了__new__()
方法,这个方法有固定的几个参数,第四个参数attrs
中包含了类的一些属性。我们可以遍历attrs
这个参数即可获取类的所有方法信息,就像遍历字典一样,键名对应方法的名称。然后判断方法的开头是否crawl
,如果是,则将其加入到__CrawlFunc__
属性中。这样我们就成功将所有以crawl
开头的方法定义成了一个属性,动态获取到所有以crawl
开头的方法列表。
所以,如果要做扩展,我们只需要添加一个以crawl
开头的方法。例如抓取快代理,我们只需要在Crawler
类中增加crawl_kuaidaili()
方法,仿照其他几个方法将其定义成生成器,抓取其网站的代理,然后通过yield
返回代理即可。这样,我们可以非常方便地扩展,而不用关心类其他部分的实现逻辑。
代理网站的添加非常灵活,不仅可以添加免费代理,也可以添加付费代理。一些付费代理的提取方式也类似,也是通过Web的形式获取,然后进行解析。解析方式可能更加简单,如解析纯文本或JSON,解析之后以同样的形式返回即可,在此不再代码实现,可以自行扩展。
既然定义了Crawler
类,接下来再定义一个Getter
类,用来动态地调用所有以crawl
开头的方法,然后获取抓取到的代理,将其加入到数据库存储起来。
from db import RedisClient from crawler import Crawler POOL_UPPER_THRESHOLD = 10000 class Getter(): def __init__(self): self.redis = RedisClient() self.crawler = Crawler() def is_over_threshold(self): """ 判断是否达到了代理池限制 """ if self.redis.count() >= POOL_UPPER_THRESHOLD: return True else: return False def run(self): print('获取器开始执行') if not self.is_over_threshold(): for callback_label in range(self.crawler.__CrawlFuncCount__): callback = self.crawler.__CrawlFunc__[callback_label] proxies = self.crawler.get_proxies(callback) for proxy in proxies: self.redis.add(proxy)
Getter
类就是获取器类,它定义了一个变量POOL_UPPER_THRESHOLD
来表示代理池的最大数量,这个数量可以灵活配置,然后定义了is_over_threshold()
方法来判断代理池是否已经达到了容量阈值。is_over_threshold()
方法调用了RedisClient的count()
方法来获取代理的数量,然后进行判断,如果数量达到阈值,则返回True
,否则返回False
。如果不想加这个限制,可以将此方法永久返回True
。
接下来定义run()
方法。该方法首先判断了代理池是否达到阈值,然后在这里就调用了Crawler
类的__CrawlFunc__
属性,获取到所有以crawl
开头的方法列表,依次通过get_proxies()
方法调用,得到各个方法抓取到的代理,然后再利用RedisClient
的add()
方法加入数据库,这样获取模块的工作就完成了。
我们已经成功将各个网站的代理获取下来了,现在就需要一个检测模块来对所有代理进行多轮检测。代理检测可用,分数就设置为100,代理不可用,分数减1,这样就可以实时改变每个代理的可用情况。如要获取有效代理只需要获取分数高的代理即可。
由于代理的数量非常多,为了提高代理的检测效率,我们在这里使用异步请求库aiohttp来进行检测。
requests作为一个同步请求库,我们在发出一个请求之后,程序需要等待网页加载完成之后才能继续执行。也就是这个过程会阻塞等待响应,如果服务器响应非常慢,比如一个请求等待十几秒,那么我们使用requests完成一个请求就会需要十几秒的时间,程序也不会继续往下执行,而在这十几秒的时间里程序其实完全可以去做其他的事情,比如调度其他的请求或者进行网页解析等。
异步请求库就解决了这个问题,它类似JavaScript中的回调,即在请求发出之后,程序可以继续执行去做其他的事情,当响应到达时,程序再去处理这个响应。于是,程序就没有被阻塞,可以充分利用时间和资源,大大提高效率。
对于响应速度比较快的网站来说,requests同步请求和aiohttp异步请求的效果差距没那么大。可对于检测代理来说,检测一个代理一般需要十多秒甚至几十秒的时间,这时候使用aiohttp异步请求库的优势就大大体现出来了,效率可能会提高几十倍不止。
所以,我们的代理检测使用异步请求库aiohttp,实现示例如下所示:
VALID_STATUS_CODES = [200] TEST_URL = 'http://www.baidu.com' BATCH_TEST_SIZE = 100 class Tester(object): def __init__(self): self.redis = RedisClient() async def test_single_proxy(self, proxy): """ 测试单个代理 :param proxy: 单个代理 :return: None """ conn = aiohttp.TCPConnector(verify_ssl=False) async with aiohttp.ClientSession(connector=conn) as session: try: if isinstance(proxy, bytes): proxy = proxy.decode('utf-8') real_proxy = 'http://' + proxy print('正在测试', proxy) async with session.get(TEST_URL, proxy=real_proxy, timeout=15) as response: if response.status in VALID_STATUS_CODES: self.redis.max(proxy) print('代理可用', proxy) else: self.redis.decrease(proxy) print('请求响应码不合法', proxy) except (ClientError, ClientConnectorError, TimeoutError, AttributeError): self.redis.decrease(proxy) print('代理请求失败', proxy) def run(self): """ 测试主函数 :return: None """ print('测试器开始运行') try: proxies = self.redis.all() loop = asyncio.get_event_loop() # 批量测试 for i in range(0, len(proxies), BATCH_TEST_SIZE): test_proxies = proxies[i:i + BATCH_TEST_SIZE] tasks = [self.test_single_proxy(proxy) for proxy in test_proxies] loop.run_until_complete(asyncio.wait(tasks)) time.sleep(5) except Exception as e: print('测试器发生错误', e.args)
这里定义了一个类Tester
,__init__()
方法中建立了一个RedisClient
对象,供该对象中其他方法使用。接下来定义了一个test_single_proxy()
方法,这个方法用来检测单个代理的可用情况,其参数就是被检测的代理。注意,test_single_proxy()
方法前面加了async
关键词,这代表这个方法是异步的。方法内部首先创建了aiohttp的ClientSession
对象,此对象类似于requests的Session
对象,可以直接调用该对象的get()方法来访问页面。在这里,代理的设置是通过proxy
参数传递给get()
方法,请求方法前面也需要加上async
关键词来标明其是异步请求,这也是aiohttp使用时的常见写法。
测试的链接在这里定义为常量TEST_URL
。如果针对某个网站有抓取需求,建议将TEST_URL
设置为目标网站的地址,因为在抓取的过程中,代理本身可能是可用的,但是该代理的IP已经被目标网站封掉了。例如,某些代理可以正常访问百度等页面,但是对知乎来说可能就被封了,所以我们可以将TEST_URL
设置为知乎的某个页面的链接,当请求失败、代理被封时,分数自然会减下来,失效的代理就不会被取到了。
如果想做一个通用的代理池,则不需要专门设置TEST_URL
,可以将其设置为一个不会封IP的网站,也可以设置为百度这类响应稳定的网站。
我们还定义了VALID_STATUS_CODES
变量,这个变量是一个列表形式,包含了正常的状态码,如可以定义成[200]。当然某些目标网站可能会出现其他的状态码,可以自行配置。
程序在获取Response后需要判断响应的状态,如果状态码在VALID_STATUS_CODES
列表里,则代表代理可用,可以调用RedisClient
的max()
方法将代理分数设为100,否则调用decrease()
方法将代理分数减1,如果出现异常也同样将代理分数减1。
另外,我们设置了批量测试的最大值BATCH_TEST_SIZE
为100,也就是一批测试最多100个,这可以避免代理池过大时一次性测试全部代理导致内存开销过大的问题。
随后,在run()
方法里面获取了所有的代理列表,使用aiohttp分配任务,启动运行,这样就可以进行异步检测了。可参考aiohttp的官方示例:http://aiohttp.readthedocs.io/。
这样,测试模块的逻辑就完成了。
通过上述三个模块,我们已经可以做到代理的获取、检测和更新,数据库就会以有序集合的形式存储各个代理及其对应的分数,分数100代表可用,分数越小代表越不可用。
但是我们怎样方便地获取可用代理呢?可以用RedisClient
类直接连接Redis,然后调用random()
方法。这样做没问题,效率很高,但是会有几个弊端。
RedisClient
来获取代理。RedisClient
类或者数据库结构有更新,那么爬虫端必须同步这些更新,这样非常麻烦。综上考虑,为了使代理池可以作为一个独立服务运行,我们最好增加一个接口模块,并以Web API的形式暴露可用代理。
这样一来,获取代理只需要请求接口即可,以上的几个缺点弊端也可以避免。
我们使用一个比较轻量级的库Flask来实现这个接口模块,实现示例如下所示:
from flask import Flask, g from db import RedisClient __all__ = ['app'] app = Flask(__name__) def get_conn(): if not hasattr(g, 'redis'): g.redis = RedisClient() return g.redis @app.route('/') def index(): return '<h2>Welcome to Proxy Pool System</h2>' @app.route('/random') def get_proxy(): """ 获取随机可用代理 :return: 随机代理 """ conn = get_conn() return conn.random() @app.route('/count') def get_counts(): """ 获取代理池总量 :return: 代理池总量 """ conn = get_conn() return str(conn.count()) if __name__ == '__main__': app.run()
在这里,我们声明了一个Flask
对象,定义了三个接口,分别是首页、随机代理页、获取数量页。
运行之后,Flask
会启动一个Web服务,我们只需要访问对应的接口即可获取到可用代理。
调度模块就是调用以上所定义的三个模块,将这三个模块通过多进程的形式运行起来,示例如下所示:
TESTER_CYCLE = 20 GETTER_CYCLE = 20 TESTER_ENABLED = True GETTER_ENABLED = True API_ENABLED = True from multiprocessing import Process from api import app from getter import Getter from tester import Tester class Scheduler(): def schedule_tester(self, cycle=TESTER_CYCLE): """ 定时测试代理 """ tester = Tester() while True: print('测试器开始运行') tester.run() time.sleep(cycle) def schedule_getter(self, cycle=GETTER_CYCLE): """ 定时获取代理 """ getter = Getter() while True: print('开始抓取代理') getter.run() time.sleep(cycle) def schedule_api(self): """ 开启API """ app.run(API_HOST, API_PORT) def run(self): print('代理池开始运行') if TESTER_ENABLED: tester_process = Process(target=self.schedule_tester) tester_process.start() if GETTER_ENABLED: getter_process = Process(target=self.schedule_getter) getter_process.start() if API_ENABLED: api_process = Process(target=self.schedule_api) api_process.start()
三个常量TESTER_ENABLED
、GETTER_ENABLED
、API_ENABLED
都是布尔类型,表示测试模块、获取模块、接口模块的开关,如果都为True
,则代表模块开启。
启动入口是run()
方法,这个方法分别判断三个模块的开关。如果开关开启,启动时程序就新建一个Process进程,设置好启动目标,然后调用start()
方法运行,这样三个进程就可以并行执行,互不干扰。
三个调度方法结构也非常清晰。比如,schedule_tester()
方法用来调度测试模块,首先声明一个Tester
对象,然后进入死循环不断循环调用其run()
方法,执行完一轮之后就休眠一段时间,休眠结束之后重新再执行。在这里,休眠时间也定义为一个常量,如20秒,即每隔20秒进行一次代理检测。
最后,只需要调用Scheduler的run()
方法即可启动整个代理池。
以上内容便是整个代理池的架构和相应实现逻辑。
接下来,我们将代码整合一下,将代理运行起来,运行之后的输出结果如下图所示。
以上是代理池的控制台输出,可以看到,可用代理设置为100,不可用代理分数减1。
我们再打开浏览器,当前配置了运行在5555端口,所以打开http://127.0.0.1:5555,即可看到其首页,如下图所示。
再访问:http://127.0.0.1:5555/random,即可获取随机可用代理,如下图所示。
我们只需要访问此接口即可获取一个随机可用代理,这非常方便。
获取代理的代码如下所示:
import requests PROXY_POOL_URL = 'http://localhost:5555/random' def get_proxy(): try: response = requests.get(PROXY_POOL_URL) if response.status_code == 200: return response.text except ConnectionError: return None
之后便是一个字符串类型的代理,此代理可以按照上一节所示的方法设置,如requests的使用方法如下所示:
import requests proxy = get_proxy() proxies = { 'http': 'http://' + proxy, 'https': 'https://' + proxy, } try: response = requests.get('http://httpbin.org/get', proxies=proxies) print(response.text) except requests.exceptions.ConnectionError as e: print('Error', e.args)
有了代理池之后,我们再取出代理即可有效防止IP被封禁的情况。
本节代码地址为:https://github.com/Python3WebSpider/ProxyPool。
本节实现了一个比较高效的代理池,来获取随机可用的代理。接下来,我们会利用代理池来实现数据的抓取。
崔庆才
静觅博客博主,《Python3网络爬虫开发实战》作者
本文分享自微信公众号 - 进击的Coder(FightingCoder),作者:崔庆才
原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。
原始发表时间:2018-03-30
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句