前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从工具OneForAll代码角度学习子域名挖掘

从工具OneForAll代码角度学习子域名挖掘

作者头像
黑白天安全
发布2022-02-23 15:36:02
2K0
发布2022-02-23 15:36:02
举报

工具里用到的原理,大部分都可以通过下文找到:

子域名枚举&收集

代码分析版本为:OneForAll v0.4.3

OneForAll目录结构:

参考来源./docs/directory_structure.md

代码语言:javascript
复制
|   brute.py

依赖如下:

来源./requirements.txt

代码语言:javascript
复制
beautifulsoup4==4.9.3
bs4==0.0.1
# Beautiful Soup 是一个可以从HTML或XML文件中提取数据的Python库.
    https://beautifulsoup.readthedocs.io/zh_CN/v4.4.0/

certifi==2020.12.5
    https://appdividend.com/2020/06/19/python-certifi-example-how-to-use-ssl-certificate-in-python/

chardet==4.0.0
# 判断编码
    https://pypi.org/project/chardet/
colorama==0.4.4
# 终端颜色显示
    https://pypi.org/project/colorama/
dnspython==2.1.0
# Dns查询工具包
    https://www.dnspython.org/
exrex==0.10.5
# Exrex 是一个命令行工具和 python 模块,可生成与给定正则表达式等的所有或随机匹配的字符串。
    https://github.com/asciimoo/exrex
fire==0.4.0
# Python Fire 是一个用于从绝对任何 Python 对象自动生成命令行界面 (CLI) 的库。
    https://github.com/google/python-fire           https://blog.csdn.net/qq_17550379/article/details/79943740
future==0.18.2
# 并行
    https://docs.python.org/zh-cn/3/library/concurrent.futures.html
idna==2.10
    https://pypi.org/project/idna/
loguru==0.5.3
# 日志模块
https://blog.csdn.net/cui_yonghua/article/details/107498535
PySocks==1.7.1
# 代理模块
    https://pypi.org/project/PySocks/
requests==2.25.1
# 网页请求模块
    https://docs.python-requests.org/en/latest/
six==1.15.0
# 兼容性
    https://six.readthedocs.io/
soupsieve==2.2.1
# css选择
    https://pypi.org/project/soupsieve/
SQLAlchemy==1.3.22
# Python SQL 工具包和对象关系映射器
    https://pypi.org/project/SQLAlchemy/
tenacity==7.0.0
# 简化将重试行为添加到几乎任何内容的任务
    https://tenacity.readthedocs.io/en/latest/
termcolor==1.1.0
# 终端颜色
    https://pypi.org/project/termcolor/
tqdm==4.59.0
# 进度显示
    https://github.com/tqdm/tqdm
treelib==1.6.1
# 在 Python 中提供树数据结构的有效实现
    https://treelib.readthedocs.io/en/latest/
urllib3==1.26.4
# 网页请求
    https://urllib3.readthedocs.io/en/stable/
win32-setctime==1.0.3
# 一个小的 Python 实用程序,用于在 Windows 上设置文件创建时间。
    https://pypi.org/project/win32-setctime/

0x1 流程总结

代码语言:javascript
复制
run()->main()->detect_wildcard()泛域名解析->收集模块(Collect)->SRV爆破模块(BruteSRV)->爆破模块(Brute)->dns解析验证(resolve)->http请求验证模块(req)->爬取解析模块(Finder)->子域置换模块(Altdns)->丰富结果(enrich)->子域名接管扫描模块(Takeover)

0x2 流程分析

0x2.1 run()

默认配置&检查

代码语言:javascript
复制
for domain in self.domains:
    self.domain = utils.get_main_domain(domain)
    # 注册域名
    self.main()
代码语言:javascript
复制
def get_main_domain(domain):
    if not isinstance(domain, str):
        return None
    return Domain(domain).registered()
    # Domain类
代码语言:javascript
复制
class Domain(object):
    def __init__(self, string):
        self.string = str(string)
        self.regexp = r'\b((?=[a-z0-9-]{1,63}\.)(xn--)?[a-z0-9]+(-[a-z0-9]+)*\.)+[a-z]{2,63}\b'
        self.domain = None
        # 初始化
    def registered(self):
        """
        registered domain

        >>> d = Domain('www.example.com')
        <domain.Domain object>
        >>> d.registered()
        example.com

        :return: registered domain result
        """
        result = self.extract()
        if result:
            return result.registered_domain
        return None
        # 注册&解析(非dns解析)域名变量

0x2.2 main()

代码语言:javascript
复制
self.main()
// 调用OneForAll类 main方法
0x2.2.1
代码语言:javascript
复制
utils.init_table(self.domain)
代码语言:javascript
复制
def init_table(domain):
    db = Database()
    db.drop_table(domain)
    db.create_table(domain)
    db.close()
    //创建表(先删除,防止报错)
代码语言:javascript
复制
if self.access_internet:
    self.enable_wildcard = wildcard.detect_wildcard(self.domain)
    # 泛域名解析
    collect = Collect(self.domain)
            collect.run()
    # 跑collect模块
代码语言:javascript
复制
def to_detect_wildcard(domain):
    """
    Detect use wildcard dns record or not

    :param  str  domain:  domain
    :return bool use wildcard dns record or not
    """
    logger.log('INFOR', f'Detecting {domain} use wildcard dns record or not')
    random_subdomains = gen_random_subdomains(domain, 3) # 随机生成子域名
    if not all_resolve_success(random_subdomains):
        return False
    is_all_success, all_request_resp = all_request_success(random_subdomains)
    if not is_all_success:
        return True
    return any_similar_html(all_request_resp)
