什么是AutoLine开源平台
AutoLine开源平台是一个开源自动化测试解决方案,基于RobotFramework进行二次开发,支持RobotFramework几乎所有的库。
源码地址
github地址: https://github.com/small99/AutoLine
码云地址: https://gitee.com/lym51/AutoLine
运行器源码路径及源码结构
在AutoLine中我们自定义实现了RobotFramework的运行器,其路径如下图所示:
源码结构如下图所示:
说明:
一些已经实现的运行器,供调试和测试使用
运行器分为自动化运行器、调试运行器、手工运行器三种模式
下面我们对源码进行注释
__author__ ="苦叶子"
"""
公众号: 开源优测
Email: lymking@foxmail.com
"""
importos
importplatform
importcodecs
importtime
importjson
importsubprocess
fromdatetimeimportdatetime
fromthreadingimportThread,Timer
importxml.etree.ElementTreeasET
fromflaskimportcurrent_app
fromflask_loginimportcurrent_user
fromsqlalchemyimportand_
from..modelsimportAutoTask,AutoProject,User
from..importdb
from..auto.builderimportBuilder
from.processimportProcess
def robot_run(category, id):
    """Submit a Robot Framework run to the application's worker pool.

    The test files are generated with ``Builder(category, id).build()`` and
    ``builder.test_run`` is handed to ``app.config["POOL"].apply_async``; the
    returned async result is appended to ``app.config["RESULTS"]``.

    Returns:
        A JSON string. ``status`` is ``"busying"`` when the number of tracked
        results is greater than ``AUTO_PROCESS_COUNT`` (nothing is built or
        submitted in that case), otherwise ``"success"``.
    """
    # Grab the real application object so it can be passed to the worker.
    app = current_app._get_current_object()

    tracked = app.config["RESULTS"]
    limit = int(app.config['AUTO_PROCESS_COUNT'])
    if len(tracked) > limit:
        return json.dumps({"status": "busying", "msg": "任务池已满!!!"})

    builder = Builder(category, id)
    builder.build()

    job_args = (app, current_user.get_id(),)
    tracked.append(app.config["POOL"].apply_async(builder.test_run, job_args))

    return json.dumps({"status": "success", "msg": "任务启动成功"})
def robot_async_run(category, id):
    """Start a Robot Framework run on a dedicated background thread.

    Generates the test files with ``Builder(category, id).build()`` and then
    runs ``builder.test_run(app, user_id)`` in a new ``Thread``. The thread is
    not tracked anywhere after it has been started.

    Returns:
        A JSON string with ``status`` ``"success"``.
    """
    app = current_app._get_current_object()

    builder = Builder(category, id)
    builder.build()

    worker = Thread(
        target=builder.test_run,
        args=[app, current_user.get_id()],
    )
    worker.start()

    return json.dumps({"status": "success", "msg": "任务启动成功"})
def check_process_status(app):
    """Persist results for finished runners and drop them from the registry.

    Walks ``app.config["RUNNERS"]`` inside an application context. For every
    runner whose ``is_finish()`` is true, ``write_result()`` is called and the
    entry is removed from the list.

    Entries may be either runner objects or dicts carrying the runner under
    the ``"runner"`` key (the shape ``run_process`` appends).

    Args:
        app: Application object exposing ``config`` and ``app_context()``.
    """
    registry = app.config["RUNNERS"]
    print("timer to check ....%d" % len(registry))
    with app.app_context():
        # Iterate over a snapshot: removing from the list that is being
        # iterated would silently skip the entry after each removal.
        for entry in list(registry):
            try:
                runner = entry["runner"] if isinstance(entry, dict) else entry
                if runner.is_finish():
                    runner.write_result()
                    registry.remove(entry)
            except Exception as e:
                # Best effort: one broken runner must not stop the others
                # from being checked on this tick.
                print(e)
def debug_run(id):
    """Build the project's test files and run them through ``Runner.debug``.

    Returns:
        The tuple ``(builder.id, builder.build_no)`` identifying the build
        that was checked.
    """
    builder = Builder(id)
    builder.build()

    Runner(builder.id, builder.build_no).debug()

    return (builder.id, builder.build_no)
