我正在尝试运行以下测试:
tests.py
from rest_framework.test import APITestCase
from myapp.routing import application
from channels.testing import WebsocketCommunicator
from account.models import User
from rest_framework.authtoken.models import Token
class Tests(APITestCase):
def setUp(self):
self.user = User.objects.create(email='test@test.test',
password='a password')
self.token, created = Token.objects.get_or_create(user=self.user)
async def test_connect(self):
communicator = WebsocketCommunicator(application, f"/ws/user/{self.token}/")
connected, subprotocol = await communicator.connect()
self.assertTrue(connected)
await communicator.disconnect()
application
是channels.routing.ProtocolTypeRouter
的样板实例(如这里的https://channels.readthedocs.io/en/latest/topics/routing.html)。所有的东西在生产中都很好。测试退出时会出现以下错误:
Traceback (most recent call last):
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/testing.py", line 74, in receive_output
return await self.output_queue.get()
File "/usr/lib/python3.7/asyncio/queues.py", line 159, in get
await getter
concurrent.futures._base.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/sync.py", line 223, in __call__
return call_result.result()
File "/usr/lib/python3.7/concurrent/futures/_base.py", line 428, in result
return self.__get_result()
File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/sync.py", line 292, in main_wrap
result = await self.awaitable(*args, **kwargs)
File "/home/projects/myapp/myapp-api/app/tests.py", line 35, in test_connect
connected, subprotocol = await communicator.connect()
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/channels/testing/websocket.py", line 36, in connect
response = await self.receive_output(timeout)
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/testing.py", line 85, in receive_output
raise e
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/testing.py", line 74, in receive_output
return await self.output_queue.get()
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/timeout.py", line 66, in __aexit__
self._do_exit(exc_type)
File "/home/projects/myapp/myapp-env/lib/python3.7/site-packages/asgiref/timeout.py", line 103, in _do_exit
raise asyncio.TimeoutError
concurrent.futures._base.TimeoutError
----------------------------------------------------------------------
Ran 1 test in 1.026s
我尝试了python版本3.7.5、3.8.0和3.9.9,使用了通道3.0.4、django 3.2.10和信道-redis 3.3.1 ('BACKEND': 'channels_redis.core.RedisChannelLayer'
in settings.py)。错误仍然存在。我做错了什么?
发布于 2022-05-09 13:21:51
我也有同样的问题。APITestCase或TestCase不允许事务,您必须使用来自django测试的SimpleTestCase,并将数据库设置为all。正因为如此,我认为它会起作用。请注意,事务将保存在测试之间,而不是在测试后回滚。
from django.test import SimpleTestCase
from myapp.routing import application
from channels.testing import WebsocketCommunicator
from account.models import User
from rest_framework.authtoken.models import Token
class Tests(SimpleTestCase):
databases = '__all__'
def setUp(self):
self.user = User.objects.create(email='test@test.test', password='a password')
self.token, created = Token.objects.get_or_create(user=self.user)
async def test_connect(self):
communicator = WebsocketCommunicator(application, f"/ws/user/{self.token}/")
connected, subprotocol = await communicator.connect()
self.assertTrue(connected)
await communicator.disconnect()
以下是SimpleTestCase https://docs.djangoproject.com/en/4.0/topics/testing/tools/的信息
https://stackoverflow.com/questions/70511010
复制相似问题