代码语言:javascript
复制
def gen_random_subdomains(domain, count):
    """
    生成指定数量的随机子域域名列表

    :param domain: 主域
    :param count: 数量
    """
    subdomains = set()
    if count < 1:
        return subdomains
    for _ in range(count):
        token = secrets.token_hex(4)
        subdomains.add(f'{token}.{domain}')
    return subdomains
0x2.2.2 Collect()模块

以下为收集模块代码:

代码语言:javascript
复制
class Collect(object):
    def __init__(self, domain):
        self.domain = domain
        self.modules = []
        self.collect_funcs = []
    def run(self):
        """
        Class entrance
        """
        logger.log('INFOR', f'Start collecting subdomains of {self.domain}')
        self.get_mod()
        self.import_func()
        # 注意上面这两个函数
        threads = []
        # Create subdomain collection threads
        for func_obj, func_name in self.collect_funcs:
            thread = threading.Thread(target=func_obj, name=func_name,
                                      args=(self.domain,), daemon=True)
            threads.append(thread)
        # Start all threads
        for thread in threads:
            thread.start()
        # Wait for all threads to finish
        for thread in threads:
            # 挨个线程判断超时 最坏情况主线程阻塞时间=线程数*module_thread_timeout
            # 超时线程将脱离主线程 由于创建线程时已添加守护属于 所有超时线程会随着主线程结束
            thread.join(settings.module_thread_timeout)

        for thread in threads:
            if thread.is_alive():
                logger.log('ALERT', f'{thread.name} module thread timed out')
代码语言:javascript
复制
def get_mod(self):
       """
       Get modules
       """
       if settings.enable_all_module:
           # The crawl module has some problems
           modules = ['certificates', 'check', 'datasets',
                      'dnsquery', 'intelligence', 'search']
           for module in modules:
               module_path = settings.module_dir.joinpath(module)
               for path in module_path.rglob('*.py'):
                   import_module = f'modules.{module}.{path.stem}'
                   self.modules.append(import_module)
                   # modules文件导入
       else:
           self.modules = settings.enable_partial_module
代码语言:javascript
复制
def import_func(self):
    """
     Import do function
    """
    for module in self.modules:
        name = module.split('.')[-1]
        import_object = importlib.import_module(module)
        func = getattr(import_object, 'run')
        self.collect_funcs.append([func, name])
        # 获取module每个py文件run方法
代码语言:javascript
复制
for func_obj, func_name in self.collect_funcs:
            thread = threading.Thread(target=func_obj, name=func_name,
                                      args=(self.domain,), daemon=True)
            threads.append(thread)

modules每个py文件模板差不多都长这样

因此,这里用到了继承,也重点关注下这块的代码

代码语言:javascript
复制
from common.query import Query

class CensysAPI(Query):
    def run():
        self.begin()
        self.query()    //  除了query,其他方法均继承与Query类
        self.finish()
        self.save_json()
        self.gen_result()
        self.save_db()

Query继承于Module

代码语言:javascript
复制
from common.module import Module


class Query(Module):
    """
    Query base class
    """
    def __init__(self):
        Module.__init__(self)
代码语言:javascript
复制
class Module(object):
    def __init__(self):
        self.module = 'Module'
        self.source = 'BaseModule'
        self.cookie = None
        self.header = dict()
        self.proxy = None
        self.delay = 1  # 请求睡眠时延
        self.timeout = settings.request_timeout_second  # 请求超时时间
        self.verify = settings.request_ssl_verify  # 请求SSL验证
        self.domain = str()  # 当前进行子域名收集的主域
        self.subdomains = set()  # 存放发现的子域
        self.infos = dict()  # 存放子域有关信息
        self.results = list()  # 存放模块结果
        self.start = time.time()  # 模块开始执行时间
        self.end = None  # 模块结束执行时间
        self.elapse = None  # 模块执行耗时
    def begin(self):
        """
        begin log
        """
        logger.log('DEBUG', f'Start {self.source} module to '
                            f'collect subdomains of {self.domain}')

    def finish(self):
        """
        finish log
        """
        self.end = time.time()
        self.elapse = round(self.end - self.start, 1)
        logger.log('DEBUG', f'Finished {self.source} module to '
                            f'collect {self.domain}\'s subdomains')
        logger.log('INFOR', f'{self.source} module took {self.elapse} seconds '
                            f'found {len(self.subdomains)} subdomains')
        logger.log('DEBUG', f'{self.source} module found subdomains of {self.domain}\n'
                            f'{self.subdomains}')
        .......
        .......
        .......
        # 重写了get,post,head等请求
    def save_json(self):
        """
        Save the results of each module as a json file

        :return bool: whether saved successfully
        """
        if not settings.save_module_result:
            return False
        logger.log('TRACE', f'Save the subdomain results found by '
                            f'{self.source} module as a json file')
        path = settings.result_save_dir.joinpath(self.domain, self.module)
        path.mkdir(parents=True, exist_ok=True)
        name = self.source + '.json'
        path = path.joinpath(name)
        with open(path, mode='w', errors='ignore') as file:
            result = {'domain': self.domain,
                      'name': self.module,
                      'source': self.source,
                      'elapse': self.elapse,
                      'find': len(self.subdomains),
                      'subdomains': list(self.subdomains),
                      'infos': self.infos}
            json.dump(result, file, ensure_ascii=False, indent=4)
        return True

    def gen_result(self):
        """
        Generate results
        """
        logger.log('DEBUG', f'Generating final results')
        if not len(self.subdomains):  # 该模块一个子域都没有发现的情况
            logger.log('DEBUG', f'{self.source} module result is empty')
            result = {'id': None,
                      'alive': None,
                      'request': None,
                      'resolve': None,
                      'url': None,
                      'subdomain': None,
                      'port': None,
                      'level': None,
                      'cname': None,
                      'ip': None,
                      'public': None,
                      'cdn': None,
                      'status': None,
                      'reason': None,
                      'title': None,
                      'banner': None,
                      'header': None,
                      'history': None,
                      'response': None,
                      'ip_times': None,
                      'cname_times': None,
                      'ttl': None,
                      'cidr': None,
                      'asn': None,
                      'org': None,
                      'addr': None,
                      'isp': None,
                      'resolver': None,
                      'module': self.module,
                      'source': self.source,
                      'elapse': self.elapse,
                      'find': None}
            self.results.append(result)
        else:
            for subdomain in self.subdomains:
                url = 'http://' + subdomain
                level = subdomain.count('.') - self.domain.count('.')
                info = self.infos.get(subdomain)
                if info is None:
                    info = dict()
                cname = info.get('cname')
                ip = info.get('ip')
                ip_times = info.get('ip_times')
                cname_times = info.get('cname_times')
                ttl = info.get('ttl')
                if isinstance(cname, list):
                    cname = ','.join(cname)
                    ip = ','.join(ip)
                    ip_times = ','.join([str(num) for num in ip_times])
                    cname_times = ','.join([str(num) for num in cname_times])
                    ttl = ','.join([str(num) for num in ttl])
                result = {'id': None,
                          'alive': info.get('alive'),
                          'request': info.get('request'),
                          'resolve': info.get('resolve'),
                          'url': url,
                          'subdomain': subdomain,
                          'port': 80,
                          'level': level,
                          'cname': cname,
                          'ip': ip,
                          'public': info.get('public'),
                          'cdn': info.get('cdn'),
                          'status': None,
                          'reason': info.get('reason'),
                          'title': None,
                          'banner': None,
                          'header': None,
                          'history': None,
                          'response': None,
                          'ip_times': ip_times,
                          'cname_times': cname_times,
                          'ttl': ttl,
                          'cidr': info.get('cidr'),
                          'asn': info.get('asn'),
                          'org': info.get('org'),
                          'addr': info.get('addr'),
                          'isp': info.get('isp'),
                          'resolver': info.get('resolver'),
                          'module': self.module,
                          'source': self.source,
                          'elapse': self.elapse,
                          'find': len(self.subdomains)}
                self.results.append(result)
    def save_db(self):
        """
        Save module results into the database
        """
        logger.log('DEBUG', f'Saving results to database')
        lock.acquire()
        db = Database()
        db.create_table(self.domain)
        db.save_db(self.domain, self.results, self.source)
        db.close()
        lock.release()

