我正在尝试从flask-socketio事件处理程序中监听rabbitmq队列,这样我就可以向web应用程序发送实时通知。到目前为止,我的设置:
服务器
import pika
import sys
from flask import Flask, request
from flask_socketio import SocketIO, emit, disconnect
app = Flask(__name__)
app.config['SECRET_KEY'] = 'not-so-secret'
socketio = SocketIO(app)
def is_authenticated():
return True
def rabbit_callback(ch, method, properties, body):
socketio.emit('connect', {'data': 'yes'})
print "body: ", body
@socketio.on('connect')
def connected():
emit('notification', {'data': 'Connected'})
creds = pika.PlainCredentials(
username="username",
password="password")
params = pika.ConnectionParameters(
host="localhost",
credentials=creds,
virtual_host="/")
connection = pika.BlockingConnection(params)
# This is one channel inside the connection
channel = connection.channel()
# Declare the exchange we're going to use
exchange_name = 'user'
channel.exchange_declare(exchange=exchange_name,
type='topic')
channel.queue_declare(queue='notifications')
channel.queue_bind(exchange='user',
queue='notifications',
routing_key='#')
channel.basic_consume(rabbit_callback,
queue='notifications',
no_ack=True)
channel.start_consuming()
if __name__ == '__main__':
socketio.run(app, port=8082)浏览器
<script type="text/javascript" charset="utf-8">
var socket = io.connect('http://' + document.domain + ':8082');
socket.on('connect', function(resp) {
console.log(resp);
});
socket.on('disconnect', function(resp) {
console.log(resp);
});
socket.on('error', function(resp) {
console.log(resp);
});
socket.on('notification', function(resp) {
console.log(resp);
});
</script>如果我注释掉服务器代码底部的"channel.start_consuming()“行并加载浏览器页面,我将成功连接到flask-socketio,并在控制台中看到{data:"Connected"}。
当我取消对该行的注释时,我在控制台中看不到{data:"Connected"}。但是,当我向通知队列发送消息时,会触发rabbit_callback函数。我看到我的消息打印到服务器控制台,但emit调用似乎不起作用。服务器或浏览器中没有错误。任何建议都是非常感谢的。
谢谢!
发布于 2017-09-14 18:04:30
我在使用eventlet时也遇到了同样的问题,我只需添加以下内容即可解决:
import eventlet
eventlet.monkey_patch(),在我的源代码的开头。
无论如何,我的代码有点不同,并且使用了start_background_task方法:
import pika
from threading import Lock
from flask import Flask, render_template, session, request, copy_current_request_context
from flask_socketio import SocketIO, emit, join_room, leave_room, \
close_room, rooms, disconnect
app = Flask(__name__, static_url_path='/static')
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode=async_mode)
thread = None
thread_lock = Lock()
@socketio.on('connect', namespace='/test')
def test_connect():
global thread
with thread_lock:
if thread is None:
thread = socketio.start_background_task(target=get_messages)
emit('my_response', {'data': 'Connected', 'count': 0})
print('connected')
def get_messages():
channel = connect_rabbitmq()
channel.start_consuming()
def connect_rabbitmq():
cred = pika.credentials.PlainCredentials('username', 'password')
conn_param = pika.ConnectionParameters(host='yourhostname',
credentials=cred)
connection = pika.BlockingConnection(conn_param)
channel = connection.channel()
channel.exchange_declare(exchange='ncs', exchange_type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='myexchangename', queue=queue_name)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
return channel希望这能帮到你。
https://stackoverflow.com/questions/33872063
复制相似问题