AutoLine源码之RobotFramework运行器

什么是AutoLine开源平台

AutoLine开源平台是一个开源自动化测试解决方案,基于RobotFramework进行二次开发,支持RobotFramework几乎所有的库。

源码地址

github地址: https://github.com/small99/AutoLine

码 云 地 址:https://gitee.com/lym51/AutoLine

运行器源码路径及源码结构

在AutoLine中我们自定义实现了RobotFramework的运行器,其路径如下图所示:

源码结构如下图所示:

说明:

一些已经实现的运行器,用于调试测试用

运行器分为自动化运行器、调试运行器、手工运行器三种模式

下面我们对源码进行注释

__author__ ="苦叶子"

"""

公众号: 开源优测

Email: lymking@foxmail.com

"""

importos

importplatform

importcodecs

importtime

importjson

importsubprocess

fromdatetimeimportdatetime

fromthreadingimportThread,Timer

importxml.etree.ElementTreeasET

fromflaskimportcurrent_app

fromflask_loginimportcurrent_user

fromsqlalchemyimportand_

from..modelsimportAutoTask,AutoProject,User

from..importdb

from..auto.builderimportBuilder

from.processimportProcess

# 同步运行器,即阻塞模式,一次只能运行一个RF进程

defrobot_run(category,id):

app = current_app._get_current_object()

iflen(app.config["RESULTS"]) >int(app.config['AUTO_PROCESS_COUNT']):

returnjson.dumps({"status":"busying","msg":"任务池已满!!!"})

builder = Builder(category,id)

builder.build()

app.config["RESULTS"].append(app.config["POOL"].apply_async(builder.test_run,(app,current_user.get_id(),)))

# app.config["POOL"].join()

returnjson.dumps({"status":"success","msg":"任务启动成功"})

# 异步运行器,采用多线程方式,可以启动多个RF进程

defrobot_async_run(category,id):

app = current_app._get_current_object()

builder = Builder(category,id)

builder.build()

thr = Thread(target=builder.test_run,args=[app,current_user.get_id()])

#app.config["RESULTS"].append(thr)

thr.start()

returnjson.dumps({"status":"success","msg":"任务启动成功"})

# 用于检查RF运行进程的状态

defcheck_process_status(app):

print("timer to check ....%d"%len(app.config["RUNNERS"]))

withapp.app_context():

try:

forrunnerinapp.config["RUNNERS"]:

ifrunner.is_finish():

runner.write_result()

app.config["RUNNERS"].remove(runner)

exceptExceptionase:

print(e)

# debug模式运行器

defdebug_run(id):

builder = Builder(id)

builder.build()

runner = Runner(builder.id,builder.build_no)

runner.debug()

return(builder.id,builder.build_no)

# RF运行器

defrun_process(category,id):

builder = Builder(id)

builder.build()

ifbuilder.has_test_case():

runner = Runner(builder.id,builder.build_no)

ifcategory =="auto":

runner.auto_run()

else:

runner.run()

app = current_app._get_current_object()

app.config["TRIGGER"].update_job(id)

app.config["RUNNERS"].append({

"project_id": builder.id,

"task_id": builder.build_no,

"runner": runner

})

returnjson.dumps({"status":"success","msg":"任务启动成功"})

else:

returnjson.dumps({"status":"fail","msg":"项目中没有创建关键字步骤,任务启动失败,请新增关键字步骤!!!"})

# 执行器类

classRunner:

def__init__(self,project_id,build_no):

self.project_id = project_id

self.build_no = build_no

self._process =None

self._timer =None

self._out_fd =None

# 用于调度器自动执行

defauto_run(self):

try:

user_id = User.query.filter_by(username="AutoExecutor").first().id

name = AutoProject.query.filter_by(id=self.project_id).first().name

task = AutoTask(project_id=self.project_id,

build_no=self.build_no,

status="running",

create_author_id=user_id,

create_timestamp=datetime.now())

db.session.add(task)

db.session.commit()

output_dir = os.getcwd() +"/logs/%s/%s"% (self.project_id,self.build_no)

output_dir = output_dir.replace("\\","/")

# -x result/output.xml -l result/log.html -r result/report.html

shell =False

if"Windows"inplatform.platform():

self._out_fd = codecs.open(output_dir +"/logs.log","a+","cp936")

command ="pybot -d %s -L DEBUG -N %s %s/testcase.robot"% (output_dir,name,output_dir)

shell =True

else:

