前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【问题解决】记一次线上安全测试中误用父类属性导致数据污染的解决方案

【问题解决】记一次线上安全测试中误用父类属性导致数据污染的解决方案

原创
作者头像
sidiot
修改2024-06-28 07:52:14
1640
修改2024-06-28 07:52:14
举报
文章被收录于专栏:技术大杂烩技术大杂烩

前言

在线上安全测试的过程中,会使用 Nmap 进行端口扫描,为了提升端口扫描的效率,扫描策略通常是检测常用端口是否处于开放状态,并在父类中使用名为 all_open_ports 的属性来记录这些开放的端口。

在后续的测试过程中,需要检查所涉及的端口是否包含在 all_open_ports 中。如果不存在,就需要进一步对这些端口进行开放检测。如果端口的检测结果是开放的,测试将继续进行并将这些端口记录到 all_open_ports 中,以便在下次遇到相同端口时无需重复检测。

然而,由于安全测试是多线程进行的,某些情况下可以将 all_open_ports 理解为共享变量,这导致当两个不同的测试环境同时进行安全测试时,数据相互污染,从而影响最终测试结果的准确性。

为了解决这个问题,需要重新设计变量 all_open_ports 的存储和访问方式,以确保在多线程环境下数据的独立性和一致性,接下来由博主为各位读者进行仔细讲解。

本文代码点击此处跳转,博文中的所有代码全部收集在博主的 GitHub 仓库中。

场景复现

先创建一个父类 Parent,定义一个类属性 all_open_ports 用来记录已经开放的端口,并创建一个方法 check_port() 来模拟端口检测,代码如下所示:

代码语言:javascript
复制
class Parent:
    all_open_ports = set()

    def __init__(self, args):
        self.all_open_ports.update(args.get("open_ports", []))

    def check_port(self, port):
        # 忽略端口扫描...
        if port not in self.all_open_ports:
            self.all_open_ports.add(port)
            print(f"{port} in all_open_ports, {self.all_open_ports}")
            pass

再创建一个子类 Child 继承父类,构造 scan() 方法来模拟安全测试过程,代码如下所示:

代码语言:javascript
复制
import threading

from parent import Parent


class Child(Parent):
    def __init__(self, args):
        super().__init__(args)
        self.port = args.get("port")

    def scan(self):
        print(threading.current_thread().name, self.all_open_ports)
        self.check_port(self.port)
        pass

最后创建一个测试用例,实例化两个 Child 对象,并以多线程的方式运行对象方法 scan() 来进行场景复现,代码如下所示:

代码语言:javascript
复制
def test_thread():
    c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]})
    t1 = threading.Thread(target=c1.scan, name="Child_1")
    t1.start()
    t1.join()

    c2 = Child({"port": 5001, "open_ports": [80, 3306, 5000]})
    t2 = threading.Thread(target=c2.scan, name="Child_2")
    t2.start()
    t2.join()

    print("All tasks have finished!")

运行结果:

根因分析

造成上述问题的根本原因就是在多线程中 all_open_ports 可被当成共享变量使用,致使数据相互污染,从而影响最终测试结果的准确性。

因为 all_open_ports 是在父类中定义的一个类属性,这意味着它是类 Parent 的一部分,它被所有派生类(子类)所共享。通过这种方式,父类的所有子类都可以访问和更新 all_open_ports 属性。

每当子类的实例创建时,如果传递了 open_ports 参数,那么这些端口将被添加到 all_open_ports 集合中,并且在父类中的 check_port 方法中,判断给定端口 port 是否存在于 all_open_ports 集合中,如果不存在,则将端口添加到集合中。这样,所有子类实例都可以共享和更新这个属性。

现在我们修改部分代码,在打印时输出 all_open_ports 的地址来判断是否使用了同一变量,代码如下所示:

代码语言:javascript
复制
def scan(self):
    print(threading.current_thread().name, self.all_open_ports, "id:", id(self.all_open_ports))
    self.check_port(self.port)
    pass

运行结果:

那么有什么方法能解决当前的问题呢?

  • 重新初始化 all_open_ports
  • 上下文管理 contextvar
  • 线程本地变量 thread.local

重新初始化 all_open_ports

重新初始化 all_open_ports 的方法是最快捷的,但是会有一个问题,重新初始化 all_open_ports 会使得每个 Child 对象都有自己独立的 all_open_ports 集合,而不会共享相同的集合,这会发生重复检测端口的情况,也就违背了一开始的设计初衷。

创建一个测试用例来观察一下当前的 all_open_ports 集合使用情况,代码如下所示:

