前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Appium系列(十八)多设备并行执行测试用例

Appium系列(十八)多设备并行执行测试用例

作者头像
雷子
发布2021-03-30 16:36:50
1.6K0
发布2021-03-30 16:36:50
举报

前言

在上一篇文章--Appium系列(十七)将Appium服务端口号通过参数传递给测试用例,我们处理了通过参数传递给appium服务,那么这节课呢,我们要改造,改造成多设备并行执行测试用例。

正文

之前有两篇文章,Appium自动化(九)如何处理多设备的启动参数

Appium自动化(十)如何控制多设备并行执行测试用例讲解了多设备执行的一些要领,那么今天呢,我们来看下,如何把现有的改造成多设备并行的。

首先我来说下我的思路:

1.获取多个设备,根据每个设备不同,获取产生不一样的端口,最后产生的数量和设备数一致。

2.启动多个的appium 的服务

3.启动进程池,端口和app相互绑定。进程和用例绑定

4.启动进程进行测试。

那么我们看看具体是怎么实现的

我们先把之前的启动appium的server给写到common目录中的appiumserveruntil方法中。

代码语言:javascript
复制

import  platform
from multiprocessing import Process,Pool
import time,urllib.request
import threading,os
class RunServer(threading.Thread):#启动服务的线程
    def __init__(self, cmd):
        threading.Thread.__init__(self)
        self.cmd = cmd
    def run(self):
        os.system(self.cmd)
def  start(port_list:list):
    def __run(url):
        time.sleep(10)
        response = urllib.request.urlopen(url, timeout=5)
        if str(response.getcode()).startswith("2"):
            return True
    for i in range(0, len(port_list)):
        cmd = "appium  -p %s  " % (
            port_list[i])
        if platform.system() == "Windows":  # windows下启动server
            t1 =RunServer(cmd)
            p = Process(target=t1.start())
            p.start()
            while True:
                time.sleep(4)
                if __run("http://127.0.0.1:" + port_list[i]+ "/wd/hub/status"):
                    break
        else:
          appium = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1,
                                      close_fds=True)

           while True:
                appium_line = appium.stdout.readline().strip().decode()
                time.sleep(2)
                if 'listener started' in appium_line or 'Error: listen' in appium_line:
                    print("----server启动成功---")
                    break
def stop_server(portlist:list):
    sysstr = platform.system()
    if sysstr == 'Windows':
        os.popen("taskkill /f /im node.exe")
    else:
        for port in portlist:
            cmd = "lsof -i :{0}".format(port)
            plist = os.popen(cmd).readlines()
            plisttmp = plist[1].split("    ")
            plists = plisttmp[1].split(" ")
            os.popen("kill -9 {0}".format(plists[0])):

我们有了产生appium服务的方法,我们需要有一个设备管理的方法,我们去创建一个产生设备信息的进程池放在run.py

代码语言:javascript
复制
from  common.parame import Parmer

from common.adbtool import *
from common.packageparse import get_apk_lautc,get_apkname
from multiprocessing import Pool
import unittest,random
from common.appiumserveruntil import start,stop_server
from case.logintestcase import testCase
def runnerPool(getDevices):
    '''
       根据链接的设备生成不同的dict
       然后放到设备的list里面
       设备list的长度产生进程池大小
    '''

    devices_Pool = []
    for i in range(0, len(getDevices)):
        _pool = []
        _initApp = {}
        _initApp["deviceName"] =getDevices[i]['devices']
        _initApp["platformVersion"] = getPlatForm(getDevices[i]['devices'])
        _initApp["platformName"] = "android"
        _initApp["port"] = getDevices[i]["port"]
        _initApp["appPackage"] = get_apkname("apk/iBiliPlayer-bili.apk")
        _initApp["appActivity"] = get_apk_lautc("apk/iBiliPlayer-bili.apk")
        _pool.append(_initApp)
        devices_Pool.append(_initApp)
    pool = Pool(len(devices_Pool))
    pool.map(runnerCaseApp, devices_Pool)
    pool.close()
    pool.join()
    
    

那么runnerCaseApp如何管理用例呢,其实很简单

代码语言:javascript
复制
def runnerCaseApp(devices):
    '''利用unittest的testsuite来组织测试用例'''
    test_suit = unittest.TestSuite()
    test_suit.addTest(Parmer.parametrize(testcase_klass=testCase, parame=devices))  # 扩展的其他的测试用例均这样添加
    unittest.TextTestRunner(verbosity=2).run(test_suit)

接着我们写一个run方法

代码语言:javascript
复制
def run():
    devices = get_devices()
    port_list = []
    if len(devices) > 0:
        for dev in devices:
            app = {}
            app["devices"] = dev
            port = str(random.randint(4593, 4598))
            app["port"] = port
            port_list.append(port)
            l_devices.append(app)
        start(port_list)
        runnerPool(l_devices)
        try:
            stop_server(port_list)
        except Exception as e:
            print("关闭服务失败,原因:%s" % e)

最后我们执行这个run方法

代码语言:javascript
复制
if __name__ =="__main__":
    run()

然后我们就可以执行run方法。下面就开始执行来。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-03-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 雷子说测试开发 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档