前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >Python模糊测试实战

Python模糊测试实战

原创
作者头像
用户10931828
发布2025-01-07 14:09:45
发布2025-01-07 14:09:45
2530
举报
文章被收录于专栏:接口测试

背景

公司最近再推接口自动化测试,基于正向业务流程做了接口自动化覆盖(比如创建订单成功、支付成功等),在过程中发现了一些程序漏洞:如一些参数异常情况,会导致程序崩溃(如数组越界、空传入)。如果这些漏洞被别有用心的人发现进而攻击的话,那生产环境将会极大的收到影响,甚至频频崩溃导致正常用户无法使用。因此,想针对接口做逆向场景的测试,来保证接口的健壮性、系统稳定性。

本次主要考虑覆盖以下几个方面的测试:

  • 不合法字符串
  • 字符串超长
  • 应该是数字类型的,传入了字母
  • 参数为空
  • 传入了中文,标点符号等
  • sql注入等等

思路

正向场景

测试人员会明确知道被测接口的信息,并且根据接口文档传入正确的业务值,进而保证被测场景畅通;并且每个接口需要针对性的传入正确的值,单独维护一条用例。

逆向场景

和正常测试场景不同的是,逆向错误场景具有普适性:每个api接口的所有参数均可覆盖以上所有错误类型。基于单个接口去编写测试用例无意义(都是重复逻辑),且低效。

因此,需要解析所有的api接口,针对其全量覆盖所有的错误类型,从而断言是否有指定错误发生。


核心实现

一、解析swagger文档,获取所有接口详情

1、具体代码

代码语言:javascript
复制
// parser.py
from swagger_parser import SwaggerParser


class Parser:
    def __init__(self, swagger_dict):
        self.swagger_parser = SwaggerParser(swagger_dict=swagger_dict)

    def get_specification(self):
        """
        Returns the entire Swagger specification
        """
        return self.swagger_parser.specification

    def get_info(self):
        """
        Returns the info object from the Swagger specification
        """
        if "info" in self.swagger_parser.specification:
            return self.swagger_parser.specification["info"]
        else:
            return {}

    def get_swagger_version(self):
        """
        Returns the Swagger version from the Swagger specification
        """
        if "swagger" in self.swagger_parser.specification:
            return self.swagger_parser.specification["swagger"]
        else:
            return ""

    def get_base_path(self):
        """
        Returns the base path from the Swagger specification
        """
        if "basePath" in self.swagger_parser.specification:
            return self.swagger_parser.specification["basePath"]
        else:
            return ""

    def get_paths(self):
        """
        Returns the paths object from the Swagger specification
        """
        if "paths" in self.swagger_parser.specification:
            return self.swagger_parser.specification["paths"]
        else:
            return {}

    def get_path(self, path):
        """
        Returns the path object from the Swagger specification
        """
        if path in self.swagger_parser.specification["paths"]:
            return self.swagger_parser.specification["paths"][path]
        else:
            return {}

    def get_path_summary(self, path, pathMethod):
        """
        Returns the summary for a path from the Swagger specification
        """
        if path in self.swagger_parser.specification["paths"]:
            if pathMethod in self.swagger_parser.specification["paths"][path]:
                if (
                    "summary"
                    in self.swagger_parser.specification["paths"][path][pathMethod]
                ):
                    return self.swagger_parser.specification["paths"][path][pathMethod][
                        "summary"
                    ]
                else:
                    return ""
            else:
                return ""
        else:
            return ""

    def get_path_methods(self, path):
        """
        Returns the methods for a path from the Swagger specification
        """
        if path in self.swagger_parser.specification["paths"]:
            return self.swagger_parser.specification["paths"][path].keys()
        else:
            return []

    def get_path_method(self, path, method):
        """
        Returns the method object for a path from the Swagger specification
        """
        if path in self.swagger_parser.specification["paths"]:
            if method in self.swagger_parser.specification["paths"][path]:
                return self.swagger_parser.specification["paths"][path][method]
            else:
                return {}
        else:
            return {}

    def get_path_method_parameters(self, path, method):
        """
        Returns the parameters for a method from the Swagger specification
        """
        if "parameters" in self.swagger_parser.specification["paths"][path][method]:
            return self.swagger_parser.specification["paths"][path][method][
                "parameters"
            ]
        else:
            return []

    def get_definitions(self):
        """
        Returns the definitions object from the Swagger specification
        """
        if "definitions" in self.swagger_parser.specification:
            return self.swagger_parser.specification["definitions"]
        else:
            return {}

    def get_definition(self, definition):
        """
        Returns the definition object from the Swagger specification
        """
        if definition in self.swagger_parser.specification["definitions"]:
            return self.swagger_parser.specification["definitions"][definition]
        else:
            return {}

    def get_definition_properties(self, definition):
        """
        Returns the properties for a definition from the Swagger specification
        """
        if definition in self.swagger_parser.specification["definitions"]:
            if (
                "properties"
                in self.swagger_parser.specification["definitions"][definition]
            ):
                return self.swagger_parser.specification["definitions"][definition][
                    "properties"
                ]
            else:
                return {}
        else:
            return {}

    def get_definition_property(self, definition, property):
        """
        Returns the property for a definition from the Swagger specification
        """
        if definition in self.swagger_parser.specification["definitions"]:
            return self.swagger_parser.specification["definitions"][definition][
                "properties"
            ][property]
        else:
            return {}

    def get_model_responses(self):
        """
        Returns the responses object from the Swagger specification
        """
        if "responses" in self.swagger_parser.specification:
            return self.swagger_parser.specification["responses"]
        else:
            return {}

    def get_model_response(self, response):
        """
        Returns the response object from the Swagger specification
        """
        if response in self.swagger_parser.specification["responses"]:
            return self.swagger_parser.specification["responses"][response]
        else:
            return {}


