前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flask + Python3 实现的的

flask + Python3 实现的的

作者头像
py3study
发布2020-01-03 15:48:10
5440
发布2020-01-03 15:48:10
举报
文章被收录于专栏:python3python3

**背景: 1.平时测试接口,总是现写代码,对测试用例的管理,以及测试报告的管理持久化做的不够, 2.工作中移动端开发和后端开发总是不能并行进行,需要一个mock的依赖来让他们并行开发。 3.同时让自己锻炼去开发测试平台,掌握flask开发程序,提高自己的业务水平。

整体思路: 1.利用flask+bootstrap来进行web界面开发,对接口,接口测试用例,定时任务,测试报告的持续集成。 2.IAPTest支持接口用例管理,接口多用例测试,支持定时测试任务,测试报告持久化 3.目前mock服务支持单一path,定时任务可以开启暂停多用例执行,定时任务执行后自动发送测试报告,多用例的单次执行,单接口的调试功能。对测试环境的管理 下面来看下最后的效果图,以及附上github开源地址。

测试环境管理界面:

flask + Python3 实现的的API自动化测试平台----  IAPTest接口测试平台
flask + Python3 实现的的API自动化测试平台---- IAPTest接口测试平台

定时任务界面:

flask + Python3 实现的的API自动化测试平台----  IAPTest接口测试平台
flask + Python3 实现的的API自动化测试平台---- IAPTest接口测试平台

mock界面

flask + Python3 实现的的API自动化测试平台----  IAPTest接口测试平台
flask + Python3 实现的的API自动化测试平台---- IAPTest接口测试平台

测试报告界面

flask + Python3 实现的的API自动化测试平台----  IAPTest接口测试平台
flask + Python3 实现的的API自动化测试平台---- IAPTest接口测试平台

用例管理界面

flask + Python3 实现的的API自动化测试平台----  IAPTest接口测试平台
flask + Python3 实现的的API自动化测试平台---- IAPTest接口测试平台

接口管理界面

flask + Python3 实现的的API自动化测试平台----  IAPTest接口测试平台
flask + Python3 实现的的API自动化测试平台---- IAPTest接口测试平台

核心代码分享区:

定时任务对应视图开发