单独拎出来:(不重要的省略,但是不影响后面分析)

代码语言:javascript
复制
def run():
        self.begin()    # 输出日志
        self.query()    # 除了query,其他方法均继承与Query类
        self.finish()   # 输出日志,计算query用了多少时间
        self.save_json() # 保存json
        self.gen_result()
        self.save_db()
代码语言:javascript
复制
save_json()
    name = self.source + '.json'
    path = path.joinpath(name)
    with open(path, mode='w', errors='ignore') as file:
        result = {'domain': self.domain,
            'name': self.module,
            'source': self.source,
            'elapse': self.elapse,
            'find': len(self.subdomains),
            'subdomains': list(self.subdomains),
            'infos': self.infos}
    json.dump(result, file, ensure_ascii=False, indent=4)
代码语言:javascript
复制
gen_result()
result = ......
self.results.append(result)
代码语言:javascript
复制
save_db(self):
    db.save_db(self.domain, self.results, self.source)  # 重点可以如何存储的数据
    这里用了多线程
代码语言:javascript
复制
def save_db(self, table_name, results, module_name=None):
    table_name = table_name.replace('.', '_')
    if results:
        try:
            self.conn.bulk_query(insert data sql)
            # self.conn = self.get_conn(db_path)
                # return db.get_connection()
                    # Connection(self._engine.connect())
代码语言:javascript
复制
def bulk_query(self, query, *multiparams):
    self._conn.execute(text(query), *multiparams)
        # self._conn = Connection(self._engine.connect())
# 这里挺绕的😅
  • collect模块跑完了,最终数据都在self.results(这其中,跑完一个小模块,存数据,边跑边存)
    • 注意:此时的数据还未进行解析验证、http请求等
    • 后续还有对其导出再删除表再新建表重新插入数据的操作

Collect()总结图如下:

0x2.2.3 BruteSRV
代码语言:javascript
复制
# 回到 oneforall.py - 161行
srv = BruteSRV(self.domain)
    srv.run()
代码语言:javascript
复制
class BruteSRV(Module):
    def __init__(self, domain):
        Module.__init__(self)
        self.domain = domain
        self.module = 'BruteSRV'
        self.source = "BruteSRV"
        self.qtype = 'SRV'
        self.thread_num = 20
        self.names_queue = queue.Queue()
        self.answers_queue = queue.Queue()
    def run(self):
        self.begin()    # 继承的,跟上面一样
        self.fill_queue() # 读取 srv_prefixes.json
        self.do_brute()  # 多线程爆破
        self.deal_answers() #
        self.finish()
        self.save_json()
        self.gen_result()
        self.save_db()
# 继承于Module.py
代码语言:javascript
复制
self.fill_queue()
    path = data_storage_dir.joinpath('srv_prefixes.json')
    prefixes = utils.load_json(path)
    for prefix in prefixes:
    self.names_queue.put(prefix + self.domain)
代码语言:javascript
复制
def do_brute(self):
    for num in range(self.thread_num):
        thread = BruteThread(self.names_queue, self.answers_queue)
        # 类继承方式 多线程
        thread.name = f'BruteThread-{num}'
        thread.daemon = True
        thread.start()
    self.names_queue.join()
