首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何将thrift服务器或TCP服务器作为pytest fixture运行?

如何将thrift服务器或TCP服务器作为pytest fixture运行?
EN

Stack Overflow用户
提问于 2019-06-28 05:43:19
回答 1查看 536关注 0票数 0

平台最终版本,python 2.7.12- linux2 -0

我的集成测试ping外部服务器,但我想将其更改为使用测试夹具。

几天来,我一直在尝试在pytest中运行Thrift库TCP服务器作为会话范围的fixture。我尝试将服务器作为后台线程运行,这样我的测试就不会被阻止运行。

代码语言:javascript
复制
@pytest.fixture(scope="session", autouse=True)
def thrift_server():
    config = get_config()
    nprocesses = 4

    try:
        nprocesses = config['num_processes']
    except:
        pass

    args = (Handler, config['db_credentials'], config['server_port'])
    kwargs = ({'env': config['env'], 'processpool': True, 'num_processes': nprocesses,
        'handler_config': config, 'logfile': 'tests/test_server_log.txt'})

    tserver = ThriftServer()
    tserver.add_path(config['thrift_path'])
    tserver.set_service("search")

    try:
        thread = threading.Thread(target=tserver.runserver, args=args, kwargs=kwargs)
        thread.daemon = True
        thread.start()
        yield tserver
    except:
        print("BOOHOO")
    print("TEARDOWN: test server")

对于我的每一个测试,我从pytest库代码中得到一个错误:

代码语言:javascript
复制
ERROR at setup of Tests.test_item_is_found 
venv/lib/python2.7/site-packages/_pytest/runner.py:226: in from_call
    result = func()
venv/lib/python2.7/site-packages/_pytest/runner.py:198: in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
venv/lib/python2.7/site-packages/pluggy/hooks.py:289: in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
venv/lib/python2.7/site-packages/pluggy/manager.py:87: in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
venv/lib/python2.7/site-packages/pluggy/manager.py:81: in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
venv/lib/python2.7/site-packages/_pytest/runner.py:116: in pytest_runtest_setup
    item.session._setupstate.prepare(item)
venv/lib/python2.7/site-packages/_pytest/runner.py:362: in prepare
    col.setup()
venv/lib/python2.7/site-packages/_pytest/unittest.py:119: in setup
    self._request._fillfixtures()
venv/lib/python2.7/site-packages/_pytest/fixtures.py:469: in _fillfixtures
    item.funcargs[argname] = self.getfixturevalue(argname)
venv/lib/python2.7/site-packages/_pytest/fixtures.py:479: in getfixturevalue
    return self._get_active_fixturedef(argname).cached_result[0]
venv/lib/python2.7/site-packages/_pytest/fixtures.py:502: in _get_active_fixturedef
    self._compute_fixture_value(fixturedef)
venv/lib/python2.7/site-packages/_pytest/fixtures.py:587: in _compute_fixture_value
    fixturedef.execute(request=subrequest)
venv/lib/python2.7/site-packages/_pytest/fixtures.py:894: in execute
    return hook.pytest_fixture_setup(fixturedef=self, request=request)
venv/lib/python2.7/site-packages/pluggy/hooks.py:289: in __call__
    return self._hookexec(self, self.get_hookimpls(), kwargs)
venv/lib/python2.7/site-packages/pluggy/manager.py:87: in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
venv/lib/python2.7/site-packages/pluggy/manager.py:81: in <lambda>
    firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
venv/lib/python2.7/site-packages/_pytest/fixtures.py:936: in pytest_fixture_setup
    result = call_fixture_func(fixturefunc, request, kwargs)
venv/lib/python2.7/site-packages/_pytest/fixtures.py:791: in call_fixture_func
    res = next(it)
E   StopIteration

这是一个我没有找到很好的文档或支持的问题,所以我必须询问question..How,我是否可以将TCP服务器设置为我的测试可以使用的pytest fixture?

作为推论,这是一种我应该自己滚动的情况,还是有一个好的插件来支持这个最简单的用例?同样,我的搜索在这方面几乎找不到什么。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-06-28 08:27:56

我能够使用以下设置将Thrift服务器设置为pytest fixture:

代码语言:javascript
复制
import pytest
import threading
from pythrift import ThriftServer
from myHandler import Handler


@pytest.fixture(scope="session", autouse=True)
def thrift_server():
     config = get_config()
     nprocesses = 4

     try:
         nprocesses = config['num_processes']
     except:
         pass

     args = (Handler, config['db_credentials'], config['server_port'])
     kwargs = ({'env': config['env'], 'processpool': True, 'num_processes': nprocesses,
         'handler_config': config, 'logfile': 'tests/test_server_log.txt'})

     tserver = ThriftServer()
     tserver.add_path(config['thrift_path'])
     tserver.set_service("search")

     try:
         thread = threading.Thread(target=tserver.run_server, args=args, kwargs=kwargs)
         thread.daemon = True
         thread.start()
         yield tserver
     except Exception as e:
         print(e)
     print("TEARDOWN: test server")
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56798791

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档