公司最近再推接口自动化测试,基于正向业务流程做了接口自动化覆盖(比如创建订单成功、支付成功等),在过程中发现了一些程序漏洞:如一些参数异常情况,会导致程序崩溃(如数组越界、空传入)。如果这些漏洞被别有用心的人发现进而攻击的话,那生产环境将会极大的收到影响,甚至频频崩溃导致正常用户无法使用。因此,想针对接口做逆向场景的测试,来保证接口的健壮性、系统稳定性。
本次主要考虑覆盖以下几个方面的测试:
正向场景
测试人员会明确知道被测接口的信息,并且根据接口文档传入正确的业务值,进而保证被测场景畅通;并且每个接口需要针对性的传入正确的值,单独维护一条用例。
逆向场景
和正常测试场景不同的是,逆向错误场景具有普适性:每个api接口的所有参数均可覆盖以上所有错误类型。基于单个接口去编写测试用例无意义(都是重复逻辑),且低效。
因此,需要解析所有的api接口,针对其全量覆盖所有的错误类型,从而断言是否有指定错误发生。
// parser.py
from swagger_parser import SwaggerParser
class Parser:
def __init__(self, swagger_dict):
self.swagger_parser = SwaggerParser(swagger_dict=swagger_dict)
def get_specification(self):
"""
Returns the entire Swagger specification
"""
return self.swagger_parser.specification
def get_info(self):
"""
Returns the info object from the Swagger specification
"""
if "info" in self.swagger_parser.specification:
return self.swagger_parser.specification["info"]
else:
return {}
def get_swagger_version(self):
"""
Returns the Swagger version from the Swagger specification
"""
if "swagger" in self.swagger_parser.specification:
return self.swagger_parser.specification["swagger"]
else:
return ""
def get_base_path(self):
"""
Returns the base path from the Swagger specification
"""
if "basePath" in self.swagger_parser.specification:
return self.swagger_parser.specification["basePath"]
else:
return ""
def get_paths(self):
"""
Returns the paths object from the Swagger specification
"""
if "paths" in self.swagger_parser.specification:
return self.swagger_parser.specification["paths"]
else:
return {}
def get_path(self, path):
"""
Returns the path object from the Swagger specification
"""
if path in self.swagger_parser.specification["paths"]:
return self.swagger_parser.specification["paths"][path]
else:
return {}
def get_path_summary(self, path, pathMethod):
"""
Returns the summary for a path from the Swagger specification
"""
if path in self.swagger_parser.specification["paths"]:
if pathMethod in self.swagger_parser.specification["paths"][path]:
if (
"summary"
in self.swagger_parser.specification["paths"][path][pathMethod]
):
return self.swagger_parser.specification["paths"][path][pathMethod][
"summary"
]
else:
return ""
else:
return ""
else:
return ""
def get_path_methods(self, path):
"""
Returns the methods for a path from the Swagger specification
"""
if path in self.swagger_parser.specification["paths"]:
return self.swagger_parser.specification["paths"][path].keys()
else:
return []
def get_path_method(self, path, method):
"""
Returns the method object for a path from the Swagger specification
"""
if path in self.swagger_parser.specification["paths"]:
if method in self.swagger_parser.specification["paths"][path]:
return self.swagger_parser.specification["paths"][path][method]
else:
return {}
else:
return {}
def get_path_method_parameters(self, path, method):
"""
Returns the parameters for a method from the Swagger specification
"""
if "parameters" in self.swagger_parser.specification["paths"][path][method]:
return self.swagger_parser.specification["paths"][path][method][
"parameters"
]
else:
return []
def get_definitions(self):
"""
Returns the definitions object from the Swagger specification
"""
if "definitions" in self.swagger_parser.specification:
return self.swagger_parser.specification["definitions"]
else:
return {}
def get_definition(self, definition):
"""
Returns the definition object from the Swagger specification
"""
if definition in self.swagger_parser.specification["definitions"]:
return self.swagger_parser.specification["definitions"][definition]
else:
return {}
def get_definition_properties(self, definition):
"""
Returns the properties for a definition from the Swagger specification
"""
if definition in self.swagger_parser.specification["definitions"]:
if (
"properties"
in self.swagger_parser.specification["definitions"][definition]
):
return self.swagger_parser.specification["definitions"][definition][
"properties"
]
else:
return {}
else:
return {}
def get_definition_property(self, definition, property):
"""
Returns the property for a definition from the Swagger specification
"""
if definition in self.swagger_parser.specification["definitions"]:
return self.swagger_parser.specification["definitions"][definition][
"properties"
][property]
else:
return {}
def get_model_responses(self):
"""
Returns the responses object from the Swagger specification
"""
if "responses" in self.swagger_parser.specification:
return self.swagger_parser.specification["responses"]
else:
return {}
def get_model_response(self, response):
"""
Returns the response object from the Swagger specification
"""
if response in self.swagger_parser.specification["responses"]:
return self.swagger_parser.specification["responses"][response]
else:
return {}
if __name__ == "__main__":
pass
// fuzzer.py
import requests
import json
from apifuzzer.parser import Parser
class Fuzzer(object):
def __init__(
self,
api_definition_url=None,
api_definition_file=None,
api_definition_path=None,
api_definition_method=None,
):
if api_definition_url is not None:
response = requests.get(api_definition_url)
data = response.json()
elif api_definition_file is not None:
with open(api_definition_file) as file:
data = json.load(file)
else:
raise ValueError("Please provide either a file or a URL")
self.api_definition_path = api_definition_path
self.api_definition_method = api_definition_method
self.parser = Parser(swagger_dict=data)
self.paramLists = []
def prepare(self):
pass
def run(self):
if (
self.api_definition_path is not None
and self.api_definition_method is not None
):
result = self.get_swagger_parameter(
self.api_definition_path, self.api_definition_method
)
else:
result = self.get_swagger_parameters()
print(result)
def get_swagger_parameter(self, path, method):
"""Returns the parameters for a method from the Swagger specification"""
self.get_path_method_parameters(path, method)
return self.paramLists
def get_swagger_parameters(self):
"""Returns the parameters for all methods from the Swagger specification"""
for path in self.parser.get_paths():
methods = self.parser.get_path_methods(path)
for method in methods:
self.get_path_method_parameters(path, method)
return self.paramLists
def get_path_method_parameters(self, path, method):
"""Returns the parameters for a method from the Swagger specification"""
summary = self.parser.get_path_summary(path, method)
param = {"path": path, "method": method, "summary": summary}
path_params = []
query_params = []
body_params = []
parameters = self.parser.get_path_method_parameters(path, method)
for parameter in parameters:
if parameter["in"] == "path":
path_params.append(parameter["name"])
elif parameter["in"] == "query":
query_params.append(parameter["name"])
elif parameter["in"] == "body":
if "schema" in parameter:
schema = parameter["schema"]
if "$ref" in schema:
ref = schema["$ref"]
ref = ref.split("/")[-1]
properties = self.parser.get_definition_properties(ref)
body_params.append(properties)
elif "items" in schema:
items = schema["items"]
if "$ref" in items:
ref = items["$ref"]
ref = ref.split("/")[-1]
properties = self.parser.get_definition_properties(ref)
body_params.append(properties)
param["path_params"] = path_params
param["query_params"] = query_params
param["body_params"] = body_params
self.paramLists.append(param)
if __name__ == "__main__":
pass
// api.py
#!/usr/bin/env python3
import argparse
from apifuzzer.fuzzer import Fuzzer
def main():
argumentParser = argparse.ArgumentParser(description="api_fuzzer configuration")
argumentParser.add_argument(
"--swagger_file",
type=str,
required=False,
help="File path to the Swagger specification",
dest="swagger_file",
# default="swagger.json",
)
argumentParser.add_argument(
"--swagger_url",
type=str,
required=False,
help="URL of the Swagger specification",
dest="swagger_url",
# default="https://petstore.swagger.io/v2/swagger.json",
)
argumentParser.add_argument(
"--swagger_path",
type=str,
required=False,
help="Path to be fuzzed",
dest="swagger_path",
)
argumentParser.add_argument(
"--swagger_method",
type=str,
required=False,
help="Method to be fuzzed",
dest="swagger_method",
)
args = argumentParser.parse_args()
if args.swagger_file is None and args.swagger_url is None:
print("Please provide either a file or a URL")
exit()
prog = Fuzzer(
api_definition_url=args.swagger_url,
api_definition_file=args.swagger_file,
api_definition_path=args.swagger_path,
api_definition_method=args.swagger_method,
)
prog.run()
if __name__ == "__main__":
main()
// 执行上述代码解析出的swagger文档信息如下格式:
[{'path': '/pet/{petId}/uploadImage', 'method': 'post', 'summary': 'uploads an image', 'path_params': ['petId'], 'query_params': [], 'body_params': []},
{'path': '/pet/{petId}/uploadImage1', 'method': 'get', 'summary': 'uploads an image', 'path_params': ['petId'], 'query_params': [], 'body_params': []}]
将上述api接口信息,构建成可发起request请求的数据。
// An highlighted block
// 实际要丰富生成随机数据方法
from faker import Faker
from random import choice
from string import ascii_letters, digits
class BasicData:
def __init__(self):
self.fake = Faker()
def generate_string(self):
# 生成一个随机字符串
return ''.join(choice(ascii_letters + digits) for _ in range(10))
def generate_list(self):
# 生成一个随机列表
return [self.fake.word() for _ in range(5)]
def generate_dict(self):
# 生成一个随机字典
return {self.fake.word(): self.fake.word() for _ in range(5)}
def generate_bool(self):
# 生成一个随机布尔值
return choice([True, False])
def generate_sql_injection(self):
# 生成一个随机SQL注入语句
injections = [
"' OR '1'='1", # 简单条件真
";DROP TABLE users;", # 尝试删除用户表
"' UNION SELECT * FROM users --", # 联合查询攻击
"' AND 1=0 UNION SELECT user_id, password FROM users --", # 特定信息查询
"%27 OR %271=%271", # URL编码的简单条件真
]
return choice(injections)
def get_value(self):
"""
随机返回一种数据类型,包括随机SQL注入语句。
"""
generators = [
self.generate_string,
self.generate_list,
self.generate_dict,
self.generate_bool,
self.generate_sql_injection,
]
return choice(generators)()
// 1、替换path中的参数
// 2、将param、body构造dict
class ApiParameters:
"""
ApiParameters类用于构造API请求参数,包括路径参数、查询参数和请求体参数。
to do:
1. 增加属性判断,构造何种测试场景
如:参数类型错误type=0、参数超长 type=1、参数传空type=2、不合法参数type=3(异常字符串)
不一定调用调用BasicData类
2. 是否每个参数都需要覆盖上述四种?应是随机覆盖吧
"""
def __init__(self, path=None, path_params=None, query_params=None, body_params=None):
"""
:param path: swagger解析出的path
:param path_params:path参数list
:param query_params:params参数list
:param body_params:body参数list
"""
self.path = path
self.path_params = path_params
self.query_params = query_params
self.body_params = body_params
def get_api_path(self):
"""
:return: 将path中的路径参数替换为实际值
"""
if self.path_params is not None:
for path_param in self.path_params:
replace_param = BasicData().generate_string() # path中必须是字符串
self.path = self.path.replace("{" + path_param + "}", replace_param)
return self.path
def get_api_params(self):
"""
:return: 将query_params中的查询参数替换为实际值,并以字典形式返回
"""
query_params_dict = {}
if self.query_params is not None:
for query_param in self.query_params:
query_params_dict[query_param] = BasicData().get_value()
return query_params_dict
def get_api_body(self):
"""
:return: 将body_params中的请求体参数替换为实际值,并以字典形式返回
"""
body_params_dict = {}
if self.body_params is not None:
for body_param in self.body_params:
if body_param: # 判断body_param是否为空
body_param_key = list(body_param.keys())
for key in body_param_key:
body_params_dict[key] = BasicData().get_value()
return body_params_dict
// An highlighted block
class BasicRequest():
"""
封装requests请求方法,并针对返回结果进行断言
"""
def __init__(self, method=None, path='', params=None, body=None, headers=None):
"""
:param method:
:param path:
:param params:
:param body:
:param headers:
"""
self.method = method
self.url = BasicCase.base_url+path
self.params = params
self.body = body
self.headers = headers
self.response = None
def send_request(self):
"""
发送请求,并返回响应resopnse
:return: self.response
"""
if not self.method or not self.url:
raise ValueError("请传入正确的method和url")
self.log_reqeust_params()
if self.method.upper() in ["POST", "GET"]:
try:
self.response = requests.request(method=self.method,
url=self.url, headers=self.headers,
params=self.params, json=self.body)
logging.info(f"请求发送成功,响应状态码是{self.response.status_code}")
return True
except Exception as e:
logging.info(f"请求失败,报错{e}")
return False
else:
logging.info(f"不支持{self.method}请求方法")
return False
def log_reqeust_params(self):
logging.info(f"请求方式是:{self.method}")
logging.info(f"请求url是:{self.url}")
logging.info(f"请求headers是:{self.headers}")
logging.info(f"请求params是:{self.params}")
logging.info(f"请求body是:{self.body}")
def get_response_json(self):
"""
获取接口响应结果
:return:
"""
try: # 打印响应体,部分接口没有响应体
res = self.response.json()
logging.info(f"请求结果是:{res}")
return res
except Exception as e:
logging.info(f"未解析到响应体,报错{e}")
return False
def get_response_status_code(self):
"""
获取响应状态码
:return:
"""
try:
return self.response.status_code
except Exception as e:
logging.info(f"未解析到状态码,报错{e}")
return False
def basic_assert(self):
"""
断言响应结果
:return:
"""
if self.send_request(): # 如果请求成功
assert self.get_response_status_code() == 200
res = self.get_response_json()
if res: # 有响应体应体
if res.get("err_code", None) is not None: # 判断响应体是否有code字段
assert self.response.json()["err_code"] not in ['非预期错误码1', '非预期错误码2']
else:
logging.info("响应体中未包含code字段")
# else:
# assert False, "响应体为空"
else:
logging.info("请求未成功发送")
assert False, "请求未成功发送"
if __name__ == '__main__':
BasicRequest().basic_assert()
// BasicCase类初始化测试常量,比如baseurl、src_url(swagger地址)、headers等
class TestDemo2(BasicCase):
fuzzer = Fuzzer(api_definition_url=BasicCase.src_url)
swaggerList = fuzzer.get_swagger_parameters()
def setup_class(self):
logging.info("测试类级别的前置")
if not self.cookie:
raise Exception("cookie为空")
self.headers["cookie"] = BasicCase.cookie
if not self.swaggerList:
raise Exception("swagger列表为空")
def teardown_class(self):
logging.info("测试结束")
@pytest.mark.parametrize("api_params", swaggerList)
def test_apifuzzer(self, api_params):
path = api_params["path"]
# 循环执行n次,尽可能覆盖所有类型的数据生成
for i in range(1, 7):
logging.info(f"**************{path}接口第{i}次开始执行****************")
# 构造请求参数
api = ApiParameters(path=path, path_params=api_params["path_params"], query_params=api_params["query_params"],
body_params=api_params["body_params"])
# 发送请求
BasicRequest(method=api_params["method"], path=api.get_api_path(), headers=self.headers,
params=api.get_api_params(), body=api.get_api_body()).basic_assert()
logging.info(f"{path}接口结束执行")
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。