class AddtimingtaskView(MethodView):br/>@login_required def get(self): return render_template('addtimingtasks.html')br/>@login_required def post(self): taskname=request.form['taskname'] tinmingtime=request.form['time'] to_email_data=request.form['to_email'] cao_email=request.form['cao_email'] weihu=request.form['weihu'] if taskname =='': flash('任务名不能为空!') return render_template('addtimingtasks.html') if tinmingtime =='': flash('任务执行时间不能为空!') return render_template('addtimingtasks.html') if to_email_data=='': flash('发送给谁邮件不能为空!') return render_template('addtimingtasks.html') if weihu=='': flash('维护人邮件不能为空!') return render_template('addtimingtasks.html') taskname_is = Task.query.filter_by(taskname=taskname).first() if taskname_is: flash('任务已经存在请重新填写!') return render_template('addtimingtasks.html') new_task=Task(taskname=taskname,taskstart=tinmingtime,taskrepor_to=to_email_data,taskrepor_cao=cao_email,task_make_email=weihu, makeuser=current_user.id) db.session.add(new_task) try: db.session.commit() flash('添加定时任务成功') return redirect(url_for('timingtask')) except Exception as e: db.session.rollback() flash('添加过程貌似异常艰难!') return redirect(url_for('addtimingtasks')) return render_template('addtimingtasks.html') class Editmingtaskview(MethodView):br/>@login_required def get(self,id): task_one=Task.query.filter_by(id=id).first() procjet=Project.query.all() if not task_one: flash('你编辑的不存在') return redirect(url_for('timingtask')) return render_template('Edittimingtasks.html',task_one=task_one,porjects=procjet) def post(self,id): task_one = Task.query.filter_by(id=id).first() procjet = Project.query.all() taskname = request.form['taskname'] tinmingtime = request.form['time'] to_email_data = request.form['to_email'] cao_email = request.form['cao_email'] weihu = request.form['weihu'] if taskname =='': flash('任务名不能为空!') return render_template('addtimingtasks.html') if tinmingtime =='': flash('任务执行时间不能为空!') return render_template('addtimingtasks.html') if to_email_data=='': flash('发送给谁邮件不能为空!') return render_template('addtimingtasks.html') if weihu=='': flash('维护人邮件不能为空!') return render_template('addtimingtasks.html') task_one.taskname=taskname task_one.taskrepor_to=to_email_data task_one.taskrepor_cao=cao_email task_one.task_make_email=weihu task_one.makeuser=current_user.id try: db.session.commit() flash('编辑成功') return redirect(url_for('timingtask')) except: db.session.rollback() flash('编辑出现问题!') return redirect(url_for('timingtask')) return render_template('Edittimingtasks.html', task_one=task_one,porjects=procjet) class DeteleTaskViee(MethodView): def get(self,id): task_one = Task.query.filter_by(id=id).first() if not task_one: flash('你编辑的不存在') return redirect(url_for('timingtask')) if task_one.status==True: flash('已经删除') return redirect(url_for('timingtask')) task_one.status=True try: db.session.commit() flash('删除任务成功') return redirect(url_for('timingtask')) except: db.session.rollback() flash('删除任务休息了') return redirect(url_for('timingtask'))br/>@app.route('/gettest',methods=['POST']) @login_required def gettest():#ajax获取项目的测试用例 projec=(request.get_data('project')).decode('utf-8') if not projec: return [] proje=Project.query.filter_by(project_name=str(projec)).first() if not proje: return [] testyong=InterfaceTest.query.filter_by(projects_id=proje.id).all() testyong_list=[] for i in testyong: testyong_list.append({'name':i.Interface_name,'id':i.id}) return jsonify({'data':testyong_list}) class TestforTaskView(MethodView):#为测试任务添加测试用例 def get(self,id): procjet = Project.query.all() task_one=Task.query.filter_by(id=id).first() return render_template('addtestyongfortask.html',task_one=task_one,procjets=procjet) def post(self,id): procjet = Project.query.all() task_one = Task.query.filter_by(id=id).first() proc_test=request.form.get('project') if proc_test =='': flash(u'不能不添加测试项目!') return render_template('addtestyongfortask.html', task_one=task_one, procjets=procjet) test_yongli=request.form.getlist('testyongli') if test_yongli=='': flash(u'亲你见过只有测试项目没有测试用例的测试任务吗!') return render_template('addtestyongfortask.html', task_one=task_one, procjets=procjet) for oldtask in task_one.interface.all(): task_one.interface.remove(oldtask) task_one.prject=Project.query.filter_by(project_name=proc_test).first().id for yongli in test_yongli: task_one.interface.append(InterfaceTest.query.filter_by(id=yongli).first()) db.session.add(task_one) try: db.session.commit() flash('任务更新用例成功') return redirect(url_for('timingtask')) except: flash('任务更新用例失败') return redirect(url_for('timingtask')) return render_template('addtestyongfortask.html', task_one=task_one, procjets=procjet) class StartTaskView(MethodView):#开始定时任务br/>@login_required def get(self,id): task=Task.query.filter_by(id=id).first() if len(task.interface.all())<=1: flash('定时任务执行过程的测试用例为多用例,请你谅解') return redirect(url_for('timingtask')) try: scheduler.add_job(func=addtask, id=str(id), args=str(id),trigger=eval(task.taskstart),replace_existing=True) task.yunxing_status='启动' db.session.commit() flash(u'定时任务启动成功!') return redirect(url_for('timingtask')) except Exception as e: flash(u'定时任务启动失败!请检查任务的各项内容各项内容是否正常') return redirect(url_for('timingtask')) class ZantingtaskView(MethodView):#暂停定时任务br/>@login_required def get(self,id): task = Task.query.filter_by(id=id).first() try: scheduler.pause_job(str(id)) task.yunxing_status = '暂停' db.session.commit() flash(u'定时任务暂停成功!') return redirect(url_for('timingtask')) except: task.yunxing_status = '创建' db.session.commit() flash(u'定时任务暂停失败!已经为您初始化') return redirect(url_for('timingtask')) class HuifutaskView(MethodView):#回复定时任务br/>@login_required def get(self,id): task = Task.query.filter_by(id=id).first() try: scheduler.resume_job(str(id)) task.yunxing_status='启动' db.session.commit() flash(u'定时任务恢复成功!') return redirect(url_for('timingtask')) except: task.yunxing_status = '创建' db.session.commit() flash(u'定时任务恢复失败!已经为您初始化') return redirect(url_for('timingtask')) class YichuTaskView(MethodView):#移除定时任务br/>@login_required def get(self,id): task = Task.query.filter_by(id=id).first() try: scheduler.delete_job(str(id)) task.yunxing_status='关闭' db.session.commit() flash(u'定时任务移除成功!') return redirect(url_for('timingtask')) except: task.yunxing_status = '创建' db.session.commit() flash(u'定时任务移除失败!已经为您初始化') return redirect(url_for('timingtask'))