代码语言:javascript
复制
def test_init_set():
    c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]})
    c2 = Child({"port": 3002, "open_ports": [80, 443, 3306]})
    print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)
    c1.scan()
    print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)
    c2.scan()
    print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)

运行结果:

根据运行结果可以发现, all_open_ports 集合在当前情况下可以被看做是共享变量,哪怕在不同的线程中,个 Child 对象都能共享 all_open_ports 集合。

这时候,修改父类 Parent 中的 __init__ 代码,使得 all_open_ports 集合在 __init__ 时重新初始化,代码如下所示:

代码语言:javascript
复制
def __init__(self, args):
    self.all_open_ports = set()
    self.all_open_ports.update(args.get("open_ports", []))

运行结果:

根据运行结果可以发现,c1c2 中的 all_open_ports 是完全独立的集合,c1all_open_ports 集合中的增加操作不会影响到 c2,这虽然避免了数据污染,但是会导致在 c1 检测过的端口还需要在 c2 重新进行检测,这与我们一开始设计 all_open_ports 集合来提升效率的想法背道而驰了。

上下文管理 contextvar

contextvars 是 Python 3.7 引入的一个模块,用于提供上下文变量的功能。它是线程安全的,允许在异步编程和多线程环境中共享上下文相关的数据,而不会出现数据污染的问题,但是在较旧的 Python 版本中无法使用。

contextvars 模块提供了 ContextVar 类,它是一个上下文变量的容器。每个 ContextVar 对象都可以存储一个值,并且在不同的上下文中可以访问和修改这个值。上下文可以是线程、协程或其他异步任务。

不过需要注意的是,由于上下文变量的值可以在不同的上下文中共享,可能会导致代码中的隐式依赖。这可能增加代码的复杂性和维护成本。


二更:有被自己蠢到,实际是可行的。

先分析一下上一次为什么不行。

因为上一次一直在操作一个对象,所以本质上是在操作 set 而不是 ContextVar,可以在 check_port() 方法里添加打印 all_open_ports 的代码,代码如下所示:

代码语言:javascript
复制
def check_port(self, port):
    all_open_ports_ = self.all_open_ports.get()
    print(print_prefix(), id(all_open_ports_))
    ...

运行结果:

结果打印出来的 id 也证实了我们的说法,因此,接下来我们要更改代码,使其操作的是 ContextVar 对象,代码如下所示:

代码语言:javascript
复制
class ParentContext:
    all_open_ports = contextvars.ContextVar("all_open_ports", default=set())

    def __init__(self, args):
        open_ports = set(args.get("open_ports", []))
        self.all_open_ports.set(open_ports | self.all_open_ports.get())

    def check_port(self, port):
        all_open_ports_ = self.all_open_ports.get()
        print(print_prefix(), id(all_open_ports_))
        if port not in all_open_ports_:
            all_open_ports_.add(port)
            self.all_open_ports.set(all_open_ports_)
            print(f"{print_prefix()} Port {port} is added to all_open_ports, {self.all_open_ports.get()}")

运行结果:


一更:先说结论,好像不行,不知道是不是思路有问题,希望各位大神指点一下!

运行结果:

代码如下所示:

代码语言:javascript
复制
class ParentContext:
    all_open_ports = contextvars.ContextVar("all_open_ports", default=set())

    def __init__(self, args):
        open_ports = self.all_open_ports.get()
        open_ports.update(args.get("open_ports", []))
        self.all_open_ports.set(open_ports)

    def check_port(self, port):
        all_open_ports_ = self.all_open_ports.get()
        if port not in all_open_ports_:
            all_open_ports_.add(port)
            self.all_open_ports.set(all_open_ports_)
            print(f"{print_prefix()} Port {port} is added to all_open_ports, {self.all_open_ports.get()}")


class ChildContext(ParentContext):
    def __init__(self, args):
        super().__init__(args)
        self.port = args.get("port")

    def scan(self, port=None):
        self.check_port(port or self.port)
        pass


def test_contextvars(open_ports, port):
    c1 = ChildContext({"port": port, "open_ports": open_ports})
    c1.scan()


if __name__ == '__main__':
    t1 = threading.Thread(target=test_contextvars, name="Child_1", args=([80, 3306, 5000], 5001,))
    t2 = threading.Thread(target=test_contextvars, name="Child_2", args=([22, 3306, 6000], 6001,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

线程本地变量 thread.local

threading.local() 是 Python 标准库中的一个类,它提供了一种在多线程环境下创建线程本地存储的机制。它允许每个线程都有自己独立的变量副本,这些变量在不同线程之间是相互隔离的,不会相互干扰。

当多个线程同时执行时,它们可以访问和修改各自的线程本地变量,而不会影响其他线程的变量。这对于需要在线程之间共享数据,但又需要保持数据独立性的情况非常有用。

接下来,我们创建父类 ParentLocal,并使用 threading.local() 来存储集合 all_open_ports,代码如下所示:

代码语言:javascript
复制
class ParentLocal:
    local = threading.local()

    def __init__(self, args):
        self.local.all_open_ports = getattr(self.local, "all_open_ports", set())
        self.local.all_open_ports.update(args.get("open_ports", []))

    def check_port(self, port):
        if port not in self.local.all_open_ports:
            self.local.all_open_ports.add(port)
            print(f"{self.print_prefix()} Port {port} is added to all_open_ports, {self.local.all_open_ports}")

    def print_prefix(self):
        return f"[{time.strftime('%H:%M:%S', time.localtime())} {threading.current_thread().name}]"

在上述代码中,ParentLocal 类定义了初始化方法 __init__,通过 getattr() 函数来获取 self.local.all_open_ports 的值。如果 self.local.all_open_ports 不存在,则使用 set() 创建一个空的集合,并将其赋值给 self.local.all_open_ports。然后,我们使用 update() 方法将 args.get("open_ports", []) 中的端口添加到 self.local.all_open_ports 中。

通过使用 ParentLocal 类,我们可以在多线程环境中创建多个实例,并且每个实例都有自己独立的 all_open_ports 变量。这样,不同线程的实例之间的数据不会相互干扰。

Child 类与之前基本保持不变,代码如下所示:

代码语言:javascript
复制
class ChildLocal(ParentLocal):
    def __init__(self, args):
        super().__init__(args)
        self.port = args.get("port")

    def scan(self, port=None):
        self.check_port(port or self.port)
        pass

在上述代码中,ChildLocal 类是继承自 ParentLocal 类的子类,通过继承关系它可以访问父类的 self.local.all_open_ports 集合。这使得 ChildLocal 实例可以在同一线程下共享数据,同时不会受到其他线程中的 ChildLocal 实例的影响。

编写测试代码如下所示:

代码语言:javascript
复制
def tset_local(open_ports, port):
    c1 = ChildLocal({"port": port, "open_ports": open_ports})
    c1.scan()
    args = {"port": generate_random_numbers(1)[0], "open_ports": generate_random_numbers(3)}
    print(threading.current_thread().name, args)
    c2 = ChildLocal(args)
    c2.scan()
    time.sleep(3)
    c1.scan(random.randint(8000, 9999))


if __name__ == '__main__':
    t1 = threading.Thread(target=tset_local, name="Child_1", args=([80, 3306, 5000], 5001,))
    t2 = threading.Thread(target=tset_local, name="Child_2", args=([22, 3306, 6000], 6001,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

运行结果:

如我们所料,Child1Child2 线程中的 ChildLocal 实例相互之间共享 all_open_ports 集合的数据,但是不同线程之间的 ChildLocal 实例不能相互共享数据。

需要注意的是,threading.local() 对象在不同的线程中具有相同的 id 值,这是因为它们实际上是同一个对象的不同实例。每个线程都有自己独立的 threading.local() 对象,但它们共享相同的类定义。

当在不同的线程中创建 threading.local() 对象时,每个线程都会创建一个新的实例,但这些实例的类定义是相同的。因此,它们的 id 值是相同的。

后记

幸好我们及时发现了这个问题,并没有造成安全事故。现在我们将这次经历分享出来,希望能给其他开发团队带来启发,共同提高系统的安全性和稳定性。

以上就是 记一次线上安全测试中误用父类属性导致数据污染的解决方案 的所有内容了,希望本篇博文对大家有所帮助!欢迎大家持续关注我的博客,一起分享学习和成长的乐趣!✨

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 场景复现
  • 根因分析
    • 重新初始化 all_open_ports
      • 上下文管理 contextvar
        • 线程本地变量 thread.local
        • 后记
        相关产品与服务
        手游安全测试
        手游安全测试(Security Radar,SR)为企业提供私密的安全测试服务,通过主动挖掘游戏业务安全漏洞(如钻石盗刷、服务器宕机、无敌秒杀等40多种漏洞),提前暴露游戏潜在安全风险,提供解决方案及时修复,最大程度降低事后外挂危害与外挂打击成本。该服务为腾讯游戏开放的手游安全漏洞挖掘技术,杜绝游戏外挂损失。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档