首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Sanic Webserver: Websocket处理程序返回时关闭套接字;循环中断其他请求处理程序

Sanic Webserver: Websocket处理程序返回时关闭套接字;循环中断其他请求处理程序
EN

Stack Overflow用户
提问于 2019-06-18 19:34:11
回答 1查看 1K关注 0票数 2

场景:我有一个sanic网站服务器服务一个简单的网站。该网站基本上是一个支持vue模板的html大型数据表。由于表条目每隔几分钟更改一次,因此数据将通过websocket进行更改。同时约有2000名用户。我已经尝试实现了一个酒吧/子架构。

问题:我的websockets一回到sanic处理程序就会关闭。我可以在里面有一个循环让处理程序保持打开。但让2000名操纵者打开听起来是个坏主意.此外,开放处理程序的行为也很奇怪。一个线程或一个小线程池应该完成这项工作。也许我弄错了卫生文件,需要设计建议。

我尝试过的事情:-将超时设置提高到足够高-尝试sanic中的其他各种websocket设置-让我的客户端js返回false onmessage (Javascript打开后立即关闭) -传递后将ws引用设置为null

Sanic网络服务器索引:

代码语言:javascript
复制
@app.route('/')
async def serve_index(request):
    return await file(os.path.join(os.path.dirname(__file__), 'index.html'))

Index.html的JS:

