前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AutoLine源码之RobotFramework运行器

AutoLine源码之RobotFramework运行器

作者头像
苦叶子
发布2018-08-06 14:42:39
5230
发布2018-08-06 14:42:39
举报
文章被收录于专栏:开源优测开源优测开源优测
什么是AutoLine开源平台

AutoLine开源平台是一个开源自动化测试解决方案,基于RobotFramework进行二次开发,支持RobotFramework几乎所有的库。

源码地址

github地址: https://github.com/small99/AutoLine 码 云 地 址:https://gitee.com/lym51/AutoLine

运行器源码路径及源码结构

在AutoLine中我们自定义实现了RobotFramework的运行器,其路径如下图所示:

源码结构如下图所示:

说明:

  1. 一些已经实现的运行器,用于调试测试用
  2. 运行器分为自动化运行器、调试运行器、手工运行器三种模式

下面我们对源码进行注释

__author__ = "苦叶子"

"""

公众号: 开源优测

Email: lymking@foxmail.com

"""

import os
import platform
import codecs
import time
import json
import subprocess
from datetime import datetime
from threading import Thread, Timer
import xml.etree.ElementTree as ET
from flask import current_app
from flask_login import current_user
from sqlalchemy import and_
from ..models import AutoTask, AutoProject, User
from .. import db
from ..auto.builder import Builder
from .process import Process

# 同步运行器,即阻塞模式,一次只能运行一个RF进程
def robot_run(category, id):
    app = current_app._get_current_object()
    if len(app.config["RESULTS"]) > int(app.config['AUTO_PROCESS_COUNT']):
        return json.dumps({"status": "busying", "msg": "任务池已满!!!"})

    builder = Builder(category, id)
    builder.build()
    app.config["RESULTS"].append(app.config["POOL"].apply_async(builder.test_run, (app, current_user.get_id(),)))

    # app.config["POOL"].join()

    return json.dumps({"status": "success", "msg": "任务启动成功"})

# 异步运行器,采用多线程方式,可以启动多个RF进程
def robot_async_run(category, id):
    app = current_app._get_current_object()

    builder = Builder(category, id)
    builder.build()

    thr = Thread(target=builder.test_run, args=[app, current_user.get_id()])
    #app.config["RESULTS"].append(thr)

    thr.start()

    return json.dumps({"status": "success", "msg": "任务启动成功"})

# 用于检查RF运行进程的状态
def check_process_status(app):
    print("timer to check ....%d" % len(app.config["RUNNERS"]))
    with app.app_context():

        try:
            for runner in app.config["RUNNERS"]:
                if runner.is_finish():
                    runner.write_result()
                    app.config["RUNNERS"].remove(runner)
        except Exception as e:
            print(e)

# debug模式运行器
def debug_run(id):
    builder = Builder(id)
    builder.build()
    runner = Runner(builder.id, builder.build_no)

    runner.debug()

    return (builder.id, builder.build_no)

# RF运行器
def run_process(category, id):
    builder = Builder(id)
    builder.build()

    if builder.has_test_case():

        runner = Runner(builder.id, builder.build_no)
        if category == "auto":
            runner.auto_run()
        else:
            runner.run()

        app = current_app._get_current_object()

        app.config["TRIGGER"].update_job(id)

        app.config["RUNNERS"].append({
            "project_id": builder.id,
            "task_id": builder.build_no,
            "runner": runner
        })

        return json.dumps({"status": "success", "msg": "任务启动成功"})
    else:
        return json.dumps({"status": "fail", "msg": "项目中没有创建关键字步骤,任务启动失败,请新增关键字步骤!!!"})