代码语言:javascript
复制
    def deal_answers(self):
        while not self.answers_queue.empty():
            answer = self.answers_queue.get()
            if answer is None:
                continue
            for item in answer:
                record = str(item)
                subdomains = self.match_subdomains(record)
                self.subdomains.update(subdomains)  # set集合存结果
0x2.2.4 brute模块
代码语言:javascript
复制
# 爆破模块, 调用brute.py文件
if self.brute:
    # Due to there will be a large number of dns resolution requests,
    # may cause other network tasks to be error
    brute = Brute(self.domain, word=True, export=False)
    brute.enable_wildcard = self.enable_wildcard
    brute.in_china = self.in_china
    brute.quite = True
    brute.run()
代码语言:javascript
复制
    def main(self, domain):
        start = time.time()
        logger.log('INFOR', f'Blasting {domain} ')
        massdns_dir = settings.third_party_dir.joinpath('massdns')
        result_dir = settings.result_save_dir
        temp_dir = result_dir.joinpath('temp')
        utils.check_dir(temp_dir)
        massdns_path = utils.get_massdns_path(massdns_dir)
        timestring = utils.get_timestring()

        wildcard_ips = list()  # 泛解析IP列表
        wildcard_ttl = int()  # 泛解析TTL整型值
        ns_list = query_domain_ns(self.domain)
        ns_ip_list = query_domain_ns_a(ns_list)  # DNS权威名称服务器对应A记录列表
        if self.enable_wildcard is None:
            self.enable_wildcard = wildcard.detect_wildcard(domain)

        if self.enable_wildcard:
            wildcard_ips, wildcard_ttl = wildcard.collect_wildcard_record(domain, ns_ip_list)
        ns_path = utils.get_ns_path(self.in_china, self.enable_wildcard, ns_ip_list)

        dict_set = self.gen_brute_dict(domain)
        # 生成字典
代码语言:javascript
复制
    def gen_brute_dict(self, domain):
        logger.log('INFOR', f'Generating dictionary for {domain}')
        dict_set = set()
        # 如果domain不是self.subdomain 而是self.domain的子域则生成递归爆破字典
        if self.word:
            self.place = ''
        if not self.place:
            self.place = '*.' + domain
        wordlist = self.wordlist
        main_domain = utils.get_main_domain(domain)
        if domain != main_domain:
            wordlist = self.recursive_nextlist
        if self.word:
            word_subdomains = gen_word_subdomains(self.place, wordlist)
            dict_set.update(word_subdomains)
        if self.fuzz:
            fuzz_subdomains = gen_fuzz_subdomains(self.place, self.rule, self.fuzzlist)
            dict_set.update(fuzz_subdomains)
        count = len(dict_set)
        logger.log('INFOR', f'Dictionary size: {count}')
        if count > 10000000:
            logger.log('ALERT', f'The generated dictionary is '
                                f'too large {count} > 10000000')
        return dict_set
  • 未指定参数--wordlist,默认调用的是./data/subnames.txt字典文件,95247个子域名爆破
  • 字典数大于10000000会报。。。 dict_name = f'generated_subdomains_{domain}_{timestring}.txt' dict_path = temp_dir.joinpath(dict_name) save_brute_dict(dict_path, dict_set) del dict_set gc.collect() output_name = f'resolved_result_{domain}_{timestring}.json' output_path = temp_dir.joinpath(output_name) log_path = result_dir.joinpath('massdns.log') check_dict() logger.log('INFOR', f'Running massdns to brute subdomains') utils.call_massdns(massdns_path, dict_path, ns_path, output_path, log_path, quiet_mode=self.quite, concurrent_num=self.concurrent_num) # 调用massdns进行暴力破解 appear_times = stat_appear_times(output_path) self.infos, self.subdomains = deal_output(output_path, appear_times, wildcard_ips, wildcard_ttl) delete_file(dict_path, output_path) end = time.time() self.elapse = round(end - start, 1) logger.log('ALERT', f'{self.source} module takes {self.elapse} seconds, ' f'found {len(self.subdomains)} subdomains of {domain}') logger.log('DEBUG', f'{self.source} module found subdomains of {domain}: ' f'{self.subdomains}') self.gen_result() self.save_db() return self.subdomains # call_massdns函数cmd massdns.exe .......
    • 调用massdns爆破
0x2.2.5 数据处理
代码语言:javascript
复制
utils.deal_data(self.domain)
代码语言:javascript
复制
def deal_data(domain):
    db = Database()
    db.remove_invalid(domain)
    db.deduplicate_subdomain(domain)
    db.close()
代码语言:javascript
复制
    def remove_invalid(self, table_name):
        """
        Remove nulls or invalid subdomains in the table

        :param str table_name: table name
        """
        table_name = table_name.replace('.', '_')
        logger.log('TRACE', f'Removing invalid subdomains in {table_name} table')
        self.query(f'delete from "{table_name}" where '
                   f'subdomain is null or resolve == 0')
        # 移除无效数据
代码语言:javascript
复制
    def deduplicate_subdomain(self, table_name):
        """
        Deduplicate subdomains in the table

        :param str table_name: table name
        """
        table_name = table_name.replace('.', '_')
        logger.log('TRACE', f'Deduplicating subdomains in {table_name} table')
        self.query(f'delete from "{table_name}" where '
                   f'id not in (select min(id) '
                   f'from "{table_name}" group by subdomain)')
        # 数据去重
0x2.2.6
代码语言:javascript
复制
if not self.dns:
    self.data = self.export_data()# 导出数据
    self.datas.extend(self.data)# self.datas新列表存储数据
    return self.data               #
