场景:我有一个sanic网站服务器服务一个简单的网站。该网站基本上是一个支持vue模板的html大型数据表。由于表条目每隔几分钟更改一次,因此数据将通过websocket进行更改。同时约有2000名用户。我已经尝试实现了一个酒吧/子架构。
问题:我的websockets一回到sanic处理程序就会关闭。我可以在里面有一个循环让处理程序保持打开。但让2000名操纵者打开听起来是个坏主意.此外,开放处理程序的行为也很奇怪。一个线程或一个小线程池应该完成这项工作。也许我弄错了卫生文件,需要设计建议。
我尝试过的事情:-将超时设置提高到足够高-尝试sanic中的其他各种websocket设置-让我的客户端js返回false onmessage (Javascript打开后立即关闭) -传递后将ws引用设置为null
Sanic网络服务器索引:
@app.route('/')
async def serve_index(request):
return await file(os.path.join(os.path.dirname(__file__), 'index.html'))Index.html的JS:
var app = new Vue({
el: '#app',
data() {
manydata0: 0,
manydata1: 0,
ws: null,
}
},
methods: {
update: function (json_data) {
json = JSON.parse(json_data);
this.manydata0 = json['data0'];
this.manydata1 = json['data1'];
}
},
created: function () {
this.ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/reload');
messages = document.createElement('ul');
this.ws.onmessage = function (event) {
console.log("new data")
app.update(event.data);
return false;
};
document.body.appendChild(messages);
this.ws.onclose = function (event) {
console.log("closed :(")
};Sanic Webserver的Websocket Handler (第1版,Socket立即死亡):
@app.websocket('/reload')
async def feed(request, ws):
#time.sleep(42) # this causes the websocket to be created and closed on client side 42 seconds after my request
await ws.send(Path(json).read_text()) # serve initial data
connected_clients.append(ws) # subscribe to websocket list. another thread will read list entries and serve them updatesSanic req服务器的Websocket (第二个版本,Handler阻止其他req处理程序)
@app.websocket('/reload')
async def feed(request, ws):
mod_time = 0
while True:
try:
stat = os.stat(json)
if mod_time != stat.st_mtime:
await ws.send(Path(json).read_text())
except Exception as e:
print("Exception while checking file: ", e)
# this stops the server to handle other @app.routes like css, fonts, faviconSanic Webservers的Websocket (第3版,不必要的recv())
@app.websocket('/reload')
async def feed(request, ws):
mod_time = 0
while True:
try:
stat = os.stat(json)
if mod_time != stat.st_mtime:
await ws.send(Path(json).read_text())
await recv() # if the client sends from time to time all is fine
except Exception as e:
print("Exception while checking file: ", e)最后两个代码片段没有太大的不同。我添加了一个ws.recv(),并从客户端发送一些合适的东西(例如,间隔),然后一切正常工作。然后发送css、字体和图标。但这不是故意的,对吗?这不应该是很好的,对吧?
总之,这对我来说没有多大意义。我是什么夫人?
发布于 2019-06-19 08:10:01
这里有一个卫生中心的人。
首先,作为一个公共类型体系结构的例子,这是一个要点 I准备了。我觉得这可能会有帮助。
我的基本想法是创建一个Feed对象,它在自己的任务中循环,寻找一个事件。在我的例子中,它是从公共部门收到信息。在您的示例中,它应该检查JSON文档上的时间。
然后,当该Feed.receiver触发了一个事件时,它就会向所有侦听的客户端发出请求。
在websocket处理程序内部,您希望保持打开状态。如果没有,则连接将关闭。如果您不关心从客户端接收信息,则不需要使用await recv()。
因此,在您的例子中,使用超级简单逻辑,我将执行如下操作。
这是未经测试的代码,可能需要一些修改的
import os
import random
import string
from functools import partial
from pathlib import Path
from sanic import Sanic
import asyncio
import websockets
from dataclasses import dataclass, field
from typing import Optional, Set
app = Sanic(__name__)
FILE = "/tmp/foobar"
TIMEOUT = 10
INTERVAL = 20
def generate_code(length=12, include_punctuation=False):
characters = string.ascii_letters + string.digits
if include_punctuation:
characters += string.punctuation
return "".join(random.choice(characters) for x in range(length))
@dataclass
class Client:
interface: websockets.server.WebSocketServerProtocol = field(repr=False)
sid: str = field(default_factory=partial(generate_code, 36))
def __hash__(self):
return hash(str(self))
async def keep_alive(self) -> None:
while True:
try:
try:
pong_waiter = await self.interface.ping()
await asyncio.wait_for(pong_waiter, timeout=TIMEOUT)
except asyncio.TimeoutError:
print("NO PONG!!")
await self.feed.unregister(self)
else:
print(f"ping: {self.sid} on <{self.feed.name}>")
await asyncio.sleep(INTERVAL)
except websockets.exceptions.ConnectionClosed:
print(f"broken connection: {self.sid} on <{self.feed.name}>")
await self.feed.unregister(self)
break
async def shutdown(self) -> None:
self.interface.close()
async def run(self) -> None:
try:
self.feed.app.add_task(self.keep_alive())
while True:
pass
except websockets.exceptions.ConnectionClosed:
print("connection closed")
finally:
await self.feed.unregister(self)
class Feed:
app: Sanic
clients: Set[Client]
cached = None
def __init__(self, app: Sanic):
self.clients = set()
self.app = app
@classmethod
async def get(cls, app: Sanic):
is_existing = False
if cls.cached:
is_existing = True
feed = cls.cached
else:
feed = cls(app)
cls.cached = feed
if not is_existing:
feed.app.add_task(feed.receiver())
return feed, is_existing
async def receiver(self) -> None:
print("Feed receiver started")
mod_time = 0
while True:
try:
stat = os.stat(FILE)
print(f"times: {mod_time} | {stat.st_mtime}")
if mod_time != stat.st_mtime:
content = self.get_file_contents()
for client in self.clients:
try:
print(f"\tSending to {client.sid}")
await client.interface.send(content)
except websockets.exceptions.ConnectionClosed:
print(f"ConnectionClosed. Client {client.sid}")
except Exception as e:
print("Exception while checking file: ", e)
async def register(
self, websocket: websockets.server.WebSocketServerProtocol
) -> Optional[Client]:
client = Client(interface=websocket)
print(f">>> register {client}")
client.feed = self
self.clients.add(client)
# Send initial content
content = self.get_file_contents()
client.interface.send(content)
print(f"\nAll clients\n{self.clients}\n\n")
return client
async def unregister(self, client: Client) -> None:
print(f">>> unregister {client} on <{self.name}>")
if client in self.clients:
await client.shutdown()
self.clients.remove(client)
print(f"\nAll remaining clients\n{self.clients}\n\n")
def get_file_contents(self):
return Path(FILE).read_text()
@app.websocket("/reload")
async def feed(request, ws):
feed, is_existing = await Feed.get(app)
client = await feed.register(ws)
await client.run()
if __name__ == "__main__":
app.run(debug=True, port=7777)https://stackoverflow.com/questions/56655894
复制相似问题