# 执行器类
class Runner:
    def __init__(self, project_id, build_no):
        self.project_id = project_id
        self.build_no = build_no
        self._process = None
        self._timer = None
        self._out_fd = None

    # 用于调度器自动执行
    def auto_run(self):
        try:
            user_id = User.query.filter_by(username="AutoExecutor").first().id
            name = AutoProject.query.filter_by(id=self.project_id).first().name
            task = AutoTask(project_id=self.project_id,
                            build_no=self.build_no,
                            status="running",
                            create_author_id=user_id,
                            create_timestamp=datetime.now())
            db.session.add(task)
            db.session.commit()

            output_dir = os.getcwd() + "/logs/%s/%s" % (self.project_id, self.build_no)
            output_dir = output_dir.replace("\\", "/")

            # -x result/output.xml -l result/log.html -r result/report.html
            shell = False
            if "Windows" in platform.platform():
                self._out_fd = codecs.open(output_dir + "/logs.log", "a+", "cp936")
                command = "pybot -d %s -L DEBUG -N %s %s/testcase.robot" % (output_dir, name, output_dir)
                shell = True
            else:
                self._out_fd = codecs.open(output_dir + "/logs.log", "a+", "utf-8")
                command = ["pybot", "-d", "%s" % output_dir, "-L", "DEBUG", "-N", "%s" % name, "%s/testcase.robot" % output_dir]
                #print(command)

            self._process = subprocess.Popen(command, shell=shell, stdout=self._out_fd, stderr=subprocess.STDOUT)
            #self._process = Process(command)

            #self._process.start()

        except Exception as e:
            print(str(e))
            pass

        return {"status": "success",
                "msg": "任务启动成功",
                "project_id": self.project_id,
                "build_no": self.build_no}

    # 用于手工在页面触发执行
    def run(self):
        #
        try:
            name = AutoProject.query.filter_by(id=self.project_id).first().name
            task = AutoTask(project_id=self.project_id,
                            build_no=self.build_no,
                            status="running",
                            create_author_id=current_user.get_id(),
                            create_timestamp=datetime.now())
            db.session.add(task)
            db.session.commit()

            output_dir = os.getcwd() + "/logs/%s/%s" % (self.project_id, self.build_no)
            output_dir = output_dir.replace("\\", "/")

            shell = False
            if "Windows" in platform.platform():
                self._out_fd = codecs.open(output_dir + "/logs.log", "a+", "cp936")
                command = "pybot -d %s -L DEBUG -N %s %s/testcase.robot" % (output_dir, name, output_dir)
                shell = True
            else:
                self._out_fd = codecs.open(output_dir + "/logs.log", "a+", "utf-8")
                command = ["pybot", "-d", "%s" % output_dir, "-L", "DEBUG", "-N", "%s" % name,
                           "%s/testcase.robot" % output_dir]
                # print(command)

            self._process = subprocess.Popen(command, shell=shell, stdout=self._out_fd, stderr=subprocess.STDOUT)

        except Exception as e:
            print(str(e))
            pass

        return {"status": "success",
                "msg": "任务启动成功",
                "project_id": self.project_id,
                "build_no": self.build_no}

    # 调试模式,用于检查是否符合RF语法
    def debug(self):
        try:
            output_dir = os.getcwd() + "/logs/%s/%s" % (self.project_id, self.build_no)
            output_dir = output_dir.replace("\\", "/")
            # -x result/output.xml -l result/log.html -r result/report.html
            command = ["pybot", "-d", "%s" % output_dir, "--dryrun", "-N", "调试输出", "%s/testcase.robot" % output_dir]
            self._out_fd = open(output_dir + "/debug.log", "a+")
            self._process = subprocess.Popen(command, shell=False, stdout=self._out_fd, stderr=subprocess.STDOUT)
            while True:
                if self._process.poll() == 0:  # 判断子进程是否结束
                    break
                else:
                    time.sleep(0.2)

        except Exception as e:
            print(str(e))
            pass

        return {"status": "success",
                "msg": "任务启动成功",
                "project_id": self.project_id,
                "build_no": self.build_no}
    # 停止任务 
    def stop(self):
        status = "success"
        msg = "任务终止"
        try:
            self._process.stop()
            msg += "成功"
        except Exception as e:
            status = "fail"
            msg = msg + "异常" + str(e)

        return {"status": status,
                "msg": msg,
                "project_id": self.project_id,
                "build_no": self.build_no}

    def get_output(self, wait_until_finished=False):
        return self._process.get_output(wait_until_finished)

    def is_finish(self):
        return self._process.is_finished()

    def write_result(self):
        output_dir = os.getcwd() + "/logs/%s/%s" % (self.project_id, self.build_no)
        output_dir = output_dir.replace("\\", "/")
        print("write ... result ...")
        print(os.path.exists(output_dir + "/log.html"))
        if os.path.exists(output_dir + "/log.html"):
            time.sleep(0.2)
            task = AutoTask.query.filter(and_(AutoTask.project_id == self.project_id,
                                              AutoTask.build_no == self.build_no)).first()
            tree = ET.parse(output_dir + "/output.xml")
            root = tree.getroot()
            passed = root.find("./statistics/suite/stat").attrib["pass"]
            fail = root.find("./statistics/suite/stat").attrib["fail"]
            if int(fail) != 0:
                task.status = 'fail'
            else:
                task.status = 'pass'
            db.session.merge(task)
            db.session.commit()

            self._timer.canel()

说明:

  1. 在运行器中,关键的是一个Builder类,该类实现了从数据库读取数据,并序列号为RF语法的文件
  2. Runner执行器根据类型(web、app、http)调用Builder加载不同的RobotFramework支持库和通用的库,实现对RobotFramework的完整的支持
  3. 大家主要看Runner类,这里不对代码一一解释,因为代码本身没什么难度,关键在于细节的看上几遍就懂了的

AutoLine开源平台简明教程

AutoLine开源平台安装部署教程

AutoLine开源平台常见问题解答

AutoLine开源平台源码组织结构

AutoLine源码分析之开始篇

AutoLine源码分析之入口源码

AutoLine源码分析之配置管理

AutoLine源码分析之数据库模型

AutoLine源码分析之Flask初始化模块

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-07-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开源优测 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档