self._out_fd = codecs.open(output_dir +"/logs.log","a+","utf-8")

command = ["pybot","-d","%s"% output_dir,"-L","DEBUG","-N","%s"% name,"%s/testcase.robot"% output_dir]

#print(command)

self._process = subprocess.Popen(command,shell=shell,stdout=self._out_fd,stderr=subprocess.STDOUT)

#self._process = Process(command)

#self._process.start()

exceptExceptionase:

print(str(e))

pass

return{"status":"success",

"msg":"任务启动成功",

"project_id":self.project_id,

"build_no":self.build_no}

# 用于手工在页面触发执行

defrun(self):

#

try:

name = AutoProject.query.filter_by(id=self.project_id).first().name

task = AutoTask(project_id=self.project_id,

build_no=self.build_no,

status="running",

create_author_id=current_user.get_id(),

create_timestamp=datetime.now())

db.session.add(task)

db.session.commit()

output_dir = os.getcwd() +"/logs/%s/%s"% (self.project_id,self.build_no)

output_dir = output_dir.replace("\\","/")

shell =False

if"Windows"inplatform.platform():

self._out_fd = codecs.open(output_dir +"/logs.log","a+","cp936")

command ="pybot -d %s -L DEBUG -N %s %s/testcase.robot"% (output_dir,name,output_dir)

shell =True

else:

self._out_fd = codecs.open(output_dir +"/logs.log","a+","utf-8")

command = ["pybot","-d","%s"% output_dir,"-L","DEBUG","-N","%s"% name,

"%s/testcase.robot"% output_dir]

# print(command)

self._process = subprocess.Popen(command,shell=shell,stdout=self._out_fd,stderr=subprocess.STDOUT)

exceptExceptionase:

print(str(e))

pass

return{"status":"success",

"msg":"任务启动成功",

"project_id":self.project_id,

"build_no":self.build_no}

# 调试模式,用于检查是否符合RF语法

defdebug(self):

try:

output_dir = os.getcwd() +"/logs/%s/%s"% (self.project_id,self.build_no)

output_dir = output_dir.replace("\\","/")

# -x result/output.xml -l result/log.html -r result/report.html

command = ["pybot","-d","%s"% output_dir,"--dryrun","-N","调试输出","%s/testcase.robot"% output_dir]

self._out_fd =open(output_dir +"/debug.log","a+")

self._process = subprocess.Popen(command,shell=False,stdout=self._out_fd,stderr=subprocess.STDOUT)

while True:

ifself._process.poll() ==:# 判断子进程是否结束

break

else:

time.sleep(0.2)

exceptExceptionase:

print(str(e))

pass

return{"status":"success",

"msg":"任务启动成功",

"project_id":self.project_id,

"build_no":self.build_no}

# 停止任务

defstop(self):

status ="success"

msg ="任务终止"

try:

self._process.stop()

msg +="成功"

exceptExceptionase:

status ="fail"

msg = msg +"异常"+str(e)

return{"status": status,

"msg": msg,

"project_id":self.project_id,

"build_no":self.build_no}

defget_output(self,wait_until_finished=False):

returnself._process.get_output(wait_until_finished)

defis_finish(self):

returnself._process.is_finished()

defwrite_result(self):

output_dir = os.getcwd() +"/logs/%s/%s"% (self.project_id,self.build_no)

output_dir = output_dir.replace("\\","/")

print("write ... result ...")

print(os.path.exists(output_dir +"/log.html"))

ifos.path.exists(output_dir +"/log.html"):

time.sleep(0.2)

task = AutoTask.query.filter(and_(AutoTask.project_id ==self.project_id,

AutoTask.build_no ==self.build_no)).first()

tree = ET.parse(output_dir +"/output.xml")

root = tree.getroot()

passed= root.find("./statistics/suite/stat").attrib["pass"]

fail = root.find("./statistics/suite/stat").attrib["fail"]

ifint(fail) !=:

task.status ='fail'

else:

task.status ='pass'

db.session.merge(task)

db.session.commit()

self._timer.canel()

说明:

在运行器中,关键的是一个Builder类,该类实现了从数据库读取数据,并序列号为RF语法的文件

Runner执行器根据类型(web、app、http)调用Builder加载不同的RobotFramework支持库和通用的库,实现对RobotFramework的完整的支持

大家主要看Runner类,这里不对代码一一解释,因为代码本身没什么难度,关键在于细节的看上几遍就懂了的

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180730A0CWN700?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券