定时任务所执行的func代码

def addtask(id):#定时任务执行的时候所用的函数 in_id=int(id) task=Task.query.filter_by(id=in_id).first() starttime = datetime.datetime.now() star = time.time() day = time.strftime("%Y%m%d%H%M", time.localtime(time.time())) basedir = os.path.abspath(os.path.dirname(file)) file_dir = os.path.join(basedir, 'upload') file = os.path.join(file_dir, (day + '.log')) if os.path.exists(file) is False: os.system('touch %s' % file) filepath = os.path.join(file_dir, (day + '.html')) if os.path.exists(filepath) is False: os.system(r'touch %s' % filepath) projecct_list = [] model_list = [] Interface_name_list = [] Interface_url_list = [] Interface_meth_list = [] Interface_pase_list = [] Interface_assert_list = [] Interface_headers_list = [] id_list = [] for task_yongli in task.interface.all(): id_list.append(task_yongli.id) projecct_list.append(task_yongli.projects) model_list.append(task_yongli.models) Interface_url_list.append(task_yongli.Interface_url) Interface_name_list.append(task_yongli.Interface_name) Interface_meth_list.append(task_yongli.Interface_meth) Interface_pase_list.append(task_yongli.Interface_pase) Interface_assert_list.append(task_yongli.Interface_assert) Interface_headers_list.append(task_yongli.Interface_headers) apitest = ApiTestCase(Interface_url_list, Interface_meth_list, Interface_pase_list, Interface_assert_list, file, Interface_headers_list) result_toal, result_pass, result_fail, relusts, bask_list = apitest.testapi() endtime = datetime.datetime.now() end = time.time() createHtml(titles=u'接口测试报告', filepath=filepath, starttime=starttime, endtime=endtime, passge=result_pass, fail=result_fail, id=id_list, name=projecct_list, headers=Interface_headers_list, coneent=Interface_url_list, url=Interface_meth_list, meth=Interface_pase_list, yuqi=Interface_assert_list, json=bask_list, relusts=relusts) hour = end - star user_id = User.query.filter_by(role_id=2).first().id new_reust = TestResult(Test_user_id=user_id, test_num=result_toal, pass_num=result_pass, fail_num=result_fail, test_time=starttime, hour_time=hour, test_rep=(day + '.html'), test_log=(day + '.log')) email = EmailReport.query.filter_by(role_id=2, default_set=True).first() send_emails(sender=email.send_email, receivers=task.taskrepor_to, password=email.send_email_password, smtp=email.stmp_email, port=email.port, fujian1=file, fujian2=filepath, subject=u'%s自动用例执行测试报告' % day, url='%stest_rep'%(request.url_root)) db.session.add(new_reust) db.session.commit()

mock服务的一个请求方式的代码