代码语言:javascript
复制
var app = new Vue({
    el: '#app',
        data() {
            manydata0: 0,
            manydata1: 0,
            ws: null,
        }
    },
    methods: {
        update: function (json_data) {
            json = JSON.parse(json_data);
            this.manydata0 = json['data0'];
            this.manydata1 = json['data1'];
        }
    },
    created: function () {
        this.ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/reload');
        messages = document.createElement('ul');
        this.ws.onmessage = function (event) {
            console.log("new data")
            app.update(event.data);
        return false;
    };
    document.body.appendChild(messages);
    this.ws.onclose = function (event) {
        console.log("closed :(")
    };

Sanic Webserver的Websocket Handler (第1版,Socket立即死亡):

代码语言:javascript
复制
@app.websocket('/reload')
async def feed(request, ws):
    #time.sleep(42) # this causes the websocket to be created and closed on client side 42 seconds after my request
    await ws.send(Path(json).read_text()) # serve initial data
    connected_clients.append(ws) # subscribe to websocket list. another thread will read list entries and serve them updates

Sanic req服务器的Websocket (第二个版本,Handler阻止其他req处理程序)

代码语言:javascript
复制
@app.websocket('/reload')
async def feed(request, ws):
    mod_time = 0
    while True:
        try:
            stat = os.stat(json)
            if mod_time != stat.st_mtime:
                await ws.send(Path(json).read_text())
        except Exception as e:
            print("Exception while checking file: ", e)
    # this stops the server to handle other @app.routes like css, fonts, favicon

Sanic Webservers的Websocket (第3版,不必要的recv())

代码语言:javascript
复制
@app.websocket('/reload')
async def feed(request, ws):
    mod_time = 0
    while True:
        try:
            stat = os.stat(json)
            if mod_time != stat.st_mtime:
                await ws.send(Path(json).read_text())
                await recv() # if the client sends from time to time all is fine
        except Exception as e:
            print("Exception while checking file: ", e)

最后两个代码片段没有太大的不同。我添加了一个ws.recv(),并从客户端发送一些合适的东西(例如,间隔),然后一切正常工作。然后发送css、字体和图标。但这不是故意的,对吗?这不应该是很好的,对吧?

总之,这对我来说没有多大意义。我是什么夫人?

EN

Stack Overflow用户

回答已采纳

发布于 2019-06-19 08:10:01

这里有一个卫生中心的人。

首先,作为一个公共类型体系结构的例子,这是一个要点 I准备了。我觉得这可能会有帮助。

我的基本想法是创建一个Feed对象,它在自己的任务中循环,寻找一个事件。在我的例子中,它是从公共部门收到信息。在您的示例中,它应该检查JSON文档上的时间。

然后,当该Feed.receiver触发了一个事件时,它就会向所有侦听的客户端发出请求。

websocket处理程序内部,您希望保持打开状态。如果没有,则连接将关闭。如果您不关心从客户端接收信息,则不需要使用await recv()

因此,在您的例子中,使用超级简单逻辑,我将执行如下操作。

这是未经测试的代码,可能需要一些修改的

代码语言:javascript
复制
import os
import random
import string
from functools import partial
from pathlib import Path

from sanic import Sanic

import asyncio
import websockets
from dataclasses import dataclass, field
from typing import Optional, Set

app = Sanic(__name__)

FILE = "/tmp/foobar"
TIMEOUT = 10
INTERVAL = 20


def generate_code(length=12, include_punctuation=False):
    characters = string.ascii_letters + string.digits
    if include_punctuation:
        characters += string.punctuation
    return "".join(random.choice(characters) for x in range(length))


@dataclass
class Client:
    interface: websockets.server.WebSocketServerProtocol = field(repr=False)
    sid: str = field(default_factory=partial(generate_code, 36))

    def __hash__(self):
        return hash(str(self))

    async def keep_alive(self) -> None:
        while True:
            try:
                try:
                    pong_waiter = await self.interface.ping()
                    await asyncio.wait_for(pong_waiter, timeout=TIMEOUT)
                except asyncio.TimeoutError:
                    print("NO PONG!!")
                    await self.feed.unregister(self)
                else:
                    print(f"ping: {self.sid} on <{self.feed.name}>")
                await asyncio.sleep(INTERVAL)
            except websockets.exceptions.ConnectionClosed:
                print(f"broken connection: {self.sid} on <{self.feed.name}>")
                await self.feed.unregister(self)
                break

    async def shutdown(self) -> None:
        self.interface.close()

    async def run(self) -> None:
        try:
            self.feed.app.add_task(self.keep_alive())
            while True:
                pass
        except websockets.exceptions.ConnectionClosed:
            print("connection closed")
        finally:
            await self.feed.unregister(self)


class Feed:
    app: Sanic
    clients: Set[Client]
    cached = None

    def __init__(self, app: Sanic):
        self.clients = set()
        self.app = app

    @classmethod
    async def get(cls, app: Sanic):
        is_existing = False

        if cls.cached:
            is_existing = True
            feed = cls.cached
        else:
            feed = cls(app)
            cls.cached = feed

        if not is_existing:
            feed.app.add_task(feed.receiver())

        return feed, is_existing

    async def receiver(self) -> None:
        print("Feed receiver started")
        mod_time = 0
        while True:
            try:
                stat = os.stat(FILE)
                print(f"times: {mod_time} | {stat.st_mtime}")
                if mod_time != stat.st_mtime:
                    content = self.get_file_contents()
                    for client in self.clients:
                        try:
                            print(f"\tSending to {client.sid}")
                            await client.interface.send(content)
                        except websockets.exceptions.ConnectionClosed:
                            print(f"ConnectionClosed. Client {client.sid}")
            except Exception as e:
                print("Exception while checking file: ", e)

    async def register(
        self, websocket: websockets.server.WebSocketServerProtocol
    ) -> Optional[Client]:
        client = Client(interface=websocket)
        print(f">>> register {client}")

        client.feed = self
        self.clients.add(client)

        # Send initial content
        content = self.get_file_contents()
        client.interface.send(content)

        print(f"\nAll clients\n{self.clients}\n\n")

        return client

    async def unregister(self, client: Client) -> None:
        print(f">>> unregister {client} on <{self.name}>")
        if client in self.clients:
            await client.shutdown()
            self.clients.remove(client)
            print(f"\nAll remaining clients\n{self.clients}\n\n")

    def get_file_contents(self):
        return Path(FILE).read_text()


@app.websocket("/reload")
async def feed(request, ws):
    feed, is_existing = await Feed.get(app)

    client = await feed.register(ws)
    await client.run()


if __name__ == "__main__":
    app.run(debug=True, port=7777)
票数 2
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56655894

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档