代码语言:javascript
复制
self.data = self.export_data()
代码语言:javascript
复制
def export_data(self):
    """
    Export data from the database

    :return: exported data
    :rtype: list
    """
    return export.export_data(self.domain, alive=self.alive, fmt=self.fmt, path=self.path)
代码语言:javascript
复制
def export_data(target, db=None, alive=False, limit=None, path=None, fmt='csv', show=False):
    """
    OneForAll export from database module

    Example:
        python3 export.py --target name --fmt csv --dir= ./result.csv
        python3 export.py --target name --tb True --show False
        python3 export.py --db result.db --target name --show False

    Note:
        --fmt csv/json (result format)
        --path   Result directory (default directory is ./results)

    :param str  target:  Table to be exported
    :param str  db:      Database path to be exported (default ./results/result.sqlite3)
    :param bool alive:   Only export the results of alive subdomains (default False)
    :param str  limit:   Export limit (default None)
    :param str  fmt:     Result format (default csv)
    :param str  path:    Result directory (default None)
    :param bool show:    Displays the exported data in terminal (default False)
    """

    database = Database(db)
    domains = utils.get_domains(target)
    datas = list()
    if domains:
        for domain in domains:
            table_name = domain.replace('.', '_')
            rows = database.export_data(table_name, alive, limit)   # !!!
            if rows is None:
                continue
            data, _, _ = do_export(fmt, path, rows, show, domain, target)# !!!
            datas.extend(data)
    database.close()
    if len(domains) > 1:
        utils.export_all(alive, fmt, path, datas)   # !!!
    return datas
代码语言:javascript
复制
    def export_data(self, table_name, alive, limit):
        """
        Get part of the data in the table

        :param str table_name: table name
        :param any alive: alive flag
        :param str limit: limit value
        """
        table_name = table_name.replace('.', '_')
        sql = f'select id, alive, request, resolve, url, subdomain, level,' \
              f'cname, ip, public, cdn, port, status, reason, title, banner,' \
              f'cidr, asn, org, addr, isp, source from "{table_name}" '
        if alive and limit:
            if limit in ['resolve', 'request']:
                where = f' where {limit} = 1'
                sql += where
        elif alive:
            where = f' where alive = 1'
            sql += where
        sql += ' order by subdomain'
        logger.log('TRACE', f'Get the data from {table_name} table')
        return self.query(sql)
代码语言:javascript
复制
def do_export(fmt, path, rows, show, domain, target):
    fmt = utils.check_format(fmt)
    path = utils.check_path(path, target, fmt)
    if show:
        print(rows.dataset)
    data = rows.export(fmt)
    utils.save_to_file(path, data)
    logger.log('ALERT', f'The subdomain result for {domain}: {path}')
    data = rows.as_dict()
    return data, fmt, path
代码语言:javascript
复制
def export_all(alive, fmt, path, datas):
    """
    将所有结果数据导出

    :param bool alive: 只导出存活子域结果
    :param str fmt: 导出文件格式
    :param str path: 导出文件路径
    :param list datas: 待导出的结果数据
    """
    fmt = check_format(fmt)
    timestamp = get_timestring()
    name = f'all_subdomain_result_{timestamp}'
    export_all_results(path, name, fmt, datas)
    export_all_subdomains(alive, path, name, datas)
代码语言:javascript
复制
def export_all_results(path, name, fmt, datas):
    path = check_path(path, name, fmt)
    logger.log('ALERT', f'The subdomain result for all main domains: {path}')
    row_list = list()
    for row in datas:
        if 'header' in row:
            row.pop('header')
        if 'response' in row:
            row.pop('response')
        keys = row.keys()
        values = row.values()
        row_list.append(Record(keys, values))
    rows = RecordCollection(iter(row_list)) //!!!
    content = rows.export(fmt)
    save_to_file(path, content)
代码语言:javascript
复制
def export_all_subdomains(alive, path, name, datas):
    path = check_path(path, name, 'txt')
    logger.log('ALERT', f'The txt subdomain result for all main domains: {path}')
    subdomains = set()
    for row in datas:
        subdomain = row.get('subdomain')
        if alive:
            if not row.get('alive'):
                continue
            subdomains.add(subdomain)
        else:
            subdomains.add(subdomain)
    data = '\n'.join(subdomains)
    save_to_file(path, data)

self.data = utils.get_data(self.domain)

代码语言:javascript
复制
def get_data(domain):
    db = Database()
    data = db.get_data(domain).as_dict()    # !!!
    db.close()
    return data

db.get_data(domain)

代码语言:javascript
复制
    def get_data(self, table_name):
        """
        Get all the data in the table

        :param str table_name: table name
        """
        table_name = table_name.replace('.', '_')
        logger.log('TRACE', f'Get all the data from {table_name} table')
        return self.query(f'select * from "{table_name}"')

as_dict()

代码语言:javascript
复制
def as_dict(self, ordered=False):
        return self.all(as_dict=not (ordered), as_ordereddict=ordered)
代码语言:javascript
复制
    def all(self, as_dict=False, as_ordereddict=False):
        """Returns a list of all rows for the RecordCollection. If they haven't
        been fetched yet, consume the iterator and cache the results."""

        # By calling list it calls the __iter__ method
        rows = list(self)

        if as_dict:
            return [r.as_dict() for r in rows]
        elif as_ordereddict:
            return [r.as_dict(ordered=True) for r in rows]

        return rows
    # OrderedDict迭代器

utils.clear_data(self.domain)

代码语言:javascript
复制
def clear_data(domain):
    db = Database()
    db.drop_table(domain)
    db.close()
    # 删除表
    # 此时表里的数据未进行resolve, http req, 前面由get_data取出了再删除
