我们在ZeroMQ中使用的是一种ZeroMQ可扩展的正式通信模式。发送者应用程序共发送30,000条消息,每个10 of。有很多数据丢失,因此我们在发送方设置了以下内容:
zmq_socket = context.socket(zmq.PUSH)
zmq_socket.setsockopt(zmq.SNDBUF, 10240)
zmq_socket.setsockopt(zmq.SNDHWM, 1)
zmq_socket.bind("tcp://127.0.0.1:4999")
在接收方:
zmq_socket = context.socket(zmq.PULL)
zmqSocket.setReceiveBufferSize(10240);
zmqSocket.setRcvHWM(1);
zmq_socket.connect("tcp://127.0.0.1:4999")
仍然有数据丢失。不知道如何才能避免数据包被悄悄丢弃。
编辑1:
Python的发件人代码:
context = zmq.Context()
zmq_socket = context.socket(zmq.PUSH)
zmq_socket.setsockopt(zmq.SNDBUF, 10240)
zmq_socket.setsockopt(zmq.SNDHWM, 1)
zmq_socket.bind("tcp://127.0.0.1:4999")
for file_name in list_of_files: # Reads data from a list of files:
while True: # data from a file_name
with open(os.path.join(self.local_base_dir,file_name), 'r') as sf:
socket_data = sf.read(5120)
if socket_data == '':
sf.close()
break # until EoF
ret = zmq_socket.send(socket_data)
if ret == 0:
return True
if ret == -1:
print zmq_errno()
接收机代码
private ZMQ.Socket zmqSocket = zmqContext.socket(ZMQ.PULL);
zmqSocket.setReceiveBufferSize(10240);
zmqSocket.setRcvHWM(1);
zmqSocket.connect(socketEndpoint);
String message = new String(zmqSocket.recv());
messages.add(message);
发布于 2017-05-09 11:53:54
使用ZMQ_IMMEDIATE
标志可以防止在没有已完成连接的管道中排队的消息丢失。
来自http://api.zeromq.org/4-2:zmq-setsockopt#toc21
只发送到已完成连接的
ZMQ_IMMEDIATE
**:队列消息** 默认情况下,即使连接尚未完成,队列也会填充传出连接。这可能导致在具有循环路由(REQ
、PUSH
、DEALER
)的套接字上“丢失”消息。如果此选项设置为1,则消息将只在已完成的连接上排队。如果没有其他连接,这将导致套接字阻塞,但将防止队列在等待连接的管道上填充。选项值类型int选项值单位布尔默认值0
(false
)适用的套接字类型全部,只适用于面向连接的传输。
zmq_socket.setsockopt(zmq.ZMQ_IMMEDIATE, 1)
https://stackoverflow.com/questions/43771830
复制相似问题