首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >django通道websocket中间件如何接收消息

django通道websocket中间件如何接收消息
EN

Stack Overflow用户
提问于 2020-01-21 05:43:38
回答 1查看 673关注 0票数 0

中间件如何读取所有websocket消息?

据我所知,django通道中间件类似于https://github.com/django/channels/blob/2a98606c1e0600cbae71ae1f02f31aae5d01f82d/channels/middleware.py

代码语言:javascript
运行
复制
    async def coroutine_call(self, inner_instance, scope, receive, send):
        """
        ASGI coroutine; where we can resolve items in the scope
        (but you can't modify it at the top level here!)
        """
        await inner_instance(receive, send)

我知道,如果我调用await receive()而不是await inner_instance(receive, send),我将得到一条websocket消息,但在这种情况下,websocket处理程序将不再工作。

coroutine_call如何能够接收websocket消息,同时也可以将其转发到下一个websocket中间件或处理程序?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-01-23 21:42:41

为了让中间件拦截消息,它需要拦截receivesend队列。

下面是一个执行此https://github.com/hishnash/channelsmultiplexer/blob/master/channelsmultiplexer/demultiplexer.py的复杂中间件的示例。

一个更简单的版本是:

代码语言:javascript
运行
复制
class _InterceptionMiddleware:
    def __init__(self, application_cls, scope):
        self.application = application_cls(scope)


    async def __call__(self, receive, send):
        self.downstream_send = send

        # create a Queue for the upstream consumer to read messages from
        self.upstream_receive_queue = asyncio.Queue()

        # create a Queue for the upstream consumer to write messages to
        self.upstream_send_queue = asyncio.Queue()

        # pipe messages being sent to the upstream consumer to your interceptor method
        receiver = await_many_dispatch([receive], self.my_receive_interceptor_method)

        # pipe messages being sent buy the upstream consumer to your interceptor method
        sender = await_many_dispatch(
            [self.upstream_send_queue.get],
            self.my_send_interceptor_method
        )

        # set up an asyncio task to handle these pipes
        receiver_task = asyncio.create_task(receiver)
        sender_task = asyncio.create_task(sender)

        # create an asyncio task for the upstream consumer
        upstream_task = asyncio.create_task(
            # pass the `get` and `put` methods of your upstream send and receive queues
            self.application(self.upstream_receive_queue.get, self.upstream_send_queue.put)
        )

        # await it all
        done, pending = await asyncio.wait(
            [upstream_task, receiver_task, sender_task],
            # if any of them fail stop
            return_when=asyncio.FIRST_COMPLETED
        )
        for task in [upstream_task, receiver_task, sender_task]:
            if not task.dont():
                # we need to cancel this task.
                task.cancel()
                try:
                    await task
                except CancelledError:
                    # we expect this error
                    pass


    async def my_receive_interceptor_method(self, msg):
        # your interception code
        await self.upstream_receive_queue.put(msg)


    async def my_send_interceptor_method(self, msg):
        # your interception code
        await self.downstream_send(msg)


def InterceptionMiddleware(application_cls):
    return functools.partial(_InterceptionMiddleware, application_cls)
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59834822

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档