0x2.2.7 DNS解析验证
代码语言:javascript
复制
self.data = resolve.run_resolve(self.domain, self.data)
resolve.save_db(self.domain, self.data)
代码语言:javascript
复制
def run_resolve(domain, data):
    """
    调用子域解析入口函数

    :param str domain: 待解析的主域
    :param list data: 待解析的子域数据列表
    :return: 解析得到的结果列表
    :rtype: list
    """
    logger.log('INFOR', f'Start resolving subdomains of {domain}')
    subdomains = filter_subdomain(data)
    if not subdomains:
        return data

    massdns_dir = settings.third_party_dir.joinpath('massdns')
    result_dir = settings.result_save_dir
    temp_dir = result_dir.joinpath('temp')
    utils.check_dir(temp_dir)
    massdns_path = utils.get_massdns_path(massdns_dir)
    timestring = utils.get_timestring()

    save_name = f'collected_subdomains_{domain}_{timestring}.txt'
    save_path = temp_dir.joinpath(save_name)
    save_subdomains(save_path, subdomains)
    del subdomains
    gc.collect()

    output_name = f'resolved_result_{domain}_{timestring}.json'
    output_path = temp_dir.joinpath(output_name)
    log_path = result_dir.joinpath('massdns.log')
    ns_path = utils.get_ns_path()

    logger.log('INFOR', f'Running massdns to resolve subdomains')
    utils.call_massdns(massdns_path, save_path, ns_path,
                       output_path, log_path, quiet_mode=True)

    infos = deal_output(output_path)
    data = update_data(data, infos)
    logger.log('INFOR', f'Finished resolve subdomains of {domain}')
    return data
代码语言:javascript
复制
call_massdns
代码语言:javascript
复制
def call_massdns(massdns_path, dict_path, ns_path, output_path, log_path,
                 query_type='A', process_num=1, concurrent_num=10000,
                 quiet_mode=False):
    logger.log('DEBUG', 'Start running massdns')
    quiet = ''
    if quiet_mode:
        quiet = '--quiet'
    status_format = settings.brute_status_format
    socket_num = settings.brute_socket_num
    resolve_num = settings.brute_resolve_num
    cmd = f'{massdns_path} {quiet} --status-format {status_format} ' \
          f'--processes {process_num} --socket-count {socket_num} ' \
          f'--hashmap-size {concurrent_num} --resolvers {ns_path} ' \
          f'--resolve-count {resolve_num} --type {query_type} ' \
          f'--flush --output J --outfile {output_path} ' \
          f'--root --error-log {log_path} {dict_path} --filter OK ' \
          f'--sndbuf 0 --rcvbuf 0'
    logger.log('DEBUG', f'Run command {cmd}')
    subprocess.run(args=cmd, shell=True)
    logger.log('DEBUG', f'Finished massdns')

resolve.save_db(self.domain, self.data)

代码语言:javascript
复制
def save_db(name, data):
    """
    Save resolved results to database

    :param str  name: table name
    :param list data: data to be saved
    """
    logger.log('INFOR', f'Saving resolved results')
    utils.save_to_db(name, data, 'resolve') // !!!
代码语言:javascript
复制
def save_to_db(name, data, module):
    """
    Save request results to database

    :param str  name: table name
    :param list data: data to be saved
    :param str module: module name
    """
    db = Database()
    db.drop_table(name)
    db.create_table(name)
    db.save_db(name, data, module)
    db.close()
代码语言:javascript
复制
if not self.req:    # 如果没有req
    self.data = self.export_data()
    self.datas.extend(self.data)
    return self.data
    # 跟前面相同
if self.enable_wildcard:    # 如果有泛解析
    # deal wildcard
    self.data = wildcard.deal_wildcard(self.data)
# HTTP request
utils.clear_data(self.domain)
request.run_request(self.domain, self.data, self.port)

self.data = wildcard.deal_wildcard(self.data)

代码语言:javascript
复制
def deal_wildcard(data):
    new_data = list()
    appear_times = stat_times(data)
    for info in data:
        subdomain = info.get('subdomain')
        isvalid, reason = check_valid_subdomain(appear_times, info)
        logger.log('DEBUG', f'{subdomain} is {isvalid} subdomain reason because {reason}')
        if isvalid:
            new_data.append(info)   # !!!
    return new_data
0x2.2.8 http请求验证模块
代码语言:javascript
复制
utils.clear_data(self.domain)   # 跟上一样
request.run_request(self.domain, self.data, self.port)  # 跑req,更新数据
代码语言:javascript
复制
def run_request(domain, data, port):
    """
    HTTP request entrance

    :param  str domain: domain to be requested
    :param  list data: subdomains data to be requested
    :param  any port: range of ports to be requested
    :return list: result
    """
    logger.log('INFOR', f'Start requesting subdomains of {domain}')
    data = utils.set_id_none(data)
    ports = get_port_seq(port)
    req_data, req_urls = gen_req_data(data, ports)
    bulk_request(domain, req_data)
    count = utils.count_alive(domain)
    logger.log('INFOR', f'Found that {domain} has {count} alive subdomains')
