windows11+Ubuntu+"python3.10+"+playwright
https://playwright.dev/python/docs/pages
其实各种爬虫已经很多了,无论动态静态爬虫,基于http请求正则,还是基于浏览器,说大同小异可能有点夸张,但是好像都基于一个基础"URL",但在web安全领域,只有"URL"显得有些不够全面。
需求列表
如图,都是现成跑完可以调用结果的。
获得页面信息
给一个http请求
{'headers': {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"',
'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1',
'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'}, 'method': 'GET',
'body': 'Null', 'url': 'http://192.168.72.6:80/web_vul_test/http_method/index.html'
}
得如下结果
{
'headers': {'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36', 'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"', 'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1', 'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'},
'method': 'GET',
'body': 'Null',
'url': 'http://192.168.72.6/web_vul_test/http_method/index.html',
'signal': '2fcd137a11b690821d1af032781295b0',
'status': 0,
'html_md5': '2d9be5170ac9826cc42737d52709eefe',
'title': 'HTTP 请求方法',
'screenshot': 'C:/Users/guimaizi/source/Web_Security_Test_Framework/Web_Test/static/image/2022-11-09/8khyOWQEsnXd4ql.jpeg',
'http_status_code': 200,
'headers_response': {'accept-ranges': 'bytes', 'connection': 'close', 'content-encoding': 'gzip', 'content-length': '1730', 'content-type': 'text/html', 'date': 'Wed, 09 Nov 2022 10:16:31 GMT', 'etag': '"b8aa-5e22f1d737740-gzip"', 'last-modified': 'Fri, 24 Jun 2022 10:34:29 GMT', 'server': 'Apache/2.4.52 (Ubuntu)', 'vary': 'Accept-Encoding'}
}
亮点功能
无论传入的是get\post\put 都通过这个函数在浏览器重放,就是通过浏览器打开url,发现这个url是post包,通过hook浏览器请求包把get变成post包,细节就不讲了,反正有代码,自己调试研究吧。
{
"headers": {"Origin": "http://192.168.72.6", "Cookie": "PHPSESSID=gd7a57gisgbef82jkukndj585r; security=low", "Accept": "application/json, text/javascript, */*; q=0.01", "X-Requested-With": "XMLHttpRequest", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36", "Referer": "http://192.168.72.6/web_vul_test/http_method/index.html", "Connection": "close", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9", "Content-Type": "application/json; charset=UTF-8"},
"method": "POST",
"body": "{\"name\":\"John\",\"time\":\"2p1m\",\"url\":\"http%3A%2F%2Fwww.guimaizi.com%2F\"}",
"url": "http://192.168.72.6:80/web_vul_test/http_method/http_method.php?method=POST_json_method&dsaa=aaasda1sdas&id=32&string=flag"
}
{
'headers': {'Origin': 'http://192.168.72.6', 'Cookie': 'PHPSESSID=gd7a57gisgbef82jkukndj585r; security=low', 'Accept': 'application/json, text/javascript, */*; q=0.01', 'X-Requested-With': 'XMLHttpRequest', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36', 'Referer': 'http://192.168.72.6/web_vul_test/http_method/index.html', 'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Content-Type': 'application/json; charset=UTF-8'},
'method': 'POST',
'body': '{"name":"John","time":"2p1m","url":"http%3A%2F%2Fwww.guimaizi.com%2F"}',
'url': 'http://192.168.72.6/web_vul_test/http_method/http_method.php?method=POST_json_method&dsaa=aaasda1sdas&id=32&string=flag',
'signal': '8a9ddef651abb095651e4fb7555d9836',
'status': 0,
'html_md5': 'f88f221f86d5319f2a541766120de429',
'title': '',
'screenshot': 'C:/Users/guimaizi/source/Web_Security_Test_Framework/Web_Test/static/image/2022-11-09/wXxmdaq7QOA4YJN.jpeg',
'http_status_code': 200,
'headers_response': {'cache-control': 'no-store, no-cache, must-revalidate', 'connection': 'close', 'content-length': '26', 'content-type': 'text/plain;charset=UTF-8', 'date': 'Wed, 09 Nov 2022 11:06:42 GMT', 'expires': 'Thu, 19 Nov 1981 08:52:00 GMT', 'pragma': 'no-cache', 'server': 'Apache/2.4.52 (Ubuntu)'}
}
截图:
反"爬虫检测"
其实反爬虫无非是用js检测,大概就是UA和webdriver判断,可能再加一个浏览器操作系统判断。
基本上Ubuntu跑,没毛病。
模拟点击
这里是好多年前抄的一个大佬的JS代码,原文忘记了,挺好用的,感谢。
模拟点击的目的是,尽可能的触发页面内的http请求,便于收集到更多测试接口,接口即攻击面, 真的"攻击面"这个词太装逼了,记得好像是奇某信还是大数字发明的。
什么alert处理、点击后跳转到其他页面、点击后打开新标签,都有现成api作为处理。
遇到的截图导致运行太慢的问题,设置页面size,和截图质量搞定,所以默认的截图比较模糊,有需求的可以自己改改。
功能太多了,细节说三天三夜也说不完,自己看代码调代码吧。
spider.py
# coding: utf-8
"""
@Time : 11/7/2022 13:42
@Author: fff
@File: test2.py
@Software: PyCharm
"""
import asyncio,Class_function,random
from playwright.async_api import async_playwright
class spider:
    """Playwright-driven crawler: replays recorded HTTP requests inside a
    real browser, harvests page data, in-page requests and hyperlinks."""

    def __init__(self):
        # Shared helper (config, hashing, logging, path helpers).
        self.Core_Function = Class_function.Class_function()
        self.page_result_list = []  # enriched per-page result dicts
        self.request_list = []      # HTTP requests observed while pages run
        self.response_list = []     # responses matched to target requests
        self.list_url = []          # hyperlinks harvested from pages
        self.HTML_list = []         # raw HTML snapshots with md5 + timestamp
async def click_function(self, page):
    '''
    Walk the DOM and synthetically click every clickable node, so that as
    many in-page HTTP requests as possible are triggered (more endpoints =
    larger attack surface).

    :param page: playwright Page to click through
    :return: None — errors are logged, never raised
    '''
    try:
        # Stop pending loads, then count DOM nodes (rough upper bound).
        num222 = await page.evaluate('''
window.stop();
num222=document.querySelectorAll('*').length;
num222;
''')
        # Click up to 1500 nodes via a TreeWalker; `target` is cleared
        # first so clicks do not open new tabs.
        await page.evaluate('''
window.scrollBy(1920, 50);
treeWalker = document.createTreeWalker(document);
num111=0
while (treeWalker.nextNode() && num111<1500) {
console.log("[*] processing node " + treeWalker.currentNode.tagName + ' ' + treeWalker.currentNode.id);
if (treeWalker.currentNode.click) {
treeWalker.currentNode.target='';
treeWalker.currentNode.click();
num111=num111+1;
}
}
''')
        flag_num = 0
        # Poll the click counter, giving the page time to fire requests.
        while await page.evaluate('num111') < 666:
            await asyncio.sleep(0.5)
            flag_num = flag_num + 1
            # BUGFIX: the original tested `flag_num < 3`, which broke out
            # of the wait loop on the very first poll; cap retries instead.
            if flag_num >= 3:
                break
    except Exception as e:
        self.Core_Function.callback_logging().error(e)
async def hook_requset(self, route):
    '''
    Request-interception hook (page.route). NOTE(review): the name keeps
    the original "requset" typo because it is wired up by name elsewhere.

    The navigation request matching ``self.target_request`` is rewritten so
    the browser replays the original method/headers/body (this is how a
    recorded POST is replayed from a plain `page.goto`). Other navigation
    requests are recorded then aborted so the page under test stays put;
    everything else passes through untouched.

    :param route: playwright Route being intercepted
    '''
    if route.request.url == self.target_request['url']:
        # Replay the target request with its recorded method/headers/body.
        if self.target_request['body'] != 'Null':
            await route.continue_(headers=self.target_request['headers'],
                                  method=self.target_request['method'],
                                  post_data=self.target_request['body'])
        elif self.target_request['method'] == 'GET':
            await route.continue_(headers=self.target_request['headers'],
                                  method=self.target_request['method'])
        else:
            await route.continue_()
    elif route.request.url != 'about:blank' and route.request.is_navigation_request():
        # A click tried to navigate away: capture the request, then abort
        # it so the current page stays loaded.
        if route.request.method == 'GET':
            self.request_list.append({"headers": route.request.headers,
                                      "method": route.request.method,
                                      "url": route.request.url,
                                      "body": "Null"})
        elif route.request.post_data is not None:
            # BUGFIX: the original gated this branch on
            # ``self.target_request['body'] != 'Null'`` — a property of the
            # *target* request — so non-GET navigations were silently
            # dropped whenever the target had no body. Decide from the
            # intercepted request itself.
            self.request_list.append({"headers": route.request.headers,
                                      "method": route.request.method,
                                      "url": route.request.url,
                                      "body": route.request.post_data})
        await route.abort(error_code='aborted')
    else:
        await route.continue_()
async def handle_popup(self, page):
    '''
    Immediately close any popup window spawned while clicking through a
    page, keeping the tab count stable.

    :param page: the newly opened playwright Page
    '''
    await page.close()
async def handle_dialog(self, dialog):
    '''
    Dismiss javascript dialogs (alert/confirm/prompt) so synthetic clicks
    never block the crawl.

    :param dialog: playwright Dialog
    '''
    await dialog.dismiss()
async def handle_network_http_request(self, request):
    '''
    "request" event handler: record every interesting HTTP request a page
    fires. Static resources (images, css, websockets, media, fonts) are
    skipped; everything else is normalized into a dict and appended to
    ``self.request_list``.

    :param request: playwright Request
    :return: None — errors are logged, never raised
    '''
    try:
        if request.resource_type not in ['image', 'stylesheet', 'websocket', 'media', 'font']:
            # The string 'Null' is this project's marker for "no body".
            # Idiom fix: compare to None with `is`, not `==`.
            body = 'Null' if request.post_data is None else request.post_data
            self.request_list.append({
                'body': body,
                'url': request.url,
                'headers': request.headers,
                'method': request.method,
                'time': self.Core_Function.callback_time(0),
                'describe': 'Null',
                'status': 0,
            })
    except Exception as e:
        self.Core_Function.callback_logging().error(e)
async def handle_http_response(self, response):
    '''
    "response" event handler: capture the response belonging to the
    current target request (matched by URL, filtered by status code) and
    store its metadata plus an md5 of the body in ``self.response_list``
    and ``self.HTML_list``.

    :param response: playwright Response
    :return: None — errors are logged, never raised
    '''
    try:
        if response.request.url != self.target_request['url']:
            return
        if response.status not in [200, 301, 302, 404, 500]:
            return
        html = await response.text()
        html_md5 = self.Core_Function.md5_convert(html)
        response_data = {
            'body': self.target_request['body'],
            'url': self.target_request['url'],
            'headers': self.target_request['headers'],
            'method': self.target_request['method'],
            'http_status_code': response.status,
            'headers_response': response.headers,
            'html_md5': html_md5,
            'time': self.Core_Function.callback_time(0),
            'describe': 'Null',
            'status': 0,
        }
        html_data = {
            'html': html,
            'html_md5': html_md5,
            'time': self.Core_Function.callback_time(0),
            'status': 0,
        }
        self.response_list.append(response_data)
        self.HTML_list.append(html_data)
    except Exception as e:
        self.Core_Function.callback_logging().error(e)
async def page_data(self, page, request):
    '''
    Harvest page-level info (html + md5, title, links, screenshot) for a
    request that has been navigated to, enriching the request dict in
    place and storing it in ``self.page_result_list``.

    :param page: playwright Page currently showing the response
    :param request: the request dict to enrich
    '''
    html = await page.content()
    html_md5 = self.Core_Function.md5_convert(html)
    html_data = {
        'html': html,
        'html_md5': html_md5,
        'time': self.Core_Function.callback_time(0),
        'status': 0,
    }
    request['status'] = 0
    request['html_md5'] = html_md5
    request['title'] = await page.title()
    print(request['title'])
    # Neutralize window.open/close so clicks cannot spawn or kill windows;
    # URLs passed to window.open are collected into `list_href` instead.
    await page.evaluate('''
list_href=[]
window.open = function(url) { console.log("new link: " + url);list_href.push(url); };
window.close = function () { return false; };
''')
    # Collect every <a href> on the page (note: this resets list_href).
    await page.evaluate('''
list_href=[]
for(i=0;i<document.getElementsByTagName("a").length;i++){
list_href.push(document.getElementsByTagName("a")[i].href); //输出该页面的所有链接。
}
''')
    # Random 15-char filename inside today's image directory.
    filename_img = "%s/%s.jpeg" % (self.Core_Function.create_image_path(), ''.join(
        random.sample('ABCDEFGHIJKLMNOPQRSTUVWXYZ012345678zyxwvutsrqponmlkjihgfedcba', 15)))
    try:
        # Low quality keeps screenshots fast (speed issue noted in blog).
        await page.screenshot(path=filename_img, type='jpeg', quality=15)
    except Exception as error:
        filename_img = ''
        print(error)
    request['screenshot'] = filename_img
    self.page_result_list.append(request)
    self.HTML_list.append(html_data)
async def pages_process(self, page_context, request_list):
    '''
    Drive one browser context: open a tab per request, wire up all event
    handlers, navigate each tab, then harvest page data and links from
    every tab.

    :param page_context: playwright BrowserContext
    :param request_list: list of normalized request dicts
    '''
    # One tab per request.
    for _ in request_list:
        await page_context.new_page()
    index = 0
    for page in page_context.pages:
        try:
            page.on("request", self.handle_network_http_request)
            page.on("response", self.handle_http_response)
            page.on("dialog", self.handle_dialog)
            page.on("popup", self.handle_popup)
            # Anti-bot: hide webdriver, report a Windows platform.
            stealth_js = """
Object.defineProperties(navigator, {webdriver:{get:()=>false}});
Object.defineProperties(navigator, {platform:{get:()=>'Win32'}});
"""
            await page.add_init_script(stealth_js)
            self.target_request = request_list[index]
            await page.route('*', self.hook_requset)
            await page.goto(request_list[index]['url'], wait_until="commit")
            # NOTE(review): increment happens inside the try (as in the
            # original), so a failed goto re-pairs the next page with the
            # same request — confirm whether that is intended.
            index = index + 1
        except Exception as error:
            self.Core_Function.callback_logging().error(error)
    index = 0
    for page in page_context.pages:
        await page.evaluate('''window.stop();''')
        await self.page_data(page, request_list[index])
        await asyncio.wait_for(self.click_function(page), timeout=3)
        harvested = await page.evaluate('list_href')
        self.list_url.extend(list(set(harvested)))
        index = index + 1
async def browser(self, request_list):
    '''
    Launch headless Chromium and process the given requests end-to-end.

    URLs are normalized and each request is tagged with a `signal` (md5 of
    url+body) before processing; the whole crawl is bounded at 150s.

    :param request_list: raw request dicts
    '''
    request_list = self.set_request_signal(self.Core_Function.callback_http_request_list(request_list))
    async with async_playwright() as p:
        # BUGFIX: pre-bind `browser` so the `finally` clause cannot raise
        # NameError when p.chromium.launch() itself fails.
        browser = None
        try:
            # proxy={"server": "http://127.0.0.1:8080"}
            browser = await p.chromium.launch(executable_path='%s' % self.Core_Function.Load_Dict['chrome_path'],
                                              headless=True, timeout=19000, args=[])
            page_context = await browser.new_context(viewport={'width': 1024, 'height': 768},
                                                     user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4577.63 Safari/537.36')
            await asyncio.wait_for(self.pages_process(page_context, request_list), timeout=150)
        except Exception as e:
            self.Core_Function.callback_logging().error(e)
        finally:
            if browser is not None:
                await browser.close()
def callback_result(self):
    '''
    Join page results with their matching responses (by `signal`) so each
    page entry gains its status code and response headers, then print the
    merged list.
    '''
    for page_entry in self.page_result_list:
        for resp in self.callback_result_response_list():
            if page_entry['signal'] == resp['signal']:
                page_entry['http_status_code'] = resp['http_status_code']
                page_entry['headers_response'] = resp['headers_response']
    print(self.page_result_list)
def callback_result_request_list(self):
    '''Print every request captured during the crawl.'''
    print(self.request_list)
def callback_result_response_list(self):
    '''Return the captured responses, each tagged with its `signal` hash.'''
    return self.set_request_signal(self.response_list)
def callback_list_url(self):
    '''Return all hyperlinks harvested from visited pages.'''
    return self.list_url
def set_request_signal(self, request_list):
    '''
    Tag each request with `signal` — the md5 of url+body — used later to
    join requests with their responses.

    :param request_list: list of request dicts (mutated in place)
    :return: the same list
    '''
    for entry in request_list:
        entry['signal'] = self.Core_Function.md5_convert(entry['url'] + entry['body'])
    return request_list
def create_request(self, url):
    '''
    Build a canonical browser-like GET request dict for a bare URL.
    NOTE: duplicates Class_function.create_request — kept for callers that
    use the spider instance directly.

    :param url: target url
    :return: request dict with headers/method/body/content_type/status
    '''
    site_root = self.Core_Function.callback_split_url(url, 0)
    return {
        'url': url,
        'method': "GET",
        'headers': {
            "Sec-Ch-Ua": "\"(Not(A:Brand\";v=\"8\", \"Chromium\";v=\"98\"",
            "Accept": "*/*",
            "Sec-Ch-Ua-Platform": "\"Windows\"",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4727.0 Safari/537.36",
            "Connection": "close", "Sec-Fetch-Site": "none", "Sec-Fetch-Dest": "document",
            "Accept-Encoding": "gzip, deflate", "Sec-Fetch-Mode": "navigate", "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-User": "?1", "Accept-Language": "zh-CN,zh;q=0.9", "Sec-Ch-Ua-Mobile": "?0",
            "referer": site_root,
            "origin": site_root,
        },
        'body': "Null",
        'content_type': 1,
        'status': 0,
    }
if __name__ == '__main__':
# Demo fixture data recorded from a local vulnerable lab (192.168.72.6):
# request_list mixes a JSON POST, an annotated GET (param/Vul_Type metadata),
# a plain page GET and a JSON POST replay. request_list1 adds slow/missing
# pages and a UA-check page. Only request_list1 is actually crawled below.
request_list=[{"headers": {"Origin": "http://192.168.72.6", "Cookie": "PHPSESSID=9j2v086n48q71h2aq6totqliib; security=low", "Accept": "*/*", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36", "Referer": "http://192.168.72.6/web_vul_test/test_sql.html", "Connection": "close", "Accept-Encoding": "gzip, deflate", "Content-type": "application/json", "Accept-Language": "zh-CN,zh;q=0.9"}, "method": "POST", "body": "{\"name\":\"lisi\",\"age\":50,\"data\":{\"aa\":\"c\",\"id\":3,\"bbb\":133}}", "url": "http://192.168.72.6:80/web_vul_test/php_api/json_sql.php?method=sql_inj_json_method"},{
"headers": {
"Cookie": "PHPSESSID=9j2v086n48q71h2aq6totqliib; security=low",
"Accept": "*/*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36",
"Referer": "http://192.168.72.6/web_vul_test/test_sql.html",
"Connection": "close",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9"
},
"method": "GET",
"body": "Null",
"url": "http://192.168.72.6:80/web_vul_test/php_api/json_sql.php?method=sql_inj_get_method&id=1dasdsadsa&ida=13243234&id=1",
"param": {
"param_name": "id",
"param_type": "Int",
"param_lenght": 1,
"param_value": "1"
},
"Vul_Type": "SQL_where_int",
"time": "2022-11-05_16-30-14"
},{'headers': {'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36', 'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"', 'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1', 'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'}, 'method': 'GET', 'body': 'Null', 'url': 'http://192.168.72.6:80/web_vul_test/http_method/index.html'
},{"headers": {"Origin": "http://192.168.72.6", "Cookie": "PHPSESSID=gd7a57gisgbef82jkukndj585r; security=low", "Accept": "application/json, text/javascript, */*; q=0.01", "X-Requested-With": "XMLHttpRequest", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36", "Referer": "http://192.168.72.6/web_vul_test/http_method/index.html", "Connection": "close", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9", "Content-Type": "application/json; charset=UTF-8"}, "method": "POST", "body": "{\"name\":\"John\",\"time\":\"2p1m\",\"url\":\"http%3A%2F%2Fwww.guimaizi.com%2F\"}", "url": "http://192.168.72.6:80/web_vul_test/http_method/http_method.php?method=POST_json_method&dsaa=aaasda1sdas&id=32&string=flag"}
]
# Second sample set: JSON POST replays plus plain GETs including a slow page
# (sleep.php), a missing page (333.html) and a UA-fingerprint page.
request_list1=[
{"headers": {"Origin": "http://192.168.72.6", "Cookie": "PHPSESSID=gd7a57gisgbef82jkukndj585r; security=low", "Accept": "application/json, text/javascript, */*; q=0.01", "X-Requested-With": "XMLHttpRequest", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36", "Referer": "http://192.168.72.6/web_vul_test/http_method/index.html", "Connection": "close", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9", "Content-Type": "application/json; charset=UTF-8"}, "method": "POST", "body": "{\"name\":\"John\",\"time\":\"2p1m\",\"url\":\"http%3A%2F%2Fwww.guimaizi.com%2F\"}",
"url": "http://192.168.72.6:80/web_vul_test/http_method/http_method.php?method=POST_json_method&dsaa=aaasda1sdas&id=32&string=flag"}
,{"headers": {"Origin": "http://192.168.72.6", "Cookie": "PHPSESSID=gd7a57gisgbef82jkukndj585r; security=low", "Accept": "application/json, text/javascript, */*; q=0.01", "X-Requested-With": "XMLHttpRequest", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36", "Referer": "http://192.168.72.6/web_vul_test/http_method/index.html", "Connection": "close", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9", "Content-Type": "application/json; charset=UTF-8"}, "method": "POST", "body": "{\"name\":\"John\",\"time\":\"2p1m\",\"url\":\"http%3A%2F%2Fwww.guimaizi.com%2F\"}", "url": "http://192.168.72.6:80/web_vul_test/http_method/http_method.php?method=POST_json_method&dsaa=aaasda1sdas&id=32&string=flag"},
{'headers': {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"',
'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1',
'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'}, 'method': 'GET',
'body': 'Null', 'url': 'http://192.168.72.6:80/web_vul_test/http_method/index.html'
},
{'headers': {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"',
'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1',
'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'}, 'method': 'GET',
'body': 'Null', 'url': 'http://192.168.72.6/sleep.php'
}, {'headers': {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"',
'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1',
'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'}, 'method': 'GET',
'body': 'Null', 'url': 'http://192.168.72.6:80/web_vul_test/http_method/333.html'
},{'headers': {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
'Connection': 'close', 'Accept-Encoding': 'gzip, deflate', 'If-None-Match': '"b8aa-5e22f1d770eee-gzip"',
'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1',
'If-Modified-Since': 'Fri, 24 Jun 2022 10:34:29 GMT', 'Accept-Language': 'zh-CN,zh;q=0.9'}, 'method': 'GET',
'body': 'Null', 'url': 'http://192.168.72.6:80/browser_ua.html'
}
]
# Build ten synthetic spider/<n>.html GET requests (request_list2 is built
# but not crawled here), then crawl request_list1 and print merged results.
task=spider()
request_list2=[]
for num in range(10):
url='http://192.168.72.6/web_vul_test/spider/%s.html'%str(num)
request_list2.append(task.create_request(url))
request_list2.append(task.create_request('http://192.168.72.6/web_vul_test/spider/1.php'))
asyncio.run(task.browser(request_list1))
task.callback_result()
#task.set_request_signal(request_list1)
Class_function.py
# coding: utf-8
"""
@Time : 2022/02/18
@Author: guimaizi
@File: Class_function.py
@Software: PyCharm
"""
import queue, threadpool, json, random, logging, time, traceback, os, requests, string, hashlib,concurrent.futures
from urllib.parse import urlparse
from pymongo import MongoClient
from functools import wraps
class Class_function:
    """Grab-bag of shared helpers: config loading, MongoDB, logging,
    hashing, time formatting and URL normalization."""

    def __init__(self):
        # Directory of this file, normalized to forward slashes.
        self.Path = os.path.dirname(__file__)
        self.Path = self.Path.replace('\\', '/')
        # Global configuration (chrome path, MongoDB credentials, domains).
        with open(r'%s/config.json' % self.Path, 'r') as load_f:
            self.Load_Dict = json.load(load_f)
        # Root logger, configured lazily by callback_logging().
        self.logger = logging.getLogger()
def callback_mongodb(self):
    '''Return a MongoClient built from the loaded configuration.'''
    cfg = self.Load_Dict['MongoDB']
    return MongoClient(cfg['IP'], username=cfg['username'],
                       password=cfg['password'],
                       authSource=cfg['DB_name'])
def callback_logging(self):
    '''
    Configure (once) and return the shared logger: INFO level, one console
    handler plus one file handler writing to Data/debuglog.log.

    :return: the configured logging.Logger
    '''
    self.logger.setLevel('INFO')
    formatter = logging.Formatter("[#]%(levelname)s: %(message)s %(asctime)s - %(filename)s[line:%(lineno)d]")
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    file_handler = logging.FileHandler('%s/Data/debuglog.log' % self.Path)
    file_handler.setFormatter(formatter)
    # Attach handlers only on the first call — otherwise every call would
    # duplicate log output.
    if not self.logger.handlers:
        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)
    return self.logger
def callback_ranstr(self, num=8):
    '''Return `num` distinct random characters drawn from [a-z0-9.-].'''
    alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789.-'
    return ''.join(random.sample(alphabet, num))
def md5_convert(self, string):
    """
    Hex md5 digest of a string.

    :param string: input text (utf-8 encoded before hashing)
    :return: 32-character lowercase hex digest
    """
    return hashlib.md5(string.encode()).hexdigest()
def callback_time(self, num):
    '''
    Formatted local time.
    0 -> date_time ('%Y-%m-%d_%H-%M-%S'), 1 -> date, 2 -> time;
    any other value returns None (as the original did implicitly).

    :param num: format selector
    :return: formatted string or None
    '''
    formats = {0: '%Y-%m-%d_%H-%M-%S', 1: '%Y-%m-%d', 2: '%H-%M-%S'}
    fmt = formats.get(num)
    if fmt is None:
        return None
    return time.strftime(fmt, time.localtime())
def threadpool_function(self, function, lists, num):
    '''
    Map `function` over `lists` on a thread pool of `num` workers; the
    with-block waits for all submitted work before returning.

    :param function: callable applied to each item
    :param lists: iterable of items
    :param num: worker thread count
    '''
    with concurrent.futures.ThreadPoolExecutor(num) as executor:
        executor.map(function, lists, timeout=10)
def callback_txt_list(self, filename):
    '''
    Read a text file and return its unique, stripped lines.

    :param filename: path of the file to read
    :return: de-duplicated list of lines, [] if the file cannot be read
    '''
    try:
        # BUGFIX: the original iterated `open(filename)` without ever
        # closing the handle; `with` guarantees release. The bare `except:`
        # is also narrowed to OSError (missing/unreadable file).
        with open(filename) as handle:
            # os.remove(filename)
            return list({line.strip() for line in handle})
    except OSError:
        return []
def callback_domain(self, target_domain):
    '''
    Test whether `target_domain` belongs to one of the configured root
    domains (Load_Dict['Domain_List']).

    NOTE(review): as in the original, an exact list member match returns
    the *current loop* root, not necessarily the matching entry.

    :param target_domain: hostname to classify (case-insensitive)
    :return: matching root domain string, or False
    '''
    candidate = str(target_domain.lower())
    for root in self.Load_Dict['Domain_List']:
        if candidate.endswith('.' + root) or candidate in self.Load_Dict['Domain_List']:
            return root
    return False
def callback_split_url(self, url, type):
    '''
    Split a url and return the part selected by `type`:
    0 -> scheme://netloc/ ; 1 -> netloc ; 2 -> host with port stripped ;
    3 -> scheme://netloc/path ; 4 -> the urlparse result itself.
    Any other selector yields None; parse errors yield False.
    (`type` shadows the builtin but is kept — it is part of the signature.)
    '''
    try:
        parts = urlparse(url)
        if type == 0:
            return parts.scheme + '://' + parts.netloc + '/'
        if type == 1:
            return parts.netloc
        if type == 3:
            return parts.scheme + '://' + parts.netloc + parts.path
        if type == 4:
            return parts
        if type == 2:
            return parts.netloc.split(':')[0] if ':' in parts.netloc else parts.netloc
    except:
        return False
def callbck_requests_obj(self, url):
    '''
    GET `url` with requests (8s timeout, redirects disabled) and return the
    Response object. NOTE(review): the "callbck" typo in the name is kept —
    renaming would break any caller referencing it.

    :param url: target url
    :return: requests.Response
    '''
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36'}
    return requests.get(url, headers=headers, timeout=8, allow_redirects=False)
def create_image_path(self):
    '''
    Return today's screenshot directory under Web_Test/static/image/,
    creating it on first use.

    :return: directory path string
    '''
    image_path = '%s/Web_Test/static/image/%s' % (self.Path, self.callback_time(1))
    if not os.path.exists(image_path):
        os.makedirs(image_path)
    return image_path
def callback_url(self, url):
    '''
    Normalize a url: drop an explicit :80 or :443 port and lower-case the
    scheme and host.

    e.g. http://HOST.com:80/x?id=1 -> http://host.com/x?id=1

    :param url: url string
    :return: normalized url string
    '''
    parsed = urlparse(url)
    host_parts = parsed.netloc.split(':')
    if len(host_parts) == 2 and host_parts[1] in ('80', '443'):
        # Strip the default port and re-parse the rebuilt url.
        parsed = urlparse(parsed._replace(netloc=host_parts[0]).geturl())
    return parsed._replace(scheme=parsed.scheme.lower(),
                           netloc=parsed.netloc.lower()).geturl()
def callback_http_request_list(self, http_request_list):
    '''
    Normalize the `url` of every request dict in place via callback_url
    (drops default ports, lower-cases scheme/host).

    :param http_request_list: list of request dicts (mutated in place)
    :return: the same list
    '''
    for entry in http_request_list:
        entry['url'] = self.callback_url(entry['url'])
    return http_request_list
def create_request(self, url):
    '''
    Build a canonical browser-like GET request dict for a bare URL (same
    layout as the crawler's request dicts).

    :param url: target url
    :return: request dict with headers/method/body/content_type/status
    '''
    site_root = self.callback_split_url(url, 0)
    return {
        'url': url,
        'method': "GET",
        'headers': {
            "Sec-Ch-Ua": "\"(Not(A:Brand\";v=\"8\", \"Chromium\";v=\"98\"",
            "Accept": "*/*",
            "Sec-Ch-Ua-Platform": "\"Windows\"",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4727.0 Safari/537.36",
            "Connection": "close", "Sec-Fetch-Site": "none", "Sec-Fetch-Dest": "document",
            "Accept-Encoding": "gzip, deflate", "Sec-Fetch-Mode": "navigate", "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-User": "?1", "Accept-Language": "zh-CN,zh;q=0.9", "Sec-Ch-Ua-Mobile": "?0",
            "referer": site_root,
            "origin": site_root,
        },
        'body': "Null",
        'content_type': 1,
        'status': 0,
    }
if __name__ == '__main__':
    # Ad-hoc smoke test: normalize a url carrying an explicit :80 port.
    task = Class_function()
    # print(task.callback_domain('asda.dasdas.com.qq.com'))
    # print(task.callback_split_url('javascript:window.open()',4))
    print(task.callback_url('http://192.168.72.6:80/web_vul_test/test_sql.html'))
代码从我扫描器里扒下来的,可能有几个小报错,自己修修改改下,反正核心功能就是基于playwright的爬虫,不行就查api,有逻辑问题再联系我。
欢迎技术交流,欢迎找bug后联系我,我再修修改改,闲人勿扰,wechat: guimaizi。