1 class MakemockserverView(MethodView):#做一个mock服务 2 def get(self,path):#get请求方法 3 huoqupath=Mockserver.query.filter_by(path=path,status=True).first() 4 heders=request.headers 5 method=request.method 6 if not huoqupath: 7 abort(404) 8 if method.lower() !=huoqupath.methods: 9 return jsonify({'code':'-1','message':'请求方式错误!','data':''}) 10 if huoqupath.is_headers==True: 11 if comp_dict(heders,huoqupath.headers) ==True: 12 if huoqupath.ischeck==True: 13 paerm = request.values.to_dict() 14 if dict_par(paerm,huoqupath.params)==True: 15 if huoqupath.rebacktype == 'json': 16 try: 17 json_fan = json.dumps(huoqupath.fanhui) 18 return jsonify({'code': '1', 'message': 'successs', 'data': json_fan}) 19 except: 20 return jsonify({'code': '-2', 'message': '你写入的返回不能正常json!请检查', 'data': ''}) 21 elif huoqupath.rebacktype == 'xml': 22 response = make_response(huoqupath.fanhui) 23 response.content_type = 'application/xml' 24 return response 25 else: 26 return jsonify({'code': '-2', 'message': '你写入的类型目前系统不支持', 'data': ''}) 27 else: 28 return jsonify({'code': '-4', 'message': '你输入的参数不正确', 'data': ''}) 29 else: 30 if huoqupath.rebacktype=='json': 31 try: 32 json_fan=json.dumps(huoqupath.fanhui) 33 return jsonify({'code': '1', 'message': 'successs', 'data':json_fan}) 34 except: 35 return jsonify({'code': '-2', 'message': '你写入的返回不能正常json!请检查', 'data': ''}) 36 elif huoqupath.rebacktype =='xml': 37 response=make_response(huoqupath.fanhui) 38 response.content_type='application/xml' 39 return response 40 else: 41 return jsonify({'code': '-2', 'message': '你写入的类型目前系统不支持', 'data': ''}) 42 else: 43 return jsonify({'code': '-3', 'message': '安全校验失败!', 'data': ''}) 44 if huoqupath.ischeck == True: 45 paerm = request.values.to_dict() 46 if dict_par(paerm, huoqupath.params) == True: 47 if huoqupath.rebacktype == 'json': 48 try: 49 json_fan = json.dumps(huoqupath.fanhui) 50 return jsonify({'code': '1', 'message': 'successs', 'data': json_fan}) 51 except: 52 return jsonify({'code': '-2', 'message': '你写入的返回不能正常json!请检查', 'data': ''}) 53 elif huoqupath.rebacktype == 'xml': 54 response = make_response(huoqupath.fanhui) 55 response.content_type = 'application/xml' 56 return response 57 else: 58 return jsonify({'code': '-2', 'message': '你写入的类型目前系统不支持', 'data': ''}) 59 else: 60 return jsonify({'code': '-4', 'message': '你输入的参数不正确', 'data': ''}) 61 else: 62 if huoqupath.rebacktype == 'json': 63 try: 64 json_fan = json.dumps(huoqupath.fanhui) 65 return jsonify({'code': '1', 'message': 'successs', 'data': json_fan}) 66 except: 67 return jsonify({'code': '-2', 'message': '你写入的返回不能正常json!请检查', 'data': ''}) 68 elif huoqupath.rebacktype == 'xml': 69 response = make_response(huoqupath.fanhui) 70 response.content_type = 'application/xml' 71 return response 72 else: 73 return jsonify({'code': '-2', 'message': '你写入的类型目前系统不支持', 'data': ''}) #

开源地址:https://github.com/liwanlei/FXTest

使用说明:

1.依赖包为requirements.txt文件下的,可能部分不全,需要什么可以自己使用中增加。 2.目前由于考虑后续迁移内部使用的话,所以移除了注册功能,改为管理员后台添加方式,默认登录:liwanlei 密码:liwanlei 3.部分功能调试还存在一定的问题,欢迎各位多提宝贵意见,

部分功能欠缺:

1.定时任务的持久化,现在处理容易受到运行过程中的宕机等情况重新启动服务器的定时任务全部需要开启 2、mock接口只能支持单一的path

  1. 测试环境没有改为动态配置,动态支持。目前测试环境管理以及上线 4。部分地方可能还会有不严谨性,但是工具可以使用。 5、目前仅支持 http请求中的json格式的, 6.大家可以多提意见,后续会优化,最近一直熬夜加班可能有些消息不能及时回复,还望谅解。

有问题可以联系我:QQ:952943386 email:leileili126@163.com qq群:194704520 新群:683894834

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-09-25 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
持续集成
CODING 持续集成(CODING Continuous Integration,CODING-CI)全面兼容 Jenkins 的持续集成服务,支持 Java、Python、NodeJS 等所有主流语言,并且支持 Docker 镜像的构建。图形化编排,高配集群多 Job 并行构建全面提速您的构建任务。支持主流的 Git 代码仓库,包括 CODING 代码托管、GitHub、GitLab 等。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档