import logging#python自带的
'''
调用logging:日志按照你的想法去输出的话,必须经过两道门槛。
1.logger是收集日志。debug info error,指定一个级别,就会输出指定级别的日志。
2.handdler是输出日志的渠道。输出到指定的文件,还是控制台,还是默认到控制台。
因为我们收集日志和输出渠道是一一对应的。如果没有给这个logger这个收集器指定的渠道的话,
默认到控制台。
代码里设置的,不指定的话就默认到控制台,指定的话,就输出到你指定的地方。
日志收集器取得名字叫root。root是我们不指定,它就自己创建一个这样的收集器,名字叫root。
logging就是默认的root收集器,默认到控制台。
源码指出WARNING或者WARNING级别以上的才可输出,debug和info级别太低。
'''
# logging.debug('今天天气很好')
# logging.info('lagom666')
# logging.warning('kevin')
# logging.error('Monica')
# logging.critical('清菡 还是那么不优秀')
#注意:千万不要以logging这样的内置库命名,不然就会报错。
#定义一个日志收集器 my_logger
my_logger=logging.getLogger('python2020')#调用getLogger定义一个日志收集器
#设定级别,setLevel
my_logger.setLevel('DEBUG')
#formatter设置日志最终输出格式
formatter=logging.Formatter('%(asctime)s-%(levelname)s-%(filename)s-%(name)s-日志信息:%(message)s')
#创建一个我们自己的输出渠道,StreamHandler是默认把它输出到控制台,但是它可以设置级别。
ch=logging.StreamHandler()
ch.setLevel('ERROR')
ch.setFormatter(formatter)
#输出到文本渠道,指定一个文件
fh=logging.FileHandler('py11.txt',encoding='UTF-8')
fh.setLevel('DEBUG')
fh.setFormatter(formatter)
#两者对接--指定输出渠道
my_logger.addHandler(ch)
my_logger.addHandler(fh)
#收集日志
my_logger.debug("python2020年学习python")
my_logger.error('python2020年,一定要挺过去!')
#收集如果不设置级别,就默认从warning或warning以上去收集。输出如果不设置级别,
# 就默认从warning或warning以上去输出。
# getLogger和StreamHandler都可以调用setLevel函数帮它们设置级别。
formatter=logging.Formatter('%(asctime)s-%(levelname)s-%(filename)s-%(name)s-日志信息:%(message)s')
# 安装 mysql 驱动 pip install mysql-connector
import mysql.connector
from tools import project_path2
from tools.read_config2 import ReadConfig
class DoMysql:
def do_msql(self,query_sql,state='all'):#query_sql--->查询语句,state---all多条 1 一条
# 利用这个类从配置文件里读取db info
db_config=eval(ReadConfig().get_config(project_path2.case_config_path,'DB','db_config'))
# 创建一个数据库连接
cnn=mysql.connector.connect(**db_config)
cnn=mysql.connector.connect(**db_config)
#游标 cursor
cursor=cnn.cursor()
#写sql语句--字符串
# query_sql='select mobilephone from people where id<5622'
#执行语句
cursor.execute(query_sql)
#获取结果 打印结果
if state==1:
res=cursor.fetchone()#针对一条数据用fetchone,返回元组类型的数据
else:
res=cursor.fetchall()#针对多行数据,返回列表嵌套元组类型的数据
# print(res)
# print(int(res[0])+1)
#关闭游标
cursor.close()
#关闭连接
cnn.close
return res
if __name__ =='__main__':
from tools.get_data2 import GetData
query_sql = 'select max(id) from loans where MemberID={0}'.format(getattr(GetData, 'loan_member_id'))
# query_sql='select max(mobilephone) from people where mobilephone like "132%"'
#这种模糊查询,只查询132字段开头的最大的手机号
res = DoMysql().do_msql(query_sql)#返回列表嵌套元组
print(res)
# print(res[0][0])#取手机号码
# res=DoMysql().do_msql(query_sql,1)#返回的就是个元组
# print(res[0])
每次从数据库里查询到最大的手机号,在这个基础上加1。
logging引入日志模块。
1.添加投资,充值,提现,加标等业务流程。
2.测试用例增加数据库校验。
由于小编没有p2p的接口,随便找的接口模拟的,所以没跑通,准备留下代码,工作中再实践,以下代码具有可质疑性,如有代码问题,请和小编联系。
来自文件case2.config
来自文件test_data_xiejinjieguo_shoujihao2.xlsx
来自文件do_excel_shoujihao2.py
from openpyxl import load_workbook
from tools.read_config2 import ReadConfig
from tools import project_path2
from tools.get_data2 import GetData
class DoExcel:
@classmethod
def get_data(cls,file_name):
wb = load_workbook(file_name)
mode=eval(ReadConfig.get_config(project_path2.case_config_path, 'MODE', 'mode'))
tel=getattr(GetData,'NoRegTel')#利用反射拿到数据
#利用python查询数据库的方式,来拿到最大的手机号--这里可以加,也可以放到get_data里面
test_data = []#把字典里所有数据都拿到
for key in mode:#遍历这个存在配置文件里的字典
sheet = wb[key]#key是表单名
if mode[key]=='all':
for i in range(2,sheet.max_row+1):#
row_data={}#字典
row_data['case_id']= sheet.cell(i, 1).value #行号 第1列第2行
row_data['url']=sheet.cell(i,2).value
# row_data['data'] = sheet.cell(i, 3).value
if sheet.cell(i,3).value.find('${tel}')!=-1:#有找到这个${tel}
row_data['data']=sheet.cell(i,3).value.replace('${tel}',str(tel))#替换的时候只能用字符串去替换
tel = tel + 1#每次完成tel的调用后就加1
elif sheet.cell(i,3).value.find('${admin_tel}')!=-1:
row_data['data'] = sheet.cell(i, 3).value.replace('${admin_tel}', str(getattr(GetData,'admin_tel')))
elif sheet.cell(i,3).value.find('${loan_member_id}')!=-1:
row_data['data'] = sheet.cell(i, 3).value.replace('${loan_member_id}', str(getattr(GetData,'loan_member_id')))
elif sheet.cell(i, 3).value.find('${normal_tel}') != -1:
row_data['data'] = sheet.cell(i, 3).value.replace('${normal_tel}',str(getattr(GetData, 'normal_tel')))
elif sheet.cell(i, 3).value.find('${memberID}') != -1:
row_data['data'] = sheet.cell(i, 3).value.replace('${memberID }',str(getattr(GetData, 'memberID')))
else:#如果没有找到的话
row_data['data'] = sheet.cell(i, 3).value
row_data['title'] = sheet.cell(i, 4).value
row_data['http_method'] = sheet.cell(i,5).value
row_data['expected'] = sheet.cell(i, 6).value#添加了一个期望值到测试数据里面去
row_data['sheet_name'] = key
test_data.append(row_data)
cls.update_tel(tel,file_name,'init')#更新手机号,针对Excel的操作。
#这里也是可以优化的?什么时候对手机号进行更新?更新的手机号是进行加1,还是加2?还是加3?
else:
for case_id in mode[key]:#
row_data = {} # 字典
row_data['case_id'] = sheet.cell(case_id+1, 1).value # 行号 第1列第2行
row_data['url'] = sheet.cell(case_id+1, 2).value
#做手机号的替换
if sheet.cell(case_id+1, 3).value.find('${tel}') != -1:
row_data['data'] = sheet.cell(case_id+1, 3).value.replace('${tel}', str(tel))
tel+=1#有找到这个${tel}
elif sheet.cell(case_id+1, 3).value.find('${admin_tel}') != -1:
row_data['data'] = sheet.cell(case_id+1, 3).value.replace('${admin_tel}',
str(getattr(GetData, 'admin_tel')))
elif sheet.cell(case_id+1, 3).value.find('${loan_member_id}') != -1:
row_data['data'] = sheet.cell(case_id+1, 3).value.replace('${loan_member_id}',
str(getattr(GetData, 'loan_member_id')))
elif sheet.cell(case_id+1, 3).value.find('${normal_tel}') != -1:
row_data['data'] = sheet.cell(case_id+1, 3).value.replace('${normal_tel}',
str(getattr(GetData, 'normal_tel')))
elif sheet.cell(case_id+1, 3).value.find('${memberID}') != -1:
row_data['data'] = sheet.cell(case_id+1, 3).value.replace('${memberID }', str(getattr(GetData, 'memberID')))
else: # 如果没有找到的话
row_data['data'] = sheet.cell(i, 3).value
row_data['title'] = sheet.cell(case_id+1, 4).value
row_data['http_method'] = sheet.cell(case_id+1,5).value
row_data['expected'] = sheet.cell(case_id+1, 6).value # 添加了一个期望值到测试数据里面去
row_data['sheet_name']=key
test_data.append(row_data)
cls.update_tel(tel + 2, file_name, 'init') # 更新手机号,改造理由同上
return test_data
@staticmethod
#把结果写进Excel
def write_back(file_name,sheet_name,i,result,Testresult):#专门写回数据
wb=load_workbook(file_name)
sheet=wb[sheet_name]
sheet.cell(i,7).value=result
sheet.cell(i,8).value=Testresult
wb.save(file_name)#保存结果
@classmethod
def update_tel(cls,tel,filename,sheet_name):
#函数更新Excel里面手机号的数据
wb=load_workbook(filename)
sheet=wb[sheet_name]
sheet.cell(2,1).value=tel#赋值操作
wb.save(filename)
if __name__ =='__main__':
test_data=DoExcel().get_data(project_path2.test_case_path)
print(test_data)
来自文件do_mysql.py
# 安装 mysql 驱动 pip install mysql-connector
import mysql.connector
from tools import project_path2
from tools.read_config2 import ReadConfig
class DoMysql:
def do_msql(self,query_sql,state='all'):#query_sql--->查询语句,state---all多条 1 一条
# 利用这个类从配置文件里读取db info
db_config=eval(ReadConfig().get_config(project_path2.case_config_path,'DB','db_config'))
# 创建一个数据库连接
cnn=mysql.connector.connect(**db_config)
cnn=mysql.connector.connect(**db_config)
#游标 cursor
cursor=cnn.cursor()
#写sql语句--字符串
# query_sql='select mobilephone from people where id<5622'
#执行语句
cursor.execute(query_sql)
#获取结果 打印结果
if state==1:
res=cursor.fetchone()#针对一条数据用fetchone,返回元组类型的数据
else:
res=cursor.fetchall()#针对多行数据,返回列表嵌套元组类型的数据
# print(res)
# print(int(res[0])+1)
#关闭游标
cursor.close()
#关闭连接
cnn.close
return res
if __name__ =='__main__':
from tools.get_data2 import GetData
query_sql = 'select max(id) from loans where MemberID={0}'.format(getattr(GetData, 'loan_member_id'))
# query_sql='select max(mobilephone) from people where mobilephone like "132%"'
#这种模糊查询,只查询132字段开头的最大的手机号
res = DoMysql().do_msql(query_sql)#返回列表嵌套元组
print(res)
# print(res[0][0])#取手机号码
# res=DoMysql().do_msql(query_sql,1)#返回的就是个元组
# print(res[0])
来自文件get_data2.py
#反射:不管何时何地,只要传入一个类名,它就可以帮你操作。
from tools import project_path2
import pandas as pd
from tools.read_config2 import ReadConfig
class GetData:
Cookie=None
loan_id=None
check_list=eval(ReadConfig().get_config(project_path2.case_config_path,'CHECKLEAVEAMOUNT','check_list'))
NoRegTel = pd.read_excel(project_path2.test_case_path, sheet_name='init').iloc[0, 0] # 从Excel里面拿到的数据
# 参照init表单查看我们这个变量的用处
normal_tel=pd.read_excel(project_path2.test_case_path, sheet_name='init').iloc[1,0]
admin_tel=pd.read_excel(project_path2.test_case_path, sheet_name='init').iloc[2,0]
loan_member_id=pd.read_excel(project_path2.test_case_path,sheet_name='init').iloc[3,0]
memberID = pd.read_excel(project_path2.test_case_path, sheet_name='init').iloc[4, 0]
# 利用pandas从Excel里面拿到的手机号
print(GetData.admin_tel)
来自文件http_request2.py
import requests
from tools.my_log2 import MyLog
my_logger=MyLog()
class HttpRequest:
@staticmethod
def http_request(url,data,http_method,cookie=None):
try :
if http_method.upper()=='POST':
res=requests.post(url,json=data,cookies=cookie)
# elif http_method.upper()=='POST':
# res=requests.post(url,data,cookies=cookie)
else:
my_logger.info("输入的请求方法不对")
except Exception as e:
my_logger.error("请求报错了:{0}".format(e))
raise e
return res#返回结果
if __name__ =='__main__':
#注册
register_url= 'http://api.nnzhp.cn/api/user/add_stu'
register_data= { "name":"niuhanyang","grade":"天蝎座","phone":'18614711314'}
#登录
login_url= 'http://api.nnzhp.cn/api/user/login'
login_data={"username": "niuhanyang", "passwd": 'aA123456'}
# 充值
recharge_url= 'http://api.nnzhp.cn/api/user/gold_add'
recharge_data={"stu_id": "2170", "gold": '1'}
login_res=HttpRequest().http_request(login_url,login_data,'post')
recharge_res=HttpRequest().http_request(recharge_url,recharge_data,'post',login_res.cookies)
print("充值结果:{}".format(recharge_res.json()))
来自文件my_log2.py
# 学习logging:引入日志模块
import logging#python自带的
class MyLog:
def my_log(self,msg,level):
#定义一个日志收集器 my_logger
my_logger=logging.getLogger('python2020')#调用getLogger定义一个日志收集器
#设定级别,setLevel
my_logger.setLevel('DEBUG')
#formatter设置日志最终输出格式
formatter=logging.Formatter('%(asctime)s-%(levelname)s-%(filename)s-%(name)s-日志信息:%(message)s')
#创建一个我们自己的输出渠道,StreamHandler是默认把它输出到控制台,但是它可以设置级别。
ch=logging.StreamHandler()
ch.setLevel('ERROR')
ch.setFormatter(formatter)
#输出到文本渠道,指定一个文件
fh=logging.FileHandler('py11.txt',encoding='UTF-8')
fh.setLevel('DEBUG')
fh.setFormatter(formatter)
#两者对接--指定输出渠道
my_logger.addHandler(ch)
my_logger.addHandler(fh)
#收集日志
if level=='DEBUG':
my_logger.debug(msg)
elif level=='INFO':
my_logger.info(msg)
elif level=='WARNING':
my_logger.warning(msg)
elif level=='ERROR':
my_logger.error(msg)
elif level=='CRITICAL':
my_logger.critical(msg)
#关闭渠道
my_logger.removeHandler(ch)
my_logger.removeHandler(fh)
def debug(self,msg):
self.my_log(msg,'DEBUG')
def info(self,msg):
self.my_log(msg,'INFO')
def error(self,msg):
self.my_log(msg,'ERROR')
if __name__ == '__main__':
pass
来自文件project_path2.py
#路径的可配置
import os
'''专门来读取路径的值'''
project_path=os.path.split(os.path.split(os.path.realpath(__file__))[0])[0]
#对路径进行了切割,返回了这样一个元组
# path=os.path.realpath(__file__)
#测试用例的路径
test_case_path=os.path.join(project_path,'test_data','test_data_xiejinjieguo_shoujihao2.xlsx')
#这就是绝对路径,顶级目录变,它就跟着变
#测试报告的路径
test_report_path=os.path.join(project_path,'test_result','html_report','test_api.html')
#配置文件的路径
case_config_path=os.path.join(project_path,'conf','case2.config')
# print(case_config_path)
#前提是test_data的上一级目录打开,并执行run文件。这种办法,文件在哪里执行都行。
#某个时间,相对路径很好用,但是如果参照物变了就不行了。绝对路径,换台电脑就不行了。
来自文件read_config2.py
import configparser
class ReadConfig:
@staticmethod
def get_config(file_path,section,option):
cf=configparser.ConfigParser()
cf.read(file_path)
return cf[section][option]
if __name__ == '__main__':
from tools import project_path2
print(ReadConfig.get_config(project_path2.case_config_path, 'MODE', 'mode'))
来自文件test_http_request_shoujihao2.py
import unittest
from tools.project_path2 import *
from tools.http_request2 import HttpRequest
from ddt import ddt,data
from tools.do_excel_shoujihao2 import DoExcel
from tools.my_log2 import MyLog
from tools.do_mysql import DoMysql
from tools.get_data2 import GetData
test_data=DoExcel.get_data(test_case_path)#执行所有的用例
my_logger=MyLog()
@ddt#装饰测试类
class TestHttpRequest(unittest.TestCase):
def setUp(self):
pass
@data(*test_data)
def test_api(self,item):
#请求之前完成load_id的替换
my_logger.info('开始执行用例{0}:{1}'.format(item['case_id'],item['title']))
# 方式1
if item['data'].find('${loan_id')!=-1:
if hasattr(GetData,'load_id')==None:
query_sql='select max(id) from loans where MemberID={0}'.format(getattr(GetData,'loan_member_id'))
loan_id=DoMysql().do_msql(query_sql)[0][0]
item['data']=item['data'].replace('${loan_id}',str(loan_id))
setattr(GetData,'loan_id',loan_id)#利用这个去做反射去存储结果
my_logger.info(loan_id)
else:
my_logger.info(getattr(GetData,'loan_id'))
item['data'] = item['data'].replace('${loan_id}', str(getattr(GetData,'loan_id')))
my_logger.info('获取到的请求结果是:{0}'.format(item['data']))
my_logger('-----开始http----接口请求')
if item['sheet_name'] in getattr(GetData,'check_list'):
# 请求之前查询余额
#查询数据库
query_sql='select LeaveAmount from member where mobilephone={0}'.format(eval(item['data'])['mobilephone'])
Before_Amount=DoMysql().do_msql(query_sql,1)[0]
res=HttpRequest.http_request(item['url'],eval(item['data']),item['http_method'],getattr(GetData,'Cookie'))
#请求之后校验余额是否正确
After_Amount = DoMysql().do_msql(query_sql, 1)[0]
if abs(Before_Amount-After_Amount)==eval(item['data'])['amount']:
check_res='金额正确'
else:
check_res = '金额不对'
#1.并不是所有的用例都能这么做。是否做数据库检查?可否放到excel里面去呢?
#2.怎么把检查结果写入excel里面去?
#3.拓展:如果要检查多个语句怎么办?
else:
res = HttpRequest.http_request(item['url'], eval(item['data']), item['http_method'], getattr(GetData, 'Cookie'))
if res.cookies:#利用反射存储cookie值
setattr(GetData,'Cookie',res.cookies)
try:
self.assertEqual(item['expected'],res.json()['error_code'])#做断言,是拿实际结果和期望结果去比对,判断用例通不通过,不加断言,根本没有期望结果,用例都是通过的。
#预期结果
TestResult='PASS'
except AssertionError as e:
TestResult='Failed'
my_logger.info('执行用例出错:{0}'.format(e))
raise e
finally:#加finally,不管用例有没有执行通过,它里面的代码是一定会执行的。
DoExcel.write_back(test_case_path,item['sheet_name'],item['case_id']+1,str(res.json()),TestResult)
my_logger.error('获取到的结果是:{0}'.format(res.json())) # 打印结果
def tearDown(self):
pass
来自文件run2.py
#方法二
#通过配置文件去决定执行哪个模块的用例。通过配置文件,以字典的形式key去存它的表单,
# value去存它执行所有用例还是些其它的用例
import unittest#引入单元测试
import HTMLTestRunner#HTML测试报告
from tools.project_path2 import *
from tools.test_http_request_shoujihao2 import TestHttpRequest
suite=unittest.TestSuite()
# suite.addTest(TestHttpRequest('test_api'))#测试类的实例
loader=unittest.TestLoader()
suite.addTest(loader.loadTestsFromTestCase(TestHttpRequest))
with open(test_report_path,'wb') as file:
#执行用例
runner=HTMLTestRunner.HTMLTestRunner(stream=file,
title='第一阶段项目实战',
description='这个是单元测试报告1115',
tester='清菡')
runner.run(suite)
我的代码结构如下:
运行run2.py文件,可输出结果。正确结果可根据实际项目,我就不附图了。
# s='hello'
# print(s.find('h'))#find是字符串里面的一个方法,寻找字符串里面的子字符或者子字符串
#找到的话,返回的是字符所在的索引
# 找不到的话,返回的是-1
# if s.find('9'):#if后面非空非零,成立表达式都为True,只要是True,if下面的表达式都执行
# new_s=s.replace('o','kevin')
# print(new_s)