
A Slick Python Crawler Module

Author: 鬼麦子
Published 2023-03-07 15:36:52

Environment

Windows 11 / Ubuntu + Python 3.10+ + Playwright

https://playwright.dev/python/docs/pages

Features

There are already plenty of crawlers out there, dynamic and static, whether built on HTTP requests plus regex or on a real browser. Calling them all "much the same" may be a slight exaggeration, but they do share one foundation: the "URL". In the web security field, though, a URL alone is not comprehensive enough.

  1. In my view, a good crawler should work from full HTTP requests. HTTP includes "GET, POST, HEAD, OPTIONS, PUT, PATCH, DELETE, TRACE, CONNECT", but replaying a POST-style request through a browser is awkward. This script solves that problem; real needs like POST-based XSS, or sensitive information that only appears after a POST, all require this capability.
  2. For concurrency in a dynamic crawler, I open multiple Chrome tabs to get asynchronous parallelism. That may be slower than a browser pool with multiple processes/threads, but it is enough for my needs.
  3. Collecting sufficient page information; this part is much the same everywhere.
  4. Countering anti-crawler detection.
  5. Dynamic page interaction.
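Point 2 can be sketched with plain asyncio: each tab is a coroutine, and gathering them gives tab-level concurrency. This is an illustrative sketch only; the `fetch_in_tab` name and the simulated work are mine, standing in for the module's real `page_context.new_page()` + `page.goto()` calls:

```python
import asyncio

async def fetch_in_tab(url: str) -> dict:
    # Stand-in for "open a Chrome tab and load the URL"; the real module
    # creates a Playwright page per request inside pages_process().
    await asyncio.sleep(0.01)          # simulate network/render time
    return {"url": url, "status": 0}

async def crawl(urls):
    # One coroutine per tab, run concurrently -- the same idea as
    # opening one browser tab per HTTP request.
    return await asyncio.gather(*(fetch_in_tab(u) for u in urls))

results = asyncio.run(crawl([f"http://example.test/{i}.html" for i in range(5)]))
```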

Requirements and Interfaces

Requirements

  • Page information
  • URLs found in the page
  • Requests and responses triggered within the page
  • Response content

As the results below show, all of these are ready to query once a run completes.

Page information collected

  • Title
  • Screenshot
  • HTTP response headers
  • Status code
  • HTTP response body
  • Rendered HTML
  • URLs found in the page
  • Simulated clicks
  • HTTP requests triggered within the page

Implementation

Given an HTTP request:

{'headers': {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
            'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"',
            'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1',
            'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'}, 'method': 'GET',
         'body': 'Null', 'url': 'http://192.168.72.6:80/web_vul_test/http_method/index.html'
         }

it yields the following result:

{
'headers': {'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36', 'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"', 'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1', 'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'},
    'method': 'GET', 
    'body': 'Null', 
    'url': 'http://192.168.72.6/web_vul_test/http_method/index.html', 
    'signal': '2fcd137a11b690821d1af032781295b0',
    'status': 0, 
    'html_md5': '2d9be5170ac9826cc42737d52709eefe',
    'title': 'HTTP 请求方法', 
    'screenshot': 'C:/Users/guimaizi/source/Web_Security_Test_Framework/Web_Test/static/image/2022-11-09/8khyOWQEsnXd4ql.jpeg', 
    'http_status_code': 200, 
    'headers_response': {'accept-ranges': 'bytes', 'connection': 'close', 'content-encoding': 'gzip', 'content-length': '1730', 'content-type': 'text/html', 'date': 'Wed, 09 Nov 2022 10:16:31 GMT', 'etag': '"b8aa-5e22f1d737740-gzip"', 'last-modified': 'Fri, 24 Jun 2022 10:34:29 GMT', 'server': 'Apache/2.4.52 (Ubuntu)', 'vary': 'Accept-Encoding'}
}
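The `signal` field in the result is just an MD5 over URL + body; it is what later joins a page result to its response (see `set_request_signal` in the code). A quick sketch:

```python
import hashlib

def request_signal(url: str, body: str) -> str:
    # Same scheme as set_request_signal(): md5(url + body) identifies a
    # request, so page results and responses can be matched up afterwards.
    return hashlib.md5((url + body).encode()).hexdigest()

sig = request_signal("http://192.168.72.6/web_vul_test/http_method/index.html", "Null")
```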

Highlight feature

Whatever comes in, GET, POST or PUT, it gets replayed in the browser through this one function: the browser opens the URL, and when the stored request for that URL turns out to be a POST, a hook on the outgoing browser request rewrites the GET into the POST. I won't walk through every detail; the code is all below, so debug and study it yourself.
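The heart of that hook is a pure decision: if the intercepted request matches the target URL, choose the overrides that `route.continue_()` should apply. A minimal sketch of that decision logic, separated from Playwright so it can run standalone (the `override_for` helper is my name, not part of the module):

```python
def override_for(route_url: str, route_method: str, target: dict):
    # Mirrors the branches in hook_requset(): when the browser navigates to
    # the target URL, return the kwargs that route.continue_() should get.
    if route_url != target["url"]:
        return None                     # not our request: let it pass untouched
    if target["body"] != "Null":        # replay with the stored body (e.g. a POST)
        return {"headers": target["headers"], "method": target["method"],
                "post_data": target["body"]}
    if target["method"] == "GET":       # plain GET: just reapply stored headers
        return {"headers": target["headers"], "method": "GET"}
    return {}                           # bodyless non-GET: continue unchanged

target = {"url": "http://host/x.php", "method": "POST",
          "headers": {"Content-Type": "application/json"}, "body": '{"a":1}'}
ov = override_for("http://host/x.php", "GET", target)
```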

{
"headers": {"Origin": "http://192.168.72.6", "Cookie": "PHPSESSID=gd7a57gisgbef82jkukndj585r; security=low", "Accept": "application/json, text/javascript, */*; q=0.01", "X-Requested-With": "XMLHttpRequest", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36", "Referer": "http://192.168.72.6/web_vul_test/http_method/index.html", "Connection": "close", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9", "Content-Type": "application/json; charset=UTF-8"}, 
"method": "POST", 
"body": "{\"name\":\"John\",\"time\":\"2p1m\",\"url\":\"http%3A%2F%2Fwww.guimaizi.com%2F\"}",
    "url": "http://192.168.72.6:80/web_vul_test/http_method/http_method.php?method=POST_json_method&dsaa=aaasda1sdas&id=32&string=flag"
    }

{
'headers': {'Origin': 'http://192.168.72.6', 'Cookie': 'PHPSESSID=gd7a57gisgbef82jkukndj585r; security=low', 'Accept': 'application/json, text/javascript, */*; q=0.01', 'X-Requested-With': 'XMLHttpRequest', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36', 'Referer': 'http://192.168.72.6/web_vul_test/http_method/index.html', 'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Content-Type': 'application/json; charset=UTF-8'}, 
'method': 'POST', 
'body': '{"name":"John","time":"2p1m","url":"http%3A%2F%2Fwww.guimaizi.com%2F"}',
    'url': 'http://192.168.72.6/web_vul_test/http_method/http_method.php?method=POST_json_method&dsaa=aaasda1sdas&id=32&string=flag', 
    'signal': '8a9ddef651abb095651e4fb7555d9836',
    'status': 0, 
    'html_md5': 'f88f221f86d5319f2a541766120de429', 
    'title': '',
    'screenshot': 'C:/Users/guimaizi/source/Web_Security_Test_Framework/Web_Test/static/image/2022-11-09/wXxmdaq7QOA4YJN.jpeg',
    'http_status_code': 200, 
    'headers_response': {'cache-control': 'no-store, no-cache, must-revalidate', 'connection': 'close', 'content-length': '26', 'content-type': 'text/plain;charset=UTF-8', 'date': 'Wed, 09 Nov 2022 11:06:42 GMT', 'expires': 'Thu, 19 Nov 1981 08:52:00 GMT', 'pragma': 'no-cache', 'server': 'Apache/2.4.52 (Ubuntu)'}
    }

Screenshot:

Countering "crawler detection"

Anti-crawler checks are mostly just JavaScript probes: roughly a User-Agent check and a `navigator.webdriver` check, perhaps plus a browser/operating-system consistency check.

Run it on Ubuntu and it basically passes clean.
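The countermeasure in the code is an init script injected before any page JavaScript runs (see `pages_process`), masking the probed properties. The payload is shown below as a string so it can be inspected without launching a browser; applying it is one `page.add_init_script()` call:

```python
# Init-script payload injected via page.add_init_script() before page JS runs;
# it masks the two navigator properties most detection scripts probe.
STEALTH_JS = """
Object.defineProperties(navigator, {webdriver:{get:()=>false}});
Object.defineProperties(navigator, {platform:{get:()=>'Win32'}});
"""

# With Playwright this would be applied as (not executed here):
#   await page.add_init_script(STEALTH_JS)
```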

Simulated clicks

The JS here I copied from some expert years ago; I've forgotten the original source, but it works well, thanks.

The point of simulated clicks is to trigger as many in-page HTTP requests as possible, so more testable interfaces get collected. Interfaces are the attack surface; honestly, "attack surface" is a pretentious term, which as I recall was coined by QiAnXin or Qihoo 360 or somewhere like that.

Other issues

Alert handling, click-through navigation to other pages, clicks that open new tabs: there are ready-made APIs to deal with all of these.

Screenshots were making runs too slow; setting the page size and the screenshot quality fixed that, which is why the default screenshots are rather blurry. Change the settings if you need sharper ones.
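The speed fix boils down to two settings: a fixed, modest viewport and low-quality JPEG screenshots. A sketch of those knobs, with the values taken from `browser()` and `page_data()` below (the `screenshot_kwargs` helper is my name, for illustration):

```python
# Fixed viewport keeps rendering and screenshot size bounded; the values
# mirror browser() below.
VIEWPORT = {"width": 1024, "height": 768}

def screenshot_kwargs(path: str, quality: int = 15) -> dict:
    # JPEG supports the quality knob (PNG does not); 15 trades clarity
    # for speed, which is why the default screenshots look blurry.
    return {"path": path, "type": "jpeg", "quality": quality}

kw = screenshot_kwargs("out.jpeg")
# With Playwright (not executed here):
#   await page.screenshot(**kw)
```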

There are too many features to cover; even three days and nights wouldn't exhaust the details, so read and debug the code yourself.

Code

spider.py

# coding: utf-8
"""
@Time :    11/7/2022 13:42 
@Author:  fff
@File: test2.py
@Software: PyCharm
"""
import asyncio,Class_function,random
from playwright.async_api import async_playwright

class spider:
    def __init__(self):
        self.Core_Function = Class_function.Class_function()
        self.page_result_list=[]
        self.request_list = []
        self.response_list = []
        self.list_url = []
        self.HTML_list=[]

    async def click_function(self,page):
        '''
        Simulated click traversal: click every clickable node in the page
        :return:
        '''
        try:
            num222 = await page.evaluate('''
                window.stop();
                num222=document.querySelectorAll('*').length;
                num222;
                ''')
            # self.Core_Function.callback_logging.info(num222)
            #print(num222)
            await page.evaluate('''
                window.scrollBy(1920, 50);
                treeWalker = document.createTreeWalker(document);
                num111=0 
                while (treeWalker.nextNode() && num111<1500) {
                    console.log("[*] processing node " + treeWalker.currentNode.tagName + ' ' + treeWalker.currentNode.id);
                    if (treeWalker.currentNode.click) {
                        treeWalker.currentNode.target='';
                        treeWalker.currentNode.click();
                        num111=num111+1;
                    }
                }
            ''')
            flag_num=0
            while await page.evaluate('num111') < 666:
                #self.Core_Function.callback_logging().info('while')
                await asyncio.sleep(0.5)
                flag_num=flag_num+1
                if flag_num>=3:  # give up after ~1.5s so a slow page cannot stall the crawl
                    break
        except Exception as e:
            self.Core_Function.callback_logging().error(e)

    async def hook_requset(self,route):
        '''
        Hook the outgoing request packet (rewrite the navigation into the stored POST/PUT when needed)
        :param route:
        :return:
        '''
        # print(route.request.url)
        if route.request.url == self.target_request['url']:
            if self.target_request['body'] != 'Null':
                await route.continue_(headers=self.target_request['headers'], method=self.target_request['method'],
                                      post_data=self.target_request['body'])
            elif self.target_request['method'] == 'GET':
                await route.continue_(headers=self.target_request['headers'], method=self.target_request['method'])
            else:
                await route.continue_()
        elif route.request.url != 'about:blank' and route.request.is_navigation_request():
            # print(route.request.url)
            if route.request.method == 'GET':
                request = {"headers": route.request.headers, "method": route.request.method, "url": route.request.url,
                           "body": "Null"}
                self.request_list.append(request)
            elif self.target_request['body'] != 'Null':
                request = {"headers": route.request.headers, "method": route.request.method,
                           "url": route.request.url, "body": route.request.post_data}
                self.request_list.append(request)
            # await route.continue_()
            await route.abort(error_code='aborted')
        else:
            await route.continue_()

    async def handle_popup(self, page):
        '''
        Close new windows opened by simulated clicks
        :param page:
        :return:
        '''
        await page.close()

    async def handle_dialog(self, dialog):
        '''
        Dismiss alert/confirm/prompt dialogs
        :param dialog:
        :return:
        '''
        await dialog.dismiss()

    async def handle_network_http_request(self, request):
        '''
        Capture HTTP requests triggered by the page
        :param request:
        :return:
        '''
        try:
            if request.resource_type not in ['image', 'stylesheet', 'websocket', 'media', 'font']:
                request_data = {}
                # print(request.url)
                if request.post_data == None:
                    request_data['body'] = 'Null'
                else:
                    request_data['body'] = request.post_data
                request_data['url'] = request.url
                request_data['headers'] = request.headers
                request_data['method'] = request.method
                request_data['time'] = self.Core_Function.callback_time(0)
                request_data['describe'] = 'Null'
                request_data['status'] = 0
                #print(request_data)
                self.request_list.append(request_data)
        except Exception as e:
            self.Core_Function.callback_logging().error(e)

    async def handle_http_response(self, response):
        '''
        Handle HTTP responses for the target request
        '''
        try:
            # print(response.request.url)
            response_data = {}
            html_data={}
            if response.request.url == self.target_request['url']:
                if response.status in [200, 301, 302, 404, 500]:
                    response_data['body'] = self.target_request['body']
                    response_data['url'] = self.target_request['url']
                    response_data['headers'] = self.target_request['headers']
                    response_data['method'] = self.target_request['method']
                    response_data['http_status_code'] = response.status
                    response_data['headers_response'] = response.headers
                    html=await response.text()
                    html_md5=self.Core_Function.md5_convert(html)
                    html_data['html']=html
                    html_data['html_md5'] = html_md5
                    html_data['time'] = self.Core_Function.callback_time(0)
                    html_data['status']=0
                    response_data['html_md5']=html_md5
                    response_data['time'] = self.Core_Function.callback_time(0)
                    response_data['describe'] = 'Null'
                    response_data['status'] = 0
                    # print(response_data)
                    self.response_list.append(response_data)
                    self.HTML_list.append(html_data)
        except Exception as e:
            self.Core_Function.callback_logging().error(e)


    async def page_data(self,page,request):
        '''
        Collect page information: title, HTML, links, screenshot
        :param page:
        :param request:
        :return:
        '''
        html=await page.content()
        html_md5 = self.Core_Function.md5_convert(html)
        html_data={}
        html_data['html'] = html
        html_data['html_md5'] = html_md5
        html_data['time'] = self.Core_Function.callback_time(0)
        html_data['status'] = 0
        request['status'] = 0
        request['html_md5'] = html_md5
        request['title'] = await page.title()
        print(request['title'])
        await page.evaluate('''
                list_href=[]
                window.open = function(url) { console.log("new link: " + url);list_href.push(url); };
                window.close = function () { return false; };
            ''')
        await page.evaluate('''
                list_href=[]
                for(i=0;i<document.getElementsByTagName("a").length;i++){
                    list_href.push(document.getElementsByTagName("a")[i].href); // collect every link in the page
                }
            ''')
        # print(await page.content())
        filename_img = "%s/%s.jpeg" % (self.Core_Function.create_image_path(), ''.join(
            random.sample('ABCDEFGHIJKLMNOPQRSTUVWXYZ012345678zyxwvutsrqponmlkjihgfedcba', 15)))
        try:
            await page.screenshot(path=filename_img, type='jpeg', quality=15)
        except Exception as error:
            filename_img = ''
            print(error)
        request['screenshot'] = filename_img
        self.page_result_list.append(request)
        self.HTML_list.append(html_data)

    async def pages_process(self,page_context,request_list):
        '''
        Page control: open one tab per request, then harvest each page
        :param page_context:
        :param request_list:
        :return:
        '''
        [await page_context.new_page() for http_request in  request_list]
        num=0
        for page in page_context.pages:
            try:
                page.on("request", self.handle_network_http_request)
                page.on("response", self.handle_http_response)
                page.on("dialog", self.handle_dialog)
                page.on("popup", self.handle_popup)
                #print(request_list[num])
                # anti-crawler-detection init script
                js = """
                    Object.defineProperties(navigator, {webdriver:{get:()=>false}});
                    Object.defineProperties(navigator, {platform:{get:()=>'Win32'}});
                    """
                # self.page.on("response", lambda response: print("<<", response.headers, response.url))
                await page.add_init_script(js)
                self.target_request = request_list[num]
                await page.route('*', self.hook_requset)
                await page.goto(request_list[num]['url'],wait_until="commit")
                #print(page.request)
                num=num+1
            except  Exception as error:
                self.Core_Function.callback_logging().error(error)
        num=0
        for page in page_context.pages:
            await page.evaluate('''window.stop();''')
            await self.page_data(page,request_list[num])
            await asyncio.wait_for(self.click_function(page),timeout=3)
            list_href = await page.evaluate('list_href')
            #print(list(set(list_href)))
            self.list_url.extend(list(set(list_href)))
            num = num + 1
    async def browser(self,request_list):
        '''
        Global browser control
        :param request_list:
        :return:
        '''
        request_list=self.set_request_signal(self.Core_Function.callback_http_request_list(request_list))
        async with async_playwright() as p:
            try:
                # proxy={"server": "http://127.0.0.1:8080"}
                browser = await p.chromium.launch(executable_path='%s' % self.Core_Function.Load_Dict['chrome_path'],
                                                  headless=True, timeout=19000, args=[])
                page_context = await browser.new_context(viewport={'width': 1024, 'height': 768},
                                                         user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4577.63 Safari/537.36')
                await asyncio.wait_for(self.pages_process(page_context, request_list), timeout=150)
                # print(self.response_list)
                # await asyncio.sleep(8)
                # print(self.request_list)
                # print(self.http_list)
                # print(list(set(self.list_url)))

            except Exception as e:
                self.Core_Function.callback_logging().error(e)
            finally:
                await browser.close()
    def callback_result(self):
        for line in self.page_result_list:
            for response in self.callback_result_response_list():
                if line['signal']==response['signal']:
                    line['http_status_code']=response['http_status_code']
                    line['headers_response'] = response['headers_response']
        print(self.page_result_list)


    def callback_result_request_list(self):
        print(self.request_list)

    def callback_result_response_list(self):
        return self.set_request_signal(self.response_list)

    def callback_list_url(self):
        return self.list_url

    def set_request_signal(self,request_list):
        for request in request_list:
            request['signal']=self.Core_Function.md5_convert(request['url']+request['body'])
        return request_list

    def create_request(self, url):
        '''
        Build an HTTP GET request dict from a URL
        :param url:
        :return:
        '''
        request = {}
        request['url'] = url
        request['method'] = "GET"
        request['headers'] = {
            "Sec-Ch-Ua": "\"(Not(A:Brand\";v=\"8\", \"Chromium\";v=\"98\"",
            "Accept": "*/*",
            "Sec-Ch-Ua-Platform": "\"Windows\"",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4727.0 Safari/537.36",
            "Connection": "close", "Sec-Fetch-Site": "none", "Sec-Fetch-Dest": "document",
            "Accept-Encoding": "gzip, deflate", "Sec-Fetch-Mode": "navigate", "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-User": "?1", "Accept-Language": "zh-CN,zh;q=0.9", "Sec-Ch-Ua-Mobile": "?0",
            "referer": self.Core_Function.callback_split_url(url, 0),
            "origin": self.Core_Function.callback_split_url(url, 0)
        }
        request['body'] = "Null"
        request['content_type'] = 1
        request['status'] = 0
        # print(request)
        return request
if __name__ == '__main__':
    request_list=[{"headers": {"Origin": "http://192.168.72.6", "Cookie": "PHPSESSID=9j2v086n48q71h2aq6totqliib; security=low", "Accept": "*/*", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36", "Referer": "http://192.168.72.6/web_vul_test/test_sql.html", "Connection": "close", "Accept-Encoding": "gzip, deflate", "Content-type": "application/json", "Accept-Language": "zh-CN,zh;q=0.9"}, "method": "POST", "body": "{\"name\":\"lisi\",\"age\":50,\"data\":{\"aa\":\"c\",\"id\":3,\"bbb\":133}}", "url": "http://192.168.72.6:80/web_vul_test/php_api/json_sql.php?method=sql_inj_json_method"},{
    "headers": {
        "Cookie": "PHPSESSID=9j2v086n48q71h2aq6totqliib; security=low",
        "Accept": "*/*",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36",
        "Referer": "http://192.168.72.6/web_vul_test/test_sql.html",
        "Connection": "close",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9"
    },
    "method": "GET",
    "body": "Null",
    "url": "http://192.168.72.6:80/web_vul_test/php_api/json_sql.php?method=sql_inj_get_method&id=1dasdsadsa&ida=13243234&id=1",
    "param": {
        "param_name": "id",
        "param_type": "Int",
        "param_lenght": 1,
        "param_value": "1"
    },
    "Vul_Type": "SQL_where_int",
    "time": "2022-11-05_16-30-14"
},{'headers': {'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36', 'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"', 'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1', 'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'}, 'method': 'GET', 'body': 'Null', 'url': 'http://192.168.72.6:80/web_vul_test/http_method/index.html'
   },{"headers": {"Origin": "http://192.168.72.6", "Cookie": "PHPSESSID=gd7a57gisgbef82jkukndj585r; security=low", "Accept": "application/json, text/javascript, */*; q=0.01", "X-Requested-With": "XMLHttpRequest", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36", "Referer": "http://192.168.72.6/web_vul_test/http_method/index.html", "Connection": "close", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9", "Content-Type": "application/json; charset=UTF-8"}, "method": "POST", "body": "{\"name\":\"John\",\"time\":\"2p1m\",\"url\":\"http%3A%2F%2Fwww.guimaizi.com%2F\"}", "url": "http://192.168.72.6:80/web_vul_test/http_method/http_method.php?method=POST_json_method&dsaa=aaasda1sdas&id=32&string=flag"}
    ]
    request_list1=[
        {"headers": {"Origin": "http://192.168.72.6", "Cookie": "PHPSESSID=gd7a57gisgbef82jkukndj585r; security=low", "Accept": "application/json, text/javascript, */*; q=0.01", "X-Requested-With": "XMLHttpRequest", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36", "Referer": "http://192.168.72.6/web_vul_test/http_method/index.html", "Connection": "close", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9", "Content-Type": "application/json; charset=UTF-8"}, "method": "POST", "body": "{\"name\":\"John\",\"time\":\"2p1m\",\"url\":\"http%3A%2F%2Fwww.guimaizi.com%2F\"}",
                    "url": "http://192.168.72.6:80/web_vul_test/http_method/http_method.php?method=POST_json_method&dsaa=aaasda1sdas&id=32&string=flag"}
        ,{"headers": {"Origin": "http://192.168.72.6", "Cookie": "PHPSESSID=gd7a57gisgbef82jkukndj585r; security=low", "Accept": "application/json, text/javascript, */*; q=0.01", "X-Requested-With": "XMLHttpRequest", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36", "Referer": "http://192.168.72.6/web_vul_test/http_method/index.html", "Connection": "close", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9", "Content-Type": "application/json; charset=UTF-8"}, "method": "POST", "body": "{\"name\":\"John\",\"time\":\"2p1m\",\"url\":\"http%3A%2F%2Fwww.guimaizi.com%2F\"}", "url": "http://192.168.72.6:80/web_vul_test/http_method/http_method.php?method=POST_json_method&dsaa=aaasda1sdas&id=32&string=flag"},
        {'headers': {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
            'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"',
            'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1',
            'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'}, 'method': 'GET',
         'body': 'Null', 'url': 'http://192.168.72.6:80/web_vul_test/http_method/index.html'
         },
        {'headers': {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
            'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"',
            'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1',
            'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'}, 'method': 'GET',
            'body': 'Null', 'url': 'http://192.168.72.6/sleep.php'
        },        {'headers': {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
            'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"',
            'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1',
            'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'}, 'method': 'GET',
         'body': 'Null', 'url': 'http://192.168.72.6:80/web_vul_test/http_method/333.html'
         },{'headers': {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
            'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"',
            'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1',
            'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'}, 'method': 'GET',
         'body': 'Null', 'url': 'http://192.168.72.6:80/browser_ua.html'
         }
    ]
    task=spider()
    request_list2=[]
    for num in range(10):
        url='http://192.168.72.6/web_vul_test/spider/%s.html'%str(num)
        request_list2.append(task.create_request(url))
    request_list2.append(task.create_request('http://192.168.72.6/web_vul_test/spider/1.php'))
    asyncio.run(task.browser(request_list1))
    task.callback_result()
    #task.set_request_signal(request_list1)

Class_function.py

# coding: utf-8
"""
@Time :    2022/02/18
@Author:  guimaizi
@File: Class_function.py
@Software: PyCharm
"""
import json, random, logging, time, os, requests, hashlib, concurrent.futures
from urllib.parse import urlparse
from pymongo import MongoClient


class Class_function:
    def __init__(self):
        self.Path = os.path.dirname(__file__)
        self.Path = self.Path.replace('\\', '/')
        with open(r'%s/config.json' % self.Path, 'r') as load_f:
            self.Load_Dict = json.load(load_f)
        self.logger = logging.getLogger()

    def callback_mongodb(self):
        return MongoClient(self.Load_Dict['MongoDB']['IP'], username=self.Load_Dict['MongoDB']['username'],
                           password=self.Load_Dict['MongoDB']['password'],
                           authSource=self.Load_Dict['MongoDB']['DB_name'])

    def callback_logging(self):
        '''
        Configure and return the shared logger; output goes to both console and file, e.g.:
        [#]INFO: this is a loggging info message 2021-02-13 12:03:59,481 - test.py[line:6]
        logging.info('this is a loggging info message')
        logging.debug('this is a loggging debug message')
        logging.warning('this is loggging a warning message')
        logging.error('this is an loggging error message')
        logging.critical('this is a loggging critical message')
        :return:
        '''
        # logger = logging.getLogger()
        self.logger.setLevel('INFO')
        formatter = logging.Formatter("[#]%(levelname)s: %(message)s     %(asctime)s - %(filename)s[line:%(lineno)d]")
        chlr = logging.StreamHandler()  # console handler
        chlr.setFormatter(formatter)
        # chlr.setLevel('INFO')  # optional; defaults to the logger's level
        fhlr = logging.FileHandler('%s/Data/debuglog.log' % self.Path)  # file handler
        fhlr.setFormatter(formatter)
        if not self.logger.handlers:
            self.logger.addHandler(chlr)
            self.logger.addHandler(fhlr)
        return self.logger

    def callback_ranstr(self, num=8):
        # random string
        H = 'abcdefghijklmnopqrstuvwxyz0123456789.-'
        salt = random.sample(H, num)
        return ''.join(salt)

    def md5_convert(self, string):
        """
        MD5 digest of a string
        :param string: input string
        :return: hex md5 digest
        """
        m = hashlib.md5()
        m.update(string.encode())
        return m.hexdigest()

    def callback_time(self, num):
        '''
        0 date + time
        1 date
        2 time
        :param num:
        :return:
        '''
        if num == 1:
            return time.strftime('%Y-%m-%d', time.localtime())
        elif num == 2:
            return time.strftime('%H-%M-%S', time.localtime())
        elif num == 0:
            return time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime())

    def threadpool_function(self, function, lists, num):
        '''
        Thread pool helper
        :fun worker function
        :lists data items
        :num thread count
        '''
        with concurrent.futures.ThreadPoolExecutor(num) as executor:
            executor.map(function, lists,timeout=10)
            #executor.shutdown(wait=True)

    def callback_txt_list(self, filename):
        '''
        Return deduplicated, stripped lines of a text file
        :param filename:
        :return:
        '''
        try:
            list_url = []
            for i in open(filename):
                list_url.append(i.strip())
            # os.remove(filename)
            return list(set(list_url))
        except:
            return []

    def callback_domain(self, target_domain):
        '''
        Check whether target_domain falls under a configured domain
        www.dasda.com   result: false
        :param target_domain:
        :return:
        '''
        for domain in self.Load_Dict['Domain_List']:
            if str(target_domain.lower()).endswith('.' + domain) or str(target_domain.lower()) in self.Load_Dict['Domain_List']:
                return domain
        return False

    def callback_split_url(self, url, type):
        '''
            callback domain
            :type
                http?s://www.xxx.com:80/aaaaa?dassda=aaa
                0 http?s://www.xxx.com:80/
                1 www.xxx.com:80
                2 www.xxx.com
                3 http?s://www.xxx.com:80/aaaaa
                4 http?s://www.xxx.com:80/aaaaa?dassda=aaa
        '''
        try:
            url = urlparse(url)
            if type == 0:
                return url.scheme + '://' + url.netloc + '/'
            elif type == 1:
                return url.netloc
            elif type == 3:
                return url.scheme + '://' + url.netloc + url.path
            elif type == 4:
                return url
            elif type == 2:
                if ':' in url.netloc:
                    return url.netloc.split(':')[0]
                return url.netloc
        except:
            return False

    def callbck_requests_obj(self, url):
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36'}
        r = requests.get(url, headers=headers, timeout=8, allow_redirects=False)
        return r

    def create_image_path(self):
        '''
        :return: image save path (created if missing)
        '''
        image_path = '%s/Web_Test/static/image/%s' % (self.Path, self.callback_time(1))
        isExists = os.path.exists(image_path)
        if not isExists: os.makedirs(image_path)
        return image_path

    def callback_url(self, url):
        '''
        http://192.168.0.36:80/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit
        http://192.168.0.36/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit
        :param url:
        :return:
        '''
        url_urlparse = urlparse(url)
        netloc = url_urlparse.netloc.split(':')
        if len(netloc) == 2:
            if netloc[1] == '80' or netloc[1] == '443':
                url = url_urlparse._replace(netloc=netloc[0]).geturl()
                url_urlparse = urlparse(url)
        url = url_urlparse._replace(scheme=url_urlparse.scheme.lower(), netloc=url_urlparse.netloc.lower()).geturl()
        return url

    def callback_http_request_list(self, http_request_list):
        '''
        Normalize request URLs, e.g.:
        http://192.168.0.36:80/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit
        http://192.168.0.36/DVWA/vulnerabilities/sqli/?id=1&Submit=Submit
        :param http_request_list:
        :return:
        '''
        for line in http_request_list:
            line['url'] = self.callback_url(line['url'])
        return http_request_list

    def create_request(self, url):
        '''
        Build a default GET request dict
        :param url:
        :return:
        '''
        request = {}
        request['url'] = url
        request['method'] = "GET"
        request['headers'] = {
            "Sec-Ch-Ua": "\"(Not(A:Brand\";v=\"8\", \"Chromium\";v=\"98\"",
            "Accept": "*/*",
            "Sec-Ch-Ua-Platform": "\"Windows\"",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4727.0 Safari/537.36",
            "Connection": "close", "Sec-Fetch-Site": "none", "Sec-Fetch-Dest": "document",
            "Accept-Encoding": "gzip, deflate", "Sec-Fetch-Mode": "navigate", "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-User": "?1", "Accept-Language": "zh-CN,zh;q=0.9", "Sec-Ch-Ua-Mobile": "?0",
            "referer": self.callback_split_url(url, 0),
            "origin": self.callback_split_url(url, 0)
        }
        request['body'] = "Null"
        request['content_type'] = 1
        request['status'] = 0
        # print(request)
        return request




if __name__ == '__main__':
    task = Class_function()
    # print(task.callback_domain('asda.dasdas.com.qq.com'))
    # print(task.callback_split_url('javascript:window.open()',4))
    print(task.callback_url('http://192.168.72.6:80/web_vul_test/test_sql.html'))

This code was pulled straight out of my scanner, so there may be a few small errors; patch it up yourself. The core is a Playwright-based crawler; if something doesn't work, check the Playwright API, and contact me if you hit a logic problem.

Technical exchange is welcome; if you find a bug, contact me and I'll patch it. Serious inquiries only. WeChat: guimaizi.

Originally published 2022-11-09 on the 鬼麦子 WeChat public account; shared via the Tencent Cloud self-media sync program.
