前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >测试框架实践--多线程

测试框架实践--多线程

作者头像
iTesting
发布2019-10-29 16:50:24
5730
发布2019-10-29 16:50:24
举报
文章被收录于专栏:iTestingiTesting

iTesting,爱测试,爱分享

前面几次的分享,我从一个数据驱动的实现展开去,先后讨论了什么是数据驱动,如何实现数据驱动,数据驱动在自动化框架里如何应用。 Python数据驱动实践(一)–ddt实现数据驱动 Python数据驱动实践(二)–教你用Python实现数据驱动 Python数据驱动实践(三)–动态添加测试用例 Python测试框架实现(四)–动态挑选测试用例

为什么要讲这些呢?

因为这些是一个测试框架必不可少的部分,看看前面4次的分享,虽然可以成功的运行,但我们采用了顺序运行的方式,跟实际使用相差很远,现实中我们一般实现“并发”运行我们的测试用例。

说起并发,Python 有多线程(Multiple Threading)和多进程(Multiple Process)之分, 由于GIL即Global Interpreter Lock(全局解释器锁)的存在,python多线程并不能实现真正的并发(无论你的CPU是否多核),相反,多进程因为有独占的内存空间可以真正的实现并发。但在我们的自动化测试框架里,我们还是希望利用线程的公用内存空间特性,特别是同一个测试类我们希望有统一的setup, teardown特性。而这个多进程就不太适用,加上我们测试用例也不是CPU计算密集型,多线程方案的“并发”看起来是最佳选择。

但是是不是就一定要用threading.Thread呢?

我们先看看”传统“的多线程并发。 一个通用的多线程模板是这样的:

代码语言:javascript
复制
import threading
import queue
import time
exitFlag = 0
class MyThread(threading.Thread):
    def __init__(self, q):
        threading.Thread.__init__(self)
        self.q = q
    def run(self):
        do_something(self.q)
def do_something(q):
    while not exitFlag:
        queue_lock.acquire()
        if not work_queue.empty():
            data = q.get()
            queue_lock.release()
            print("Now thread %s is processing %s" % (threading.currentThread(), data))
            time.sleep(1)
        else:
            queue_lock.release()
if __name__ == "__main__":
    num_worker_threads = 3
    target_case = ['case1', 'case2', 'case3', 'case4', 'case5']
    queue_lock = threading.Lock()
    work_queue = queue.Queue()
    threads = []
    start_time = time.time()
    # Create new thread
    for case in range(num_worker_threads):
        thread = MyThread(work_queue)
        thread.start()
        threads.append(thread)
    # Fill queue
    queue_lock.acquire()
    for case in target_case:
        work_queue.put(case)
    queue_lock.release()
    # Wait queue empty
    while not work_queue.empty():
        pass
    # Notify thread quite
    exitFlag = 1
    # Wait all threads done
    for t in threads:
        t.join()
    end_time = time.time()
    print("All cases finished with running time --%s" % (end_time - start_time))

把我们前面实现的顺序运行改成并发, 只需要改成如下就可以实现多线程:

代码语言:javascript
复制
import threading
import queue
import time
from common.test_case_finder import DiscoverTestCases, unpack_test_cases_from_functions
exitFlag = 0
def f(case):
    name, func, value = case
    try:
        if value:
            func.__call__(name, *value)
        else:
            func.__call__(name)
    except:
        # traceback.print_exc()
        cases_run_fail.append(name)
    else:
        cases_run_success.append(name)
    return cases_run_fail, cases_run_success
class MyThread(threading.Thread):
    def __init__(self, q):
        threading.Thread.__init__(self)
        self.q = q
    def run(self):
        while not exitFlag:
            queue_lock.acquire()
            if not work_queue.empty():
                data = self.q.get()
                f(data)
                time.sleep(5)
                queue_lock.release()
                print("Now thread %s is processing %s" % (threading.currentThread(), data))
            else:
                queue_lock.release()
if __name__ == "__main__":
    num_worker_threads = 5
    mypath = r"D:\ktest\tests\test_page1"
    cases_to_run = []
    cases_run_success = []
    cases_run_fail = []
    discover_cases = DiscoverTestCases(mypath)
    mds = discover_cases.get_modules_spec()
    raw_test_cases = discover_cases.find_classes_in_module(mds)
    cases_to_run = unpack_test_cases_from_functions(raw_test_cases)
    queue_lock = threading.Lock()
    work_queue = queue.Queue()
    threads = []
    start_time = time.time()
    # Create new thread
    for case in range(num_worker_threads):
        thread = MyThread(work_queue)
        thread.start()
        threads.append(thread)
    # Fill queue
    queue_lock.acquire()
    for case in cases_to_run:
        work_queue.put(case)
    queue_lock.release()
    # Wait queue empty
    while not work_queue.empty():
        pass
    # Notify thread quite
    exitFlag = 1
    # Wait all threads done
    for t in threads:
        t.join()
    end_time = time.time()
    print("All cases finished with running time --{:.2f} seconds" .format(end_time - start_time))
    print('Below cases are passed:\n %s' %cases_run_success)
    print('Below cases are failed:\n %s' % cases_run_fail)

可以看到,你自己要做很多事情来保证多线程的正常运行,你要维护queue,要注意资源锁,你要使用join来等待子线程都完成。 多线程难道都这么麻烦吗?

“人生苦短,我用python”, 不知道你们听过没 :)

multiprocessing.dummy 来助你一臂之力!

dummy是multiprocessing的一个克隆体,唯一的区别在于,dummy使用线程而multiprocessing使用进程。

代码语言:javascript
复制
from multiprocessing.dummy import Pool as ThreadPool
import time
from common.test_case_finder import DiscoverTestCases, unpack_test_cases_from_functions
def f(case):
    name, func, value = case
    try:
        if value:
            func.__call__(name, *value)
        else:
            func.__call__(name)
    except:
        # traceback.print_exc()
        cases_run_fail.append(name)
    else:
        cases_run_success.append(name)
    return cases_run_fail, cases_run_success
if __name__ == "__main__":
    number_of_threads = 5
    mypath = r"D:\ktest\tests\test_page1"
    cases_to_run = []
    cases_run_success = []
    cases_run_fail = []
    discover_cases = DiscoverTestCases(mypath)
    mds = discover_cases.get_modules_spec()
    raw_test_cases = discover_cases.find_classes_in_module(mds)
    cases_to_run = unpack_test_cases_from_functions(raw_test_cases)
    start_time = time.time()
    with ThreadPool(number_of_threads) as p:
        p.map(f, cases_to_run)
    p.close()
    p.join()
    end_time = time.time()
    print('Below cases are passed:\n %s' %cases_run_success)
    print('Below cases are failed:\n %s' % cases_run_fail)
    print("All cases finished with running time --{:.2f} seconds" .format(end_time - start_time))

可以看到,传统的threading.Thread除开定义Thread类外,还要用了这么多篇幅来做多线程,multiprocessing.dummy只用一个Pool就全搞定了。

好了,今天我们讲到这里,并发也实现了,动态挑选也实现了,后面我们就要实现test fixture, 敬请期待!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-10-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 iTesting 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档