def run_process(category, id):
    """Build a project and launch its Robot Framework run.

    When the builder reports no test case, nothing is started and a ``"fail"``
    JSON string is returned. Otherwise a ``Runner`` is started
    (``auto_run`` when ``category == "auto"``, ``run`` for anything else),
    ``app.config["TRIGGER"].update_job(id)`` is called, and the runner is
    registered in ``app.config["RUNNERS"]`` as a dict with the keys
    ``project_id``, ``task_id`` and ``runner``.

    Returns:
        A JSON string with ``status`` ``"success"`` or ``"fail"``.
    """
    builder = Builder(id)
    builder.build()

    # Guard clause: without test cases there is nothing to execute.
    if not builder.has_test_case():
        return json.dumps({"status": "fail", "msg": "项目中没有创建关键字步骤,任务启动失败,请新增关键字步骤!!!"})

    runner = Runner(builder.id, builder.build_no)
    launch = runner.auto_run if category == "auto" else runner.run
    launch()

    app = current_app._get_current_object()
    app.config["TRIGGER"].update_job(id)

    registration = {
        "project_id": builder.id,
        "task_id": builder.build_no,
        "runner": runner
    }
    app.config["RUNNERS"].append(registration)

    return json.dumps({"status": "success", "msg": "任务启动成功"})
