A Batch-Crawling Framework for Tens of Millions of URLs: Let's Call It UrlSpider

十四君
Published 2019-11-28 16:35:03
Collected in the column: Urlteam

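While working out how the data-collection business actually operates, I want to build an efficient, distributed crawling framework with built-in countermeasures against anti-crawler defenses.

This post distills the techniques from a recently completed collection job covering hundreds of millions of records. A single machine handles about 40 million URLs per day, and the code is roughly 200 lines long.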
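To put the 40 million per day figure in perspective, the short calculation below converts it into a per-second rate. It is plain arithmetic, not a measurement. The 50 threads and the 0.5 s mean sleep are taken from the `THREAD_COUNT` setting and the random 0 to 1 s pause in the listing later in this post; the function names are made up for this sketch, which is written for Python 3.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400


def required_rate(pages_per_day):
    """Average requests per second needed to reach a daily target."""
    return pages_per_day / SECONDS_PER_DAY


def sleep_bound_rate(threads, mean_sleep_s):
    """Upper bound on requests per second when every thread sleeps
    mean_sleep_s seconds before each request (network time ignored)."""
    return threads / mean_sleep_s


target = required_rate(40_000_000)
one_process = sleep_bound_rate(threads=50, mean_sleep_s=0.5)
print(round(target, 1))                # 463.0
print(one_process)                     # 100.0
print(round(target / one_process, 1))  # 4.6
```

Under these assumptions one 50-thread process with the default sleep tops out near 100 requests per second, so reaching the quoted rate would need several processes, more threads, or a shorter pause.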

This project is fairly difficult and is not recommended for beginners.

Contents:

UrlSpider features

UrlSpider environment dependencies

Full source code

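1: UrlSpider features

  • Multithreaded task allocation, suited to working through very large task queues.
  • Two request modes to choose from, one of which can fetch the page source after JavaScript rendering.
  • IP rotation through Tor and other proxies.
  • Distributed task scheduling driven by a MySQL database.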
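The database-driven scheduling in the last bullet can be sketched as a small state machine on a `flag` column. In the listing later in this post, `flag = 3` marks a pending row and `flag = 1` a fetched one. The listing does not show how several machines avoid picking up the same rows, so the "claimed" state (`flag = 2`), the `worker` column, and all function names below are assumptions added for illustration. `sqlite3` stands in for MySQL so that the sketch runs on its own under Python 3.

```python
import sqlite3

PENDING, CLAIMED, DONE = 3, 2, 1  # 3 and 1 follow the listing; 2 is an assumption


def make_task_table(conn, urls):
    """Create a task table and load every URL as pending."""
    conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, url TEXT, "
                 "flag INTEGER, worker TEXT, date TEXT)")
    conn.executemany("INSERT INTO tasks (url, flag) VALUES (?, ?)",
                     [(u, PENDING) for u in urls])
    conn.commit()


def claim_batch(conn, worker, limit):
    """Mark up to `limit` pending rows as claimed by `worker` and return them."""
    with conn:  # one transaction: claim, then read back what was claimed
        conn.execute(
            "UPDATE tasks SET flag = ?, worker = ? WHERE id IN "
            "(SELECT id FROM tasks WHERE flag = ? ORDER BY id LIMIT ?)",
            (CLAIMED, worker, PENDING, limit))
        rows = conn.execute(
            "SELECT id, url FROM tasks WHERE flag = ? AND worker = ? ORDER BY id",
            (CLAIMED, worker)).fetchall()
    return rows


def finish(conn, results):
    """results: list of (data, task_id); store the data and mark the row done."""
    with conn:
        conn.executemany("UPDATE tasks SET date = ?, flag = ? WHERE id = ?",
                         [(data, DONE, tid) for data, tid in results])


conn = sqlite3.connect(":memory:")
make_task_table(conn, ["http://example.com/%d" % i for i in range(5)])
a = claim_batch(conn, "worker-a", 3)
b = claim_batch(conn, "worker-b", 3)
finish(conn, [("<html>...</html>", tid) for tid, _ in a])
print([tid for tid, _ in a], [tid for tid, _ in b])  # [1, 2, 3] [4, 5]
```

With MySQL the claim step would more likely be a single-table `UPDATE ... ORDER BY id LIMIT n`, since MySQL does not accept `LIMIT` inside an `IN` subquery; treat the SQL above as a SQLite-specific stand-in.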

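Before writing any code, work out which modules a crawler needs:

Determine the task URL  →  apply disguises  →  send the request  →  parse the source  →  store the result

UrlSpider is therefore a template that builds multithreading and anti-anti-crawler settings into that pipeline. A new job only has to supply a table with a URL column, and UrlSpider can then crawl it in bulk with distributed, multithreaded efficiency.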
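The five stages can be sketched as a template whose fetch, parse, and store steps are passed in as functions. This is only an illustration of the idea, not the author's code: every name here (`disguise`, `parse_title`, `crawl`, `fake_fetch`) is hypothetical, the user-agent strings are samples, and a stub replaces the real HTTP request so the sketch runs offline under Python 3.

```python
import random
import re

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/44.0.2403.157 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/53.0.2785.143 Safari/537.36",
]


def disguise(url):
    """Stage 2: build request headers; only the User-Agent is varied here."""
    return {"User-Agent": random.choice(USER_AGENTS), "Accept": "*/*"}


def parse_title(html):
    """Stage 4: pull the <title> text out of the page source."""
    match = re.search(r"<title>(.*?)</title>", html, re.S | re.I)
    return match.group(1).strip() if match else ""


def crawl(urls, fetch, parse, store):
    """Run every URL through disguise -> fetch -> parse -> store."""
    for url in urls:                 # stage 1: the task URLs
        headers = disguise(url)      # stage 2: disguise
        html = fetch(url, headers)   # stage 3: request
        store(url, parse(html))      # stages 4 and 5: parse, then store


def fake_fetch(url, headers):
    """Stand-in for a real HTTP request so the sketch needs no network."""
    return "<html><title>page for %s</title></html>" % url


saved = {}
crawl(["http://example.com/a", "http://example.com/b"],
      fake_fetch, parse_title, saved.__setitem__)
print(saved)
```

A new job would swap in its own `parse` and `store` functions and leave the loop untouched, which is the sense in which the framework acts as a template.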

2: UrlSpider environment dependencies

Python

import re, os, sys, time, json, random, MySQLdb, requesocks, threading, requests


The main requirements are MySQLdb, requests, and requesocks.

If PhantomJS is installed, the framework can also fetch data as it appears after JavaScript rendering.
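The post does not show how PhantomJS is invoked, so the following is only one plausible way to do it: a small PhantomJS script prints `page.content` once the page has loaded, and Python runs it as a subprocess. The helper names (`render_command`, `fetch_rendered`) and the timeout are assumptions, the sketch targets Python 3, and `fetch_rendered` needs a `phantomjs` binary on the `PATH`.

```python
import os
import subprocess
import tempfile

# PhantomJS script: open the URL given as the first argument and print the
# DOM as it stands when the load callback fires.
RENDER_JS = """
var page = require('webpage').create();
var system = require('system');
page.open(system.args[1], function (status) {
    if (status === 'success') {
        console.log(page.content);
    }
    phantom.exit(status === 'success' ? 0 : 1);
});
"""


def render_command(url, script_path, phantomjs="phantomjs"):
    """Argument list for running the render script against one URL."""
    return [phantomjs, script_path, url]


def fetch_rendered(url, phantomjs="phantomjs", timeout=30):
    """Return the rendered HTML of `url`; raises if PhantomJS fails."""
    with tempfile.NamedTemporaryFile("w", suffix=".js", delete=False) as handle:
        handle.write(RENDER_JS)
        script_path = handle.name
    try:
        output = subprocess.check_output(
            render_command(url, script_path, phantomjs), timeout=timeout)
        return output.decode("utf-8", errors="replace")
    finally:
        os.remove(script_path)


print(render_command("http://example.com/", "render.js"))
# ['phantomjs', 'render.js', 'http://example.com/']
```

Starting a browser process per URL is far slower than a plain HTTP request, so this mode is probably best reserved for pages whose data only exists after scripts have run.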

If Tor is installed, the framework can call it from the main process to switch the exit IP.
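One common way to ask Tor for a new exit is its control port: authenticate, then send `SIGNAL NEWNYM`. The sketch below assumes a local Tor with `ControlPort 9051` and password authentication enabled in `torrc`; the function names and the password are placeholders, and the post itself does not say which mechanism the author used. It is written for Python 3.

```python
import socket


def newnym_commands(password):
    """Control-port lines that authenticate and then request fresh circuits."""
    escaped = password.replace("\\", "\\\\").replace('"', '\\"')
    return [('AUTHENTICATE "%s"\r\n' % escaped).encode("ascii"),
            b"SIGNAL NEWNYM\r\n"]


def request_new_identity(password, host="127.0.0.1", port=9051, timeout=10):
    """Send the commands to a running Tor; True if both were answered with 250."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with sock.makefile("rb") as reader:
            for command in newnym_commands(password):
                sock.sendall(command)
                reply = reader.readline()
                if not reply.startswith(b"250"):
                    return False
    return True


print(newnym_commands("secret"))
# [b'AUTHENTICATE "secret"\r\n', b'SIGNAL NEWNYM\r\n']
```

Tor rate-limits `NEWNYM`, so calling `request_new_identity` from the main process every so often, rather than from every worker thread, is the safer pattern. Requests only leave through Tor if the session's SOCKS proxy (`socks5://127.0.0.1:9050` in the listing) is enabled.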

3: Full source code

To be updated. The current preliminary version is in the UrlSpider directory on GitHub.

https://github.com/luyishisi/Anti-Anti-Spider/tree/master/UrlSpider

Usage instructions and sample projects will be added over time.

Python

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#-------------------------------------------------------------------------
#   Program:  UrlSpider.py
#   Version:  1
#   Author:   ly
#   Date:     written 2016/12/25
#   Language: Python 2.7.x
#   Usage:    python UrlSpider.py
#   Purpose:  read URLs from a given task table and crawl them with multiple threads
#             table structure (id, ip, lon_gd, lat_gd, datetime, flag)
#             uses batched database writes and other table-level optimizations
#-------------------------------------------------------------------------
import re, os, sys, time, json, random, MySQLdb, requesocks, threading, requests

#--------------------------------------------------
# Chinese encoding setup
reload(sys)
sys.setdefaultencoding('utf-8')
Type = sys.getfilesystemencoding()

#------------------------------------------------
# Proxy and Tor settings.
session = requesocks.session()
# session.proxies = {'http':'socks5://127.0.0.1:9050','https':'socks5://127.0.0.1:9050'}

#------------------------------------------------
#   Global parameters that may be changed
Table = "table"  # table name, must be changed
HOST, USER, PASSWD, DB, PORT = 'host', 'user', 'pass', 'dbname', 3306  # database connection parameters
select_sql = "SELECT id,url FROM %s where flag = 3 limit 30000;"  # rows are already shuffled in the database
Update_sql = "UPDATE " + Table + " SET date=%s, flag=%s WHERE id =%s;"  # stores the result

THREAD_COUNT = 50   # number of threads to start
sql_num_base = 200  # base of the random batch threshold; with 1, every result is written immediately
sql_num_add = 100   # random addend; on average a thread writes after about sql_num_base+1/3*sql_num_add requests

#   Globals that should not be changed
#------------------------------------------------
schedule = 0  # shared counter of completed tasks
ErrorList = []
WarnList = []


class Handle_HTML(threading.Thread):
    """Worker thread: fetches its slice of the task list and writes results back."""

    def __init__(self, lock, ThreadID, tasklist, Total_TaskNum):
        super(Handle_HTML, self).__init__()
        self.lock = lock
        self.ThreadID = ThreadID
        self.tasklist = tasklist
        self.Total_TaskNum = Total_TaskNum

    def run(self):
        global schedule, ErrorList
        connect, cursor = ConnectDB()
        self.lock.acquire()
        print "The Thread tasklist number :", len(self.tasklist)
        self.lock.release()
        total = len(self.tasklist)
        user_agent = 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36'
        date_list = []
        now_requests_num = 0
        for (id, url) in self.tasklist:
            # -------------------------
            # Report progress before each request; hold the lock while printing
            self.lock.acquire()
            time_Now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            print "Tread-%s:" % self.ThreadID, time_Now, "Already Completed:[%s] ,Also remaining:[%s]" % (schedule, self.Total_TaskNum - schedule)
            self.lock.release()

            # ------------------------
            # Header fields that can be forged
            # NOTE: `ip` was never defined in the published listing; forging a
            # random address here is an assumption made so the code runs.
            ip = '.'.join(str(random.randint(1, 254)) for _ in range(4))
            headers = {
                'User-Agent': user_agent,
                'Referer': '',
                'X-Forwarded-For': ip,
                'Accept': '*/*',
                'Accept-Encoding': 'gzip, deflate, sdch',
                'Accept-Language': 'zh-CN,zh;q=0.8',
                'Cache-Control': 'no-cache',
                'Connection': 'keep-alive',
                'Host': 'ditu.amap.com',  # change to match the target site
                'Pragma': 'no-cache',
                #User-Agent:Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/53.0.2785.143 Chrome/53.0.2785.143 Safari/537.36
            }
            URL = url
            date = ''
            result = ''  # defined up front so the error handlers can always print it
            now_requests_num += 1
            #print '*************************************',ip,i#,date_list

            # -------------------------
            # The request itself
            try:
                # -- send
                time.sleep(random.uniform(0, 1))
                response = session.get(URL, headers=headers)
                result = response.text.encode('utf-8')

                # --- parse the response --- choose regex, xpath, or etree here; JSON works for API-style data
                if result:
                    date = result
                    date_list.append([date, 1, id])  # queued for the batch write; 1 is stored as flag
                else:
                    date_list.append([date, 0, id])  # queued for the batch write; 0 is stored as flag

            except Exception as e:
                # Nothing is queued on failure, so the row keeps flag = 3 and is retried later
                print e
                time.sleep(random.uniform(0, 3))
                ErrorList.append("The ip is :[%s] Error:%s\n result:%s" % (ip, e, result))

            # ------------------------
            # Batch write
            try:
                # Random threshold between sql_num_base and sql_num_base + sql_num_add (200-300 by default)
                sql_num = int(random.uniform(sql_num_base, sql_num_base + sql_num_add))
                if now_requests_num >= sql_num:
                    now_requests_num = 0
                    cursor.executemany(Update_sql, date_list)
                    connect.commit()
                    date_list = []
                    print 'up', time.ctime(), '&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&', sql_num
            except Exception as e:
                print e
                time.sleep(random.uniform(0, 3))
                ErrorList.append("The ip is :[%s] Error:%s\n result:%s" % (ip, e, result))

            # Update the shared progress counter
            self.lock.acquire()
            schedule += 1
            self.lock.release()

        # Important: write whatever is still buffered before the thread exits
        if date_list:
            cursor.executemany(Update_sql, date_list)
            connect.commit()
        connect.close()


def ConnectDB():
    "Connect MySQLdb "
    connect, cursor = None, None
    while True:
        try:
            connect = MySQLdb.connect(
                host=HOST, user=USER, passwd=PASSWD, db=DB, port=PORT, charset='utf8')
            cursor = connect.cursor()
            break
        except MySQLdb.Error as e:
            print "Error %d: %s" % (e.args[0], e.args[1])
            time.sleep(1)  # brief pause so a dead database is not hammered in a tight loop
    return connect, cursor


def Thread_Handle(taskList, Total_TaskNum):
    '''Thread start-up area, no changes needed'''
    global THREAD_COUNT
    lock = threading.Lock()
    WorksThread = []
    every_thread_number = len(taskList) // THREAD_COUNT
    if every_thread_number == 0:
        THREAD_COUNT = len(taskList)
        every_thread_number = 1

    for i in range(THREAD_COUNT):
        if i != THREAD_COUNT - 1:
            source_list = taskList[
                i * every_thread_number: (i + 1) * every_thread_number]
            Work = Handle_HTML(lock, i, source_list, Total_TaskNum)
        else:
            # The last thread also takes the remainder
            source_list = taskList[i * every_thread_number:]
            Work = Handle_HTML(lock, i, source_list, Total_TaskNum)
        Work.start()
        WorksThread.append(Work)
    for Work in WorksThread:
        Work.join()


def main():
    global ErrorList, WarnList
    connect, cursor = ConnectDB()

    # Count the pending rows (flag = 3)
    try:
        cursor.execute("SELECT COUNT(*) FROM %s WHERE flag = 3 ;" % Table)
    except Exception as e:
        print e
    TaskNum = cursor.fetchall()
    connect.close()

    if TaskNum[0][0] == 0:
        print "Warning:There is no need to do the task!!!"
    else:
        Total_TaskNum = int(TaskNum[0][0])
        while True:
            connect, cursor = ConnectDB()  # open a database connection
            try:
                if cursor.execute(select_sql % Table):  # fetch the next batch of task URLs
                    rows = cursor.fetchall()
                    Thread_Handle(rows, Total_TaskNum)  # start the threads
                else:
                    connect.close()
                    break
            except Exception as e:
                print e
            connect.close()
    print "_____************_____"
    if ErrorList:
        for error in ErrorList:
            print error
    print "Error:", len(ErrorList), "Warning:", len(WarnList)


if __name__ == '__main__':
    print "The Program start time:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    start = time.time()
    main()
    print "The Program end time:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), "[%s]" % (time.time() - start)
    # raw_input("Please enter any key to exit!")
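The batched write in `run` can be looked at in isolation: results pile up in a list and are written in one `executemany` call once a random threshold between `sql_num_base` and `sql_num_base + sql_num_add` is reached. The class below is a hypothetical Python 3 restatement of that idea with the database call replaced by a callback. It differs from the listing in one respect: the listing draws a fresh threshold on every request, while this sketch draws one per batch.

```python
import random


class BatchBuffer:
    """Collect rows and hand them to `flush_fn` once a random threshold is hit."""

    def __init__(self, flush_fn, base=200, add=100, rng=random):
        self.flush_fn = flush_fn
        self.base, self.add, self.rng = base, add, rng
        self.rows = []
        self.threshold = self._next_threshold()

    def _next_threshold(self):
        # Randomizing the size keeps threads from all writing at the same moment.
        return self.rng.randint(self.base, self.base + self.add)

    def append(self, row):
        self.rows.append(row)
        if len(self.rows) >= self.threshold:
            self.flush()

    def flush(self):
        if self.rows:
            self.flush_fn(self.rows)
            self.rows = []
        self.threshold = self._next_threshold()


batches = []
buf = BatchBuffer(batches.append, base=200, add=100, rng=random.Random(0))
for task_id in range(1000):
    buf.append(("<html>", 1, task_id))  # same (date, flag, id) order as Update_sql
buf.flush()  # final flush, like the executemany after the loop in run()
print(len(batches), sum(len(b) for b in batches))
```

In the real worker, `flush_fn` would wrap `cursor.executemany(Update_sql, rows)` followed by `connect.commit()`.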

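`Thread_Handle` gives every thread `len(taskList) // THREAD_COUNT` tasks and hands the whole remainder to the last thread. With 30,000 rows and 50 threads the split is even, but for other sizes the last thread can end up with noticeably more work. A balanced alternative, shown here as a hypothetical Python 3 helper that is not part of the original code, spreads the remainder one task at a time:

```python
def split_tasks(tasks, workers):
    """Split `tasks` into at most `workers` contiguous slices whose sizes
    differ by no more than one."""
    workers = max(1, min(workers, len(tasks)))
    size, extra = divmod(len(tasks), workers)
    slices, start = [], 0
    for i in range(workers):
        end = start + size + (1 if i < extra else 0)  # first `extra` slices get one more
        slices.append(tasks[start:end])
        start = end
    return slices


parts = split_tasks(list(range(103)), 50)
print([len(p) for p in parts][:5])  # [3, 3, 3, 2, 2]
```

For the same 103 tasks the listing's scheme would give 49 threads two tasks each and the last thread five.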

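This article is part of the Tencent Cloud self-media sharing program and is shared from the author's personal site/blog.
Originally published: 2016-12-25. In case of infringement, contact cloudcommunity@tencent.com for removal.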
