最近有时间了把这个坑填一填!!!
REP(server)
端必须recv到REQ(client)
的消息之后,调用send返回,否则通道堵塞; 相同的 REQ(client)
端负责send消息到REP(server)
,然后调用recv获取REP(server)
的返回;server.py
# 1、Request_Reply模式
# server
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://*:5556')
while True:
message = socket.recv()
print(message)
socket.send('server response')
client.py
# client import zmqimport syscontext = zmq.Context()socket = context.socket(zmq.REQ)socket.connect('tcp://localhost:5556')while True: data = raw_input('input your data:') if data == 'q': sys.exit() socket.send(data) response = socket.recv() print(response)
场景说明:
我们定义一个非阻塞
的消息通道, 用作发送特定的Python结构体数据,包含三个文件如下:
Code:
server.py
import time
import zmq
from data import zmqStruct
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5656")
while True:
try:
message = socket.recv_pyobj(zmq.NOBLOCK)
print(message)
#time.sleep(1)
socket.send_pyobj('123123123')
except zmq.Again as e:
if e.errno!=zmq.EAGAIN:
print(repr(e))
time.sleep(1)
client.py
from data import zmqStruct
def zmqREQ():
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://{}:5656".format('192.168.24.107'))
return socket
sendStruct = zmqStruct()
zmqClient = zmqREQ()
zmqClient.send_pyobj(sendStruct)
print zmqClient.recv_pyobj()
data.py
class zmqStruct(onject): # 消息结构体
def __init__(self, cmd=0, data=None, desc=''):
self.cmd = cmd
self.data = data
self.desc = desc
server.py
# 2、Publish-Subscribe模式
# server
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5005")
while True:
msg = input('input your data:').encode('utf-8')
socket.send(msg)
client.py
# client
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:5005')
# 使用socket.setsockopt()进行过滤
socket.setsockopt(zmq.SUBSCRIBE,b'')
while True:
print(socket.recv_string())
场景说明:
我们假定 有一个任务调度器 , 结构为 1个 master 对应 10个 slave
, master接受任务,将任务投递给 slave.
Code:
master.py
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5000")
tasks = [i for i in range(100)]
def pub():
# 这个延时 是为了服务端绑定 socket 后会等待200毫秒避免消息丢失; 也是为了保证服客户端环境完备的折中之举
time.sleep(1)
for i in tasks:
socket.send(str(i))
if __name__ == '__main__':
pub()
slave.py
import time
import threading
from concurrent.futures import ThreadPoolExecutor
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:5000")
socket.setsockopt(zmq.SUBSCRIBE, '')
threadpool = ThreadPoolExecutor(10)
def submsg():
""" socket 接受消息使用 `zmq.NOBLOCK` 非阻塞模式来进行,可以保证保证循环体内的其他功能正常使用 :return: """
while 1:
try:
msg = socket.recv(flags=zmq.NOBLOCK)
except zmq.Again as e:
if e.errno != zmq.EAGAIN:
print(repr(e))
else:
print '接收到广播消息,线程池投递任务 msg={}'.format(msg)
threadpool.submit(work, msg)
def work(msg):
print '开始工作 参数{}'.format(msg)
time.sleep(2) # 模拟功能执行时间
print '结束工作'
if __name__ == '__main__':
submsg()
server.py
# 3、Parallel Pipeline模式
# server
import zmq
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind('tcp://*:5566')
while True:
data = socket.recv()
print(data)
work.py
# work 无work push 会阻塞掉
import zmq
context = zmq.Context()
recive = context.socket(zmq.PULL)
recive.connect('tcp://127.0.0.1:5565')
sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5566')
while True:
data = recive.recv()
sender.send(data)
client.py
# client
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind('tcp://*:5565')
while True:
data = raw_input('input your data:')
socket.send(data)
场景说明:
Code:
Error:
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/203674.html原文链接:https://javaforall.cn