我已经创建了一个自定义令牌身份验证中间件。
from rest_framework.authtoken.models import Token
from django.contrib.auth.models import AnonymousUser
from django.db import close_old_connections
from asgiref.sync import sync_to_async
class TokenAuthMiddleware:
"""
Token authorization middleware for Django Channels 2
"""
def __init__(self, inner):
self.inner = inner
def __call__(self, scope):
# Close old database connections to prevent usage of timed out connections
sync_to_async(close_old_connections)()
headers = dict(scope['headers'])
try:
token_name, token_key = headers[b'sec-websocket-protocol'].decode().split(', ')
if token_name == 'Token':
token = sync_to_async(Token.objects.get, thread_sensitive=True)(key=token_name)
scope['user'] = token.user
else:
scope['user'] = AnonymousUser()
except Token.DoesNotExist:
scope['user'] = AnonymousUser()
return self.inner(scope)
当我运行它时,当我运行scope['user'] = token.user
时会发生异常
[Failure instance: Traceback: <class 'AttributeError'>: 'coroutine' object has no attribute 'user'
我试着像这样等待令牌查询:
token = await sync_to_async(Token.objects.get, thread_sensitive=True)(key=token_name)
我在__call__
函数前面添加了异步,但是在运行__call__
函数中的任何代码之前都会引发以下错误:
[Failure instance: Traceback: <class 'TypeError'>: 'coroutine' object is not callable
我使用Django v3.0.6和Django通道v2.4.0
发布于 2021-02-09 22:59:31
如果这对任何人有帮助的话,我和其他人也有同样的问题。我更新了django通道,然后将路由更改为
websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer),
]
至
websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]
您需要django通道3.0+来做到这一点(https://channels.readthedocs.io/en/stable/releases/3.0.0.html)。然后您可以按照https://channels.readthedocs.io/en/stable/topics/authentication.html#django-authentication设置您的自定义中间件。
https://stackoverflow.com/questions/64512520
复制相似问题