class Runner:
    """Launch and supervise one ``pybot`` (Robot Framework) run for a build.

    All artefacts of a run live in ``<cwd>/logs/<project_id>/<build_no>``:
    the generated ``testcase.robot``, the captured process output
    (``logs.log`` / ``debug.log``) and the files pybot writes there.
    """

    def __init__(self, project_id, build_no):
        self.project_id = project_id
        self.build_no = build_no
        self._process = None  # child process handle, set once a run starts
        self._timer = None    # optional timer; cancelled by write_result if set
        self._out_fd = None   # log file receiving the child's stdout/stderr

    def _output_dir(self):
        """Return this build's log directory, always with forward slashes."""
        path = os.getcwd() + "/logs/%s/%s" % (self.project_id, self.build_no)
        return path.replace("\\", "/")

    def _result(self, status="success", msg="任务启动成功"):
        """Build the status dict returned by the public run methods."""
        return {"status": status,
                "msg": msg,
                "project_id": self.project_id,
                "build_no": self.build_no}

    def _close_log(self):
        """Close the captured-output log file if one is open."""
        if self._out_fd is not None:
            try:
                self._out_fd.close()
            finally:
                self._out_fd = None

    def _launch(self, author_id):
        """Insert a ``running`` AutoTask row and start pybot for this build."""
        name = AutoProject.query.filter_by(id=self.project_id).first().name
        task = AutoTask(project_id=self.project_id,
                        build_no=self.build_no,
                        status="running",
                        create_author_id=author_id,
                        create_timestamp=datetime.now())
        db.session.add(task)
        db.session.commit()

        output_dir = self._output_dir()
        on_windows = "Windows" in platform.platform()
        encoding = "cp936" if on_windows else "utf-8"
        self._out_fd = codecs.open(output_dir + "/logs.log", "a+", encoding)

        # Always pass an argument list. On Windows the shell is still needed
        # to resolve the pybot launcher, but Popen quotes the list itself, so
        # project names containing spaces no longer split into several args.
        command = ["pybot", "-d", "%s" % output_dir, "-L", "DEBUG",
                   "-N", "%s" % name, "%s/testcase.robot" % output_dir]
        self._process = subprocess.Popen(command,
                                         shell=on_windows,
                                         stdout=self._out_fd,
                                         stderr=subprocess.STDOUT)

    def auto_run(self):
        """Start the run on behalf of the scheduler.

        The task is attributed to the user named ``AutoExecutor``.

        Returns:
            A status dict; ``status`` is ``"fail"`` when the launch raised.
        """
        try:
            user_id = User.query.filter_by(username="AutoExecutor").first().id
            self._launch(user_id)
        except Exception as e:
            print(str(e))
            self._close_log()
            return self._result("fail", "任务启动失败: " + str(e))
        return self._result()

    def run(self):
        """Start the run on behalf of the currently logged-in user.

        Returns:
            A status dict; ``status`` is ``"fail"`` when the launch raised.
        """
        try:
            self._launch(current_user.get_id())
        except Exception as e:
            print(str(e))
            self._close_log()
            return self._result("fail", "任务启动失败: " + str(e))
        return self._result()

    def debug(self):
        """Dry-run the generated suite and block until pybot exits.

        Output is appended to ``debug.log`` in the build's log directory.

        Returns:
            A status dict; ``status`` is ``"fail"`` when the dry run could
            not be started.
        """
        try:
            output_dir = self._output_dir()
            command = ["pybot", "-d", "%s" % output_dir, "--dryrun",
                       "-N", "调试输出", "%s/testcase.robot" % output_dir]
            with open(output_dir + "/debug.log", "a+") as log_file:
                self._process = subprocess.Popen(command,
                                                 shell=False,
                                                 stdout=log_file,
                                                 stderr=subprocess.STDOUT)
                # Wait for any exit code: pybot exits non-zero when the dry
                # run finds problems, so testing for 0 would never return.
                self._process.wait()
        except Exception as e:
            print(str(e))
            return self._result("fail", "任务启动失败: " + str(e))
        return self._result()

    def stop(self):
        """Terminate the child process.

        Returns:
            A status dict; ``status`` is ``"fail"`` (with the error text
            appended to ``msg``) when there is no process or stopping raised.
        """
        status = "success"
        msg = "任务终止"
        try:
            proc = self._process
            # Popen exposes terminate(); a wrapper object that provides
            # stop() is still honoured.
            halt = getattr(proc, "stop", None) or proc.terminate
            halt()
            msg += "成功"
        except Exception as e:
            status = "fail"
            msg = msg + "异常" + str(e)
        return self._result(status, msg)

    def get_output(self, wait_until_finished=False):
        """Return the child's output by delegating to the process object.

        NOTE(review): ``subprocess.Popen`` (what ``run``/``auto_run``/``debug``
        create) has no ``get_output``; this only works when ``_process`` is a
        wrapper providing it. Confirm the intended backend.
        """
        return self._process.get_output(wait_until_finished)

    def is_finish(self):
        """Return True when there is no child process still running."""
        proc = self._process
        if proc is None:
            # Nothing was started (or the launch failed): nothing to wait for.
            return True
        if hasattr(proc, "is_finished"):
            return proc.is_finished()
        return proc.poll() is not None

    def write_result(self):
        """Store the run's verdict on its AutoTask row and release resources.

        If ``log.html`` exists in the build's log directory, ``output.xml``
        is parsed and the task status becomes ``'fail'`` when the suite
        statistics report any failure, else ``'pass'``. The log file handle
        is closed and a pending timer, if any, is cancelled.
        """
        output_dir = self._output_dir()
        print("write ... result ...")
        has_report = os.path.exists(output_dir + "/log.html")
        print(has_report)
        if has_report:
            # Presumably gives pybot a moment to finish writing output.xml.
            time.sleep(0.2)
            task = AutoTask.query.filter(and_(AutoTask.project_id == self.project_id,
                                              AutoTask.build_no == self.build_no)).first()
            root = ET.parse(output_dir + "/output.xml").getroot()
            stat = root.find("./statistics/suite/stat")
            if task is not None and stat is not None:
                task.status = 'fail' if int(stat.attrib["fail"]) != 0 else 'pass'
                db.session.merge(task)
                db.session.commit()
        self._close_log()
        if self._timer is not None:
            self._timer.cancel()
说明:
在运行器中,关键的是一个Builder类,该类实现了从数据库读取数据,并序列化为RF语法的文件
Runner执行器根据类型(web、app、http)调用Builder加载不同的RobotFramework支持库和通用的库,实现对RobotFramework的完整的支持
大家主要看Runner类,这里不对代码一一解释,因为代码本身没什么难度,关键在于细节,多看几遍就能理解
领取专属 10元无门槛券
私享最新 技术干货