代码语言:javascript
复制
def bulk_request(domain, req_data, ret=False):
    logger.log('INFOR', 'Requesting urls in bulk')
    resp_queue = Queue()
    urls_queue = Queue()
    task_count = len(req_data)
    for index, info in enumerate(req_data):
        url = info.get('url')
        urls_queue.put((index, url))
    session = get_session()
    thread_count = req_thread_count()
    if task_count <= thread_count:
        # 如果请求任务数很小不用创建很多线程了
        thread_count = task_count
    bar = get_progress_bar(task_count)

    progress_thread = Thread(target=progress, name='ProgressThread',
                             args=(bar, task_count, urls_queue), daemon=True)
    progress_thread.start()

    for i in range(thread_count):
        request_thread = Thread(target=request, name=f'RequestThread-{i}',
                                args=(urls_queue, resp_queue, session), daemon=True)
        request_thread.start()
    if ret:
        urls_queue.join()
        return resp_queue
    save_thread = Thread(target=save, name=f'SaveThread',
                         args=(domain, task_count, req_data, resp_queue), daemon=True)
    save_thread.start()
    urls_queue.join()
    save_thread.join()
0x2.2.9 爬取解析模块
代码语言:javascript
复制
# Finder module
if settings.enable_finder_module:
    finder = Finder()
    finder.run(self.domain, self.data, self.port)
代码语言:javascript
复制
def run(self, domain, data, port):
    logger.log('INFOR', f'Start Finder module')
    existing_subdomains = set(map(lambda x: x.get('subdomain'), data))  # 已有的子域
    found_subdomains = find_subdomains(domain, data)    # !!! 主要功能在此函数
    new_subdomains = found_subdomains - existing_subdomains
    if not len(new_subdomains):
    self.finish()  # 未发现新的子域就直接返回
    self.subdomains = new_subdomains
    self.finish()
    self.gen_result()
    resolved_data = resolve.run_resolve(domain, self.results)
    request.run_request(domain, resolved_data, port)
# https://github.com/GerbenJavado/LinkFinder
代码语言:javascript
复制
def find_subdomains(domain, data):
    subdomains = set()
    js_urls = set()
    db = Database()
    for infos in data:
        jump_history = infos.get('history')
        req_url = infos.get('url')
        subdomains.update(find_in_history(domain, req_url, jump_history))
        # URL跳转历史中查找子域名
        rsp_html = db.get_resp_by_url(domain, req_url)
        if not rsp_html:
            logger.log('DEBUG', f'an abnormal response occurred in the request {req_url}')
            continue
        subdomains.update(find_in_resp(domain, req_url, rsp_html))
        # 返回内容种查找子域名
        js_urls.update(find_js_urls(domain, req_url, rsp_html))
        # js中查找子域名
    req_data = convert_to_dict(js_urls)
    resp_data = request.bulk_request(domain, req_data, ret=True)
    while not resp_data.empty():
        _, resp = resp_data.get()
        if not isinstance(resp, Response):
            continue
        text = utils.decode_resp_text(resp)
        subdomains.update(find_in_resp(domain, resp.url, text))
    return subdomains
代码语言:javascript
复制
def find_in_history(domain, url, history):
    logger.log('TRACE', f'matching subdomains from history of {url}')
    return match_subdomains(domain, history)
  • URL跳转历史中查找子域名,通过match_subdomains匹配
  • def find_in_resp(domain, url, html): logger.log('TRACE', f'matching subdomains from response of {url}') return match_subdomains(domain, html)
  • 从返回内容种查找子域名
  • def find_js_urls(domain, req_url, rsp_html): js_urls = set() new_urls = find_new_urls(rsp_html) if not new_urls: return js_urls for rel_url in new_urls: url = convert_url(req_url, rel_url) if not filter_url(domain, url): js_urls.add(url) return js_urls
  • 从js中查找子域名
0x2.2.10 子域置换模块
代码语言:javascript
复制
# altdns module
if settings.enable_altdns_module:
    altdns = Altdns(self.domain)
    altdns.run(self.data, self.port)
# 根据已有的子域,使用子域替换技术再次发现新的子域
代码语言:javascript
复制
    def run(self, data, port):
        logger.log('INFOR', f'Start altdns module')
        self.now_subdomains = utils.get_subdomains(data)
        self.get_words()
        self.extract_words()
        self.gen_new_subdomains()
        self.subdomains = self.new_subdomains - self.now_subdomains
        count = len(self.subdomains)
        logger.log('INFOR', f'The altdns module generated {count} new subdomains')
        self.end = time.time()
        self.elapse = round(self.end - self.start, 1)
        self.gen_result()
        resolved_data = resolve.run_resolve(self.domain, self.results)
        valid_data = wildcard.deal_wildcard(resolved_data)  # 强制开启泛解析处理
        request.run_request(self.domain, valid_data, port)
代码语言:javascript
复制
self.get_words()    # 得到words,words来源./data/altdns_wordlist.txt
self.extract_words() # 根据目标的域命名约定扩展字典
self.gen_new_subdomains()
代码语言:javascript
复制
    def get_words(self):
        path = settings.data_storage_dir.joinpath('altdns_wordlist.txt')
        with open(path) as fd:
            for line in fd:
                word = line.lower().strip()
                if word:
                    self.words.add(word)
                    # 读取altdns_wordlist.txt内容, 生成words元组
代码语言:javascript
复制
def extract_words(self):
       """
       Extend the dictionary based on target's domain naming conventions
       """
 
       for subdomain in self.now_subdomains:
           _, parts = split_domain(subdomain)
           tokens = set(itertools.chain(*[word.lower().split('-') for word in parts]))
           tokens = tokens.union({word.lower() for word in parts})
           for token in tokens:
               if len(token) >= self.wordlen:
                   self.words.add(token)
                   # 从上面收集的数据中,得到words
                   # eg: xiaotao.tao.com -> token=xiaotao
