我很难理解ZeroMQ高水标记队列是如何工作的。
我已经制作了两个脚本附在下面,它们复制以下内容。
我得到的结果是,拉手能够成功地接收(打印)所有消息。而且,推手似乎几乎立刻就完成了处决。根据ZMQ正式文档,我所期望的是推手在拉出器醒来之前不完成执行,因为在第二个send(...)
调用中由于到达HWM而被阻塞。我还尝试在每个send(...)
调用之间添加0.001秒睡眠,结果相同。
所以,我的问题是:
send(...)
时没有阻塞(大小为1)?剧本:
pusher.py
import zmq
context = zmq.Context()
push_socket = context.socket(zmq.PUSH)
push_socket.setsockopt(zmq.SNDHWM, 1)
push_socket.setsockopt(zmq.RCVHWM, 1)
push_socket.bind("tcp://127.0.0.1:5557")
print(push_socket.get_hwm()) # Prints 1
print('Sending all messages')
for i in range(2200):
push_socket.send(str(i).encode('ascii'))
print('Finished execution...')
puller.py
import zmq
import time
context = zmq.Context()
pull_socket = context.socket(zmq.PULL)
pull_socket.setsockopt(zmq.RCVHWM, 1)
pull_socket.setsockopt(zmq.SNDHWM, 1)
pull_socket.connect("tcp://127.0.0.1:5557")
print(pull_socket.get_hwm()) # Prints 1
print('Connected, but not receiving yet... (Sleep 4s)')
time.sleep(4)
print('Receiving everything now!')
rec = ''
for i in range(2200):
rec += '{} '.format(pull_socket.recv().decode('ascii'))
print(rec) # Prints `0 1 2 ... 2198 2199 `
为了重现我的测试用例,打开两个终端并在一个中启动第一个puller.py,然后在另一个中快速地启动(4秒窗口) pusher.py。
发布于 2017-03-22 10:53:49
这里至少涉及4个缓冲区: zmq发送缓冲区、OS写tcp缓冲区、OS读取tcp缓冲区和zmq recv缓冲区。
zmq线程在成功写入OS tcp写缓冲区时将消息标记为“发送”。这些信息现在被视为“过境”。
然后,网络堆栈将尽可能多地传输到其他进程的匹配的OS recv缓冲区中,最后,接收到的zmq线程最多一次从该缓冲区读取HWM消息到ZMQ队列。
在默认情况下,OS缓冲区通常在10-100‘t左右,这两个缓冲区都可以在ZMQ甚至注意到对方没有使用任何消息之前完全填满“正在传输”的消息。出于性能原因,这些缓冲区是必需的--您不能就这样摆脱它们。
您的问题的解决方案可能涉及req/rep套接字和一个明确的应用程序级ack,即指南中的懒惰盗版模式。
https://stackoverflow.com/questions/42948798
复制相似问题