前言
在上一篇文章--Appium系列(十七)将Appium服务端口号通过参数传递给测试用例,我们处理了通过参数传递给appium服务,那么这节课呢,我们要改造,改造成多设备并行执行测试用例。
正文
之前有两篇文章,Appium自动化(九)如何处理多设备的启动参数和
Appium自动化(十)如何控制多设备并行执行测试用例讲解了多设备执行的一些要领,那么今天呢,我们来看下,如何把现有的改造成多设备并行的。
首先我来说下我的思路:
1.获取多个设备,根据每个设备不同,获取产生不一样的端口,最后产生的数量和设备数一致。
2.启动多个的appium 的服务
3.启动进程池,端口和app相互绑定。进程和用例绑定
4.启动进程进行测试。
那么我们看看具体是怎么实现的
我们先把之前的启动appium的server给写到common目录中的appiumserveruntil方法中。
import platform
from multiprocessing import Process,Pool
import time,urllib.request
import threading,os
class RunServer(threading.Thread):#启动服务的线程
def __init__(self, cmd):
threading.Thread.__init__(self)
self.cmd = cmd
def run(self):
os.system(self.cmd)
def start(port_list:list):
def __run(url):
time.sleep(10)
response = urllib.request.urlopen(url, timeout=5)
if str(response.getcode()).startswith("2"):
return True
for i in range(0, len(port_list)):
cmd = "appium -p %s " % (
port_list[i])
if platform.system() == "Windows": # windows下启动server
t1 =RunServer(cmd)
p = Process(target=t1.start())
p.start()
while True:
time.sleep(4)
if __run("http://127.0.0.1:" + port_list[i]+ "/wd/hub/status"):
break
else:
appium = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1,
close_fds=True)
while True:
appium_line = appium.stdout.readline().strip().decode()
time.sleep(2)
if 'listener started' in appium_line or 'Error: listen' in appium_line:
print("----server启动成功---")
break
def stop_server(portlist:list):
sysstr = platform.system()
if sysstr == 'Windows':
os.popen("taskkill /f /im node.exe")
else:
for port in portlist:
cmd = "lsof -i :{0}".format(port)
plist = os.popen(cmd).readlines()
plisttmp = plist[1].split(" ")
plists = plisttmp[1].split(" ")
os.popen("kill -9 {0}".format(plists[0])):
我们有了产生appium服务的方法,我们需要有一个设备管理的方法,我们去创建一个产生设备信息的进程池放在run.py
from common.parame import Parmer
from common.adbtool import *
from common.packageparse import get_apk_lautc,get_apkname
from multiprocessing import Pool
import unittest,random
from common.appiumserveruntil import start,stop_server
from case.logintestcase import testCase
def runnerPool(getDevices):
'''
根据链接的设备生成不同的dict
然后放到设备的list里面
设备list的长度产生进程池大小
'''
devices_Pool = []
for i in range(0, len(getDevices)):
_pool = []
_initApp = {}
_initApp["deviceName"] =getDevices[i]['devices']
_initApp["platformVersion"] = getPlatForm(getDevices[i]['devices'])
_initApp["platformName"] = "android"
_initApp["port"] = getDevices[i]["port"]
_initApp["appPackage"] = get_apkname("apk/iBiliPlayer-bili.apk")
_initApp["appActivity"] = get_apk_lautc("apk/iBiliPlayer-bili.apk")
_pool.append(_initApp)
devices_Pool.append(_initApp)
pool = Pool(len(devices_Pool))
pool.map(runnerCaseApp, devices_Pool)
pool.close()
pool.join()
那么runnerCaseApp如何管理用例呢,其实很简单
def runnerCaseApp(devices):
'''利用unittest的testsuite来组织测试用例'''
test_suit = unittest.TestSuite()
test_suit.addTest(Parmer.parametrize(testcase_klass=testCase, parame=devices)) # 扩展的其他的测试用例均这样添加
unittest.TextTestRunner(verbosity=2).run(test_suit)
接着我们写一个run方法
def run():
devices = get_devices()
port_list = []
if len(devices) > 0:
for dev in devices:
app = {}
app["devices"] = dev
port = str(random.randint(4593, 4598))
app["port"] = port
port_list.append(port)
l_devices.append(app)
start(port_list)
runnerPool(l_devices)
try:
stop_server(port_list)
except Exception as e:
print("关闭服务失败,原因:%s" % e)
最后我们执行这个run方法
if __name__ =="__main__":
run()
然后我们就可以执行run方法。下面就开始执行来。