if __name__ == "__main__":
    pass
代码语言:javascript
复制
// fuzzer.py
import requests
import json

from apifuzzer.parser import Parser


class Fuzzer(object):
    def __init__(
        self,
        api_definition_url=None,
        api_definition_file=None,
        api_definition_path=None,
        api_definition_method=None,
    ):
        if api_definition_url is not None:
            response = requests.get(api_definition_url)
            data = response.json()
        elif api_definition_file is not None:
            with open(api_definition_file) as file:
                data = json.load(file)
        else:
            raise ValueError("Please provide either a file or a URL")

        self.api_definition_path = api_definition_path
        self.api_definition_method = api_definition_method

        self.parser = Parser(swagger_dict=data)
        self.paramLists = []

    def prepare(self):
        pass

    def run(self):
        if (
            self.api_definition_path is not None
            and self.api_definition_method is not None
        ):
            result = self.get_swagger_parameter(
                self.api_definition_path, self.api_definition_method
            )
        else:
            result = self.get_swagger_parameters()
        print(result)

    def get_swagger_parameter(self, path, method):
        """Returns the parameters for a method from the Swagger specification"""
        self.get_path_method_parameters(path, method)
        return self.paramLists

    def get_swagger_parameters(self):
        """Returns the parameters for all methods from the Swagger specification"""
        for path in self.parser.get_paths():
            methods = self.parser.get_path_methods(path)
            for method in methods:
                self.get_path_method_parameters(path, method)
        return self.paramLists

    def get_path_method_parameters(self, path, method):
        """Returns the parameters for a method from the Swagger specification"""
        summary = self.parser.get_path_summary(path, method)
        param = {"path": path, "method": method, "summary": summary}
        path_params = []
        query_params = []
        body_params = []
        parameters = self.parser.get_path_method_parameters(path, method)
        for parameter in parameters:
            if parameter["in"] == "path":
                path_params.append(parameter["name"])
            elif parameter["in"] == "query":
                query_params.append(parameter["name"])
            elif parameter["in"] == "body":
                if "schema" in parameter:
                    schema = parameter["schema"]
                    if "$ref" in schema:
                        ref = schema["$ref"]
                        ref = ref.split("/")[-1]
                        properties = self.parser.get_definition_properties(ref)
                        body_params.append(properties)
                    elif "items" in schema:
                        items = schema["items"]
                        if "$ref" in items:
                            ref = items["$ref"]
                            ref = ref.split("/")[-1]
                            properties = self.parser.get_definition_properties(ref)
                            body_params.append(properties)

        param["path_params"] = path_params
        param["query_params"] = query_params
        param["body_params"] = body_params
        self.paramLists.append(param)


if __name__ == "__main__":
    pass

2、使用方法

代码语言:javascript
复制
// api.py
#!/usr/bin/env python3

import argparse
from apifuzzer.fuzzer import Fuzzer


