我正在尝试构建的python应用程序遇到了一个问题:基本上,我有一个从Redis PUBSUB连接接收简单json数据的脚本,我想将这些数据作为websocket服务器提供给客户端。因此,基本上,每次我从redis连接收到一条消息时,都必须使用websockets将该消息发送到客户端。
下面是我的基本代码:
从redis pubsub连接接收数据的部分:
import json
import redis
redis_url = 'MY-URL'
channel = 'test'
connection = redis.StrictRedis.from_url(redis_url, decode_responses=True)
pubsub = connection.pubsub(ignore_subscribe_messages=False)
pubsub.subscribe(channel)
for item in pubsub.listen():
message = item['data']
if type(message) != int:
message = json.loads(message)
print(message)
这是一个简单的websocket服务器,我使用
import asyncio
import websockets
async def main(websocket, path):
while True:
await websockets.send('Some data')
start_server = websockets.serve(main, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
我找不到合并这两部分代码的方法。有什么方法可以做到这一点吗?
发布于 2021-02-28 20:38:36
解决方案1
在此示例中,每个新的
连接请求我们正在注册新的监听器
import json
import redis
import asyncio
import websockets
redis_url = 'redis://localhost:6379/0'
channel = 'test'
connection = redis.StrictRedis.from_url(redis_url, decode_responses=True)
pubsub = connection.pubsub(ignore_subscribe_messages=False)
pubsub.subscribe(channel)
async def main(websocket, path):
for item in pubsub.listen():
message = item['data']
if type(message) != int:
message = json.loads(message)
print(message)
await websocket.send(message)
start_server = websockets.serve(main, "localhost", 8765)
print("Started")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
解决方案2
在这里,我们使用单个listerer向多个客户端发送事件,这些客户端通过websocket注册,并向打开的websocket连接发送消息
import json
import redis
import gevent
from flask import Flask
from flask_sockets import Sockets
redis_url = 'redis://localhost:6379/0'
channel = 'test'
connection = redis.StrictRedis.from_url(redis_url, decode_responses=True)
class PubSubListener(object):
def __init__(self):
self.clients = []
self.pubsub = connection.pubsub(ignore_subscribe_messages=False)
self.pubsub.subscribe(**{channel: self.handler})
self.thread = self.pubsub.run_in_thread(sleep_time=0.001)
def register(self, client):
self.clients.append(client)
def handler(self, message):
_message = message['data']
if type(_message) != int:
self.send(_message)
def send(self, data):
for client in self.clients:
try:
client.send(data)
except Exception:
self.clients.remove(client)
pslistener = PubSubListener()
app = Flask(__name__)
sockets = Sockets(app)
@sockets.route('/echo')
def echo_socket(ws):
pslistener.register(ws)
while not ws.closed:
gevent.sleep(0.1)
@app.route('/')
def hello():
return 'Hello World!'
if __name__ == "__main__":
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
print("Started")
server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
server.serve_forever()
https://stackoverflow.com/questions/66370642
复制相似问题