代码语言:javascript
复制
    def gen_new_subdomains(self):
        for subdomain in self.now_subdomains:
            subname, parts = split_domain(subdomain)
            subnames = subname.split('.')
            if settings.altdns_increase_num:
                self.increase_num(subname)
        # test.1.foo.example.com -> test.2.foo.example.com, test.3.foo.example.com, ...
        # test1.example.com -> test2.example.com, test3.example.com, ...
        # test01.example.com -> test02.example.com, test03.example.com, ...
            if settings.altdns_decrease_num:
                self.decrease_num(subname)
        # test.4.foo.example.com -> test.3.foo.example.com, test.2.foo.example.com, ...
        # test4.example.com -> test3.example.com, test2.example.com, ...
        # test04.example.com -> test03.example.com, test02.example.com, ...
            if settings.altdns_replace_word:
                self.replace_word(subname)
        # WORD1.1.foo.example.com -> WORD2.1.foo.example.com,
        #                            WORD3.1.foo.example.com,
        #                            WORD4.1.foo.example.com,
        #                            ..
            if settings.altdns_insert_word:
                self.insert_word(parts)
        # test.1.foo.example.com -> WORD.test.1.foo.example.com,
        #                           test.WORD.1.foo.example.com,
        #                           test.1.WORD.foo.example.com,
        #                           test.1.foo.WORD.example.com,
        #                           ...
            if settings.altdns_add_word:
                self.add_word(subnames)
        # Prepend with `-`
        # test.1.foo.example.com -> WORD-test.1.foo.example.com
        # Prepend with `-`
        # test.1.foo.example.com -> test-WORD.1.foo.example.com

5种置换方式, 具体规则如上面代码注释

代码语言:javascript
复制
......
resolved_data = resolve.run_resolve(self.domain, self.results)
        valid_data = wildcard.deal_wildcard(resolved_data)  # 强制开启泛解析处理
        request.run_request(self.domain, valid_data, port)

对生成的子域名,该模块会对其进行dns验证解析和http请求,利用的原理跟上面一样

0x2.2.11 丰富结果
代码语言:javascript
复制
# Information enrichment module
if settings.enable_enrich_module:
    enrich = Enrich(self.domain)
    enrich.run()
    # 对结果信息进行丰富

public、cidr、asn、org、addr、isp等内容,丰富更新数据库结果,不进行域名收集

代码语言:javascript
复制
public: 是否是公网IP
cidr: ip2location库查询出的CIDR
asn: ip2location库查询出的ASN
org: 
addr:ip2region库查询出的物理地址
isp: ip2region库查询出的网络服务提供商
0x2.2.12 导出数据至csv文件
代码语言:javascript
复制
self.data = self.export_data()  # 导出数据至csv文件
self.datas.extend(self.data)

前面提及过export_data函数,于上面一样

0x2.2.13 子域名接管扫描模块
代码语言:javascript
复制
# Scan subdomain takeover
if self.takeover:
    subdomains = utils.get_subdomains(self.data)
    takeover = Takeover(targets=subdomains)
    takeover.run()
# 扫描检测是否存在子域名接管漏洞
代码语言:javascript
复制
    def run(self):
        start = time.time()
        logger.log('INFOR', f'Start running {self.source} module')
        if isinstance(self.targets, set):
            self.subdomains = self.targets
        else:
            self.subdomains = utils.get_domains(self.target, self.targets)
        self.fmt = utils.check_format(self.fmt)
        timestamp = utils.get_timestamp()
        name = f'takeover_check_result_{timestamp}'
        self.path = utils.check_path(self.path, name, self.fmt)
        if self.subdomains:
            logger.log('INFOR', f'Checking subdomain takeover')
            self.fingerprints = get_fingerprint()
            self.results.headers = ['subdomain', 'cname']
            # 创建待检查的子域队列
            for domain in self.subdomains:
                self.queue.put(domain)
            # 进度线程
            progress_thread = Thread(target=self.progress, name='ProgressThread',
                                     daemon=True)
            progress_thread.start()
            # 检查线程
            for i in range(self.thread):
                check_thread = Thread(target=self.check, name=f'CheckThread{i}',
                                      daemon=True)
                check_thread.start()

            self.queue.join()
            self.save()
        else:
            logger.log('FATAL', f'Failed to obtain domain')
        end = time.time()
        elapse = round(end - start, 1)
        logger.log('ALERT', f'{self.source} module takes {elapse} seconds, '
                            f'There are {len(self.results)} subdomains exists takeover')
        logger.log('INFOR', f'Subdomain takeover results: {self.path}')
        logger.log('INFOR', f'Finished {self.source} module')

具体参考:https://www.hackerone.com/application-security/guide-subdomain-takeovers

代码语言:javascript
复制
return self.data

0x3 总结

看到这里,如果你仔细跟看几遍,那么你会明白的!最后,结合自己的需求进行二开,才是最好的利器!!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-02-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 黑白天实验室 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0x1 流程总结
  • 0x2 流程分析
    • 0x2.1 run()
      • 0x2.2 main()
        • 0x2.2.1
        • 0x2.2.2 Collect()模块
        • 0x2.2.3 BruteSRV
        • 0x2.2.4 brute模块
        • 0x2.2.5 数据处理
        • 0x2.2.6
        • 0x2.2.7 DNS解析验证
        • 0x2.2.8 http请求验证模块
        • 0x2.2.9 爬取解析模块
        • 0x2.2.10 子域置换模块
        • 0x2.2.11 丰富结果
        • 0x2.2.12 导出数据至csv文件
        • 0x2.2.13 子域名接管扫描模块
    • 0x3 总结
    相关产品与服务
    命令行工具
    腾讯云命令行工具 TCCLI 是管理腾讯云资源的统一工具。使用腾讯云命令行工具,您可以快速调用腾讯云 API 来管理您的腾讯云资源。此外,您还可以基于腾讯云的命令行工具来做自动化和脚本处理,以更多样的方式进行组合和重用。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档