def main():
    argumentParser = argparse.ArgumentParser(description="api_fuzzer configuration")
    argumentParser.add_argument(
        "--swagger_file",
        type=str,
        required=False,
        help="File path to the Swagger specification",
        dest="swagger_file",
        # default="swagger.json",
    )
    argumentParser.add_argument(
        "--swagger_url",
        type=str,
        required=False,
        help="URL of the Swagger specification",
        dest="swagger_url",
        # default="https://petstore.swagger.io/v2/swagger.json",
    )
    argumentParser.add_argument(
        "--swagger_path",
        type=str,
        required=False,
        help="Path to be fuzzed",
        dest="swagger_path",
    )
    argumentParser.add_argument(
        "--swagger_method",
        type=str,
        required=False,
        help="Method to be fuzzed",
        dest="swagger_method",
    )
    args = argumentParser.parse_args()
    if args.swagger_file is None and args.swagger_url is None:
        print("Please provide either a file or a URL")
        exit()

    prog = Fuzzer(
        api_definition_url=args.swagger_url,
        api_definition_file=args.swagger_file,
        api_definition_path=args.swagger_path,
        api_definition_method=args.swagger_method,
    )
    prog.run()


if __name__ == "__main__":
    main()
代码语言:javascript
复制
// 执行上述代码解析出的swagger文档信息如下格式:
[{'path': '/pet/{petId}/uploadImage', 'method': 'post', 'summary': 'uploads an image', 'path_params': ['petId'], 'query_params': [], 'body_params': []},
{'path': '/pet/{petId}/uploadImage1', 'method': 'get', 'summary': 'uploads an image', 'path_params': ['petId'], 'query_params': [], 'body_params': []}]

二、组装请求数据,发送请求

将上述api接口信息,构建成可发起request请求的数据。

1、生成随机数据

代码语言:javascript
复制
// An highlighted block
// 实际要丰富生成随机数据方法
from faker import Faker
from random import choice
from string import ascii_letters, digits

class BasicData:
    def __init__(self):
        self.fake = Faker()

    def generate_string(self):
        # 生成一个随机字符串
        return ''.join(choice(ascii_letters + digits) for _ in range(10))

    def generate_list(self):
        # 生成一个随机列表
        return [self.fake.word() for _ in range(5)]

    def generate_dict(self):
        # 生成一个随机字典
        return {self.fake.word(): self.fake.word() for _ in range(5)}

    def generate_bool(self):
        # 生成一个随机布尔值
        return choice([True, False])

    def generate_sql_injection(self):
        # 生成一个随机SQL注入语句
        injections = [
            "' OR '1'='1",  # 简单条件真
            ";DROP TABLE users;",  # 尝试删除用户表
            "' UNION SELECT * FROM users --",  # 联合查询攻击
            "' AND 1=0 UNION SELECT user_id, password FROM users --",  # 特定信息查询
            "%27 OR %271=%271",  # URL编码的简单条件真
        ]
        return choice(injections)

    def get_value(self):
        """
        随机返回一种数据类型,包括随机SQL注入语句。
        """
        generators = [
            self.generate_string,
            self.generate_list,
            self.generate_dict,
            self.generate_bool,
            self.generate_sql_injection,
        ]
        return choice(generators)()

2、请求数据构建

代码语言:javascript
复制
// 1、替换path中的参数
// 2、将param、body构造dict
class ApiParameters:
    """
    ApiParameters类用于构造API请求参数,包括路径参数、查询参数和请求体参数。
    to do:
        1. 增加属性判断,构造何种测试场景
            如:参数类型错误type=0、参数超长 type=1、参数传空type=2、不合法参数type=3(异常字符串)
        不一定调用调用BasicData类
        2. 是否每个参数都需要覆盖上述四种?应是随机覆盖吧
    """

    def __init__(self, path=None, path_params=None, query_params=None, body_params=None):
        """
        :param path: swagger解析出的path
        :param path_params:path参数list
        :param query_params:params参数list
        :param body_params:body参数list
        """
        self.path = path
        self.path_params = path_params
        self.query_params = query_params
        self.body_params = body_params

    def get_api_path(self):
        """
        :return: 将path中的路径参数替换为实际值
        """
        if self.path_params is not None:
            for path_param in self.path_params:
                replace_param = BasicData().generate_string()  # path中必须是字符串
                self.path = self.path.replace("{" + path_param + "}", replace_param)
        return self.path

    def get_api_params(self):
        """
        :return: 将query_params中的查询参数替换为实际值,并以字典形式返回
        """
        query_params_dict = {}
        if self.query_params is not None:
            for query_param in self.query_params:
                query_params_dict[query_param] = BasicData().get_value()
        return query_params_dict

    def get_api_body(self):
        """
        :return: 将body_params中的请求体参数替换为实际值,并以字典形式返回
        """
        body_params_dict = {}
        if self.body_params is not None:
            for body_param in self.body_params:
                if body_param:  # 判断body_param是否为空
                    body_param_key = list(body_param.keys())
                    for key in body_param_key:
                        body_params_dict[key] = BasicData().get_value()
        return body_params_dict

