首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >zeromq:如何防止无限等待?

zeromq:如何防止无限等待?
EN

Stack Overflow用户
提问于 2011-09-24 20:26:00
回答 4查看 77.7K关注 0票数 76

我刚刚开始使用ZMQ。我正在设计一个应用程序,它的工作流程是:

  1. 许多客户端(具有随机拉取地址)之一将请求推送到5555

的服务器,该服务器永远在等待客户端PUSHes。当一个请求到来时,将为该特定请求派生一个工作进程。是的,工作进程可以存在,当进程完成它的任务时,它会将结果PUSHes到客户端。

我假设PUSH/PULL架构适用于此。请在这一点上纠正我。

但是我该如何处理这些场景呢?

  1. 当服务器无法响应时,client_receiver.recv()将无限期地等待。
  2. 客户端可能会发送请求,但它将立即失败,因此工作进程将永远停留在server_sender.send()。

那么如何在PUSH/PULL模型中设置类似timeout的东西呢?

:谢谢用户938949的建议,我得到了一个working answer,我正在将它分享给后人。

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2011-09-25 00:33:22

如果您使用的是>= 3.0,那么您可以设置RCVTIMEO套接字选项:

代码语言:javascript
复制
client_receiver.RCVTIMEO = 1000 # in milliseconds

但一般来说,您可以使用轮询器:

代码语言:javascript
复制
poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send

poller.poll()需要一个超时:

代码语言:javascript
复制
evts = poller.poll(1000) # wait *up to* one second for a message to arrive.

如果没有要接收的内容,则evts将是一个空列表。

您可以使用zmq.POLLOUT进行轮询,以检查发送是否成功。

或者,要处理可能已失败的对等体的情况,请执行以下操作:

代码语言:javascript
复制
worker.send(msg, zmq.NOBLOCK)

可能足够了,它将始终立即返回-如果发送不能完成,则引发一个ZMQError(zmq.EAGAIN)。

票数 85
EN

Stack Overflow用户

发布于 2011-09-26 16:47:26

这是在我参考了user938949的答案和http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/之后,我做的一个快速破解。如果你做得更好,请发布你的答案,我会推荐你的答案

对于那些想要持久解决方案 on可靠性的人,请参阅http://zguide.zeromq.org/page:all#toc64

zeromq (beta ATM) 3.0版支持ZMQ_RCVTIMEO和ZMQ_SNDTIMEO中的timeouthttp://api.zeromq.org/3-0:zmq-setsockopt

服务器

zmq.NOBLOCK确保当客户端不存在时,send()不会阻塞。

代码语言:javascript
复制
import time
import zmq
context = zmq.Context()

ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")

i=0

while True:
    i=i+1
    time.sleep(0.5)
    print ">>sending message ",i
    try:
        ventilator_send.send(repr(i),zmq.NOBLOCK)
        print "  succeed"
    except:
        print "  failed"

客户端

轮询器对象可以监听许多接收套接字(参见上面链接的“使用ZeroMQ的Python多处理”)。我只在work_receiver.上链接了它在无限循环中,客户端以1000ms为间隔进行轮询。如果在该时间内未收到任何消息,则socks对象将返回空。

代码语言:javascript
复制
import time
import zmq
context = zmq.Context()

work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")

poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)

# Loop and accept messages from both channels, acting accordingly
while True:
    socks = dict(poller.poll(1000))
    if socks:
        if socks.get(work_receiver) == zmq.POLLIN:
            print "got message ",work_receiver.recv(zmq.NOBLOCK)
    else:
        print "error: message timeout"
票数 16
EN

Stack Overflow用户

发布于 2012-06-01 15:50:16

如果您使用ZMQ_NOBLOCK,发送将不会阻塞,但如果您尝试关闭套接字和上下文,此步骤将阻止程序退出。

原因是套接字等待任何对等点,以确保传出消息排队。要立即关闭套接字并刷新缓冲区中的传出消息,请使用ZMQ_LINGER并将其设置为0。

票数 10
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/7538988

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档