3、请求发送与基础断言

代码语言:javascript
复制
// An highlighted block
class BasicRequest():
    """
    封装requests请求方法,并针对返回结果进行断言
    """

    def __init__(self, method=None, path='', params=None, body=None, headers=None):
        """
        :param method:
        :param path:
        :param params:
        :param body:
        :param headers:
        """
        self.method = method
        self.url = BasicCase.base_url+path
        self.params = params
        self.body = body
        self.headers = headers
        self.response = None

    def send_request(self):
        """
        发送请求,并返回响应resopnse
        :return: self.response
        """
        if not self.method or not self.url:
            raise ValueError("请传入正确的method和url")
        self.log_reqeust_params()
        if self.method.upper() in ["POST", "GET"]:
            try:
                self.response = requests.request(method=self.method,
                                                 url=self.url, headers=self.headers,
                                                 params=self.params, json=self.body)
                logging.info(f"请求发送成功,响应状态码是{self.response.status_code}")
                return True
            except Exception as e:
                logging.info(f"请求失败,报错{e}")
                return False
        else:
            logging.info(f"不支持{self.method}请求方法")
            return False

    def log_reqeust_params(self):
        logging.info(f"请求方式是:{self.method}")
        logging.info(f"请求url是:{self.url}")
        logging.info(f"请求headers是:{self.headers}")
        logging.info(f"请求params是:{self.params}")
        logging.info(f"请求body是:{self.body}")

    def get_response_json(self):
        """
        获取接口响应结果
        :return:
        """
        try:  # 打印响应体,部分接口没有响应体
            res = self.response.json()
            logging.info(f"请求结果是:{res}")
            return res
        except Exception as e:
            logging.info(f"未解析到响应体,报错{e}")
            return False

    def get_response_status_code(self):
        """
        获取响应状态码
        :return:
        """
        try:
            return self.response.status_code
        except Exception as e:
            logging.info(f"未解析到状态码,报错{e}")
            return False

    def basic_assert(self):
        """
        断言响应结果
        :return:
        """
        if self.send_request():  # 如果请求成功
            assert self.get_response_status_code() == 200
            res = self.get_response_json()
            if res:  # 有响应体应体
                if res.get("err_code", None) is not None:  # 判断响应体是否有code字段
                    assert self.response.json()["err_code"] not in ['非预期错误码1', '非预期错误码2']
                else:
                    logging.info("响应体中未包含code字段")
            # else:
            #     assert False, "响应体为空"
        else:
            logging.info("请求未成功发送")
            assert False, "请求未成功发送"


if __name__ == '__main__':
    BasicRequest().basic_assert()

三、测试用例实现

代码语言:javascript
复制
// BasicCase类初始化测试常量,比如baseurl、src_url(swagger地址)、headers等
class TestDemo2(BasicCase):
    fuzzer = Fuzzer(api_definition_url=BasicCase.src_url)
    swaggerList = fuzzer.get_swagger_parameters()

    def setup_class(self):
        logging.info("测试类级别的前置")
        if not self.cookie:
            raise Exception("cookie为空")
        self.headers["cookie"] = BasicCase.cookie
        if not self.swaggerList:
            raise Exception("swagger列表为空")

    def teardown_class(self):
        logging.info("测试结束")

    @pytest.mark.parametrize("api_params", swaggerList)
    def test_apifuzzer(self, api_params):
        path = api_params["path"]
        # 循环执行n次,尽可能覆盖所有类型的数据生成
        for i in range(1, 7):
            logging.info(f"**************{path}接口第{i}次开始执行****************")
            # 构造请求参数
            api = ApiParameters(path=path, path_params=api_params["path_params"], query_params=api_params["query_params"],
                                body_params=api_params["body_params"])
            # 发送请求
            BasicRequest(method=api_params["method"], path=api.get_api_path(), headers=self.headers,
                         params=api.get_api_params(), body=api.get_api_body()).basic_assert()
        logging.info(f"{path}接口结束执行")

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 思路
  • 核心实现
    • 一、解析swagger文档,获取所有接口详情
      • 1、具体代码
      • 2、使用方法
    • 二、组装请求数据,发送请求
      • 1、生成随机数据
      • 2、请求数据构建
      • 3、请求发送与基础断言
    • 三、测试用例实现
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档