前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >zmq 协议_zmq通信协议

zmq 协议_zmq通信协议

作者头像
全栈程序员站长
发布2022-11-04 16:20:14
1.2K0
发布2022-11-04 16:20:14
举报
文章被收录于专栏:全栈程序员必看

文章目录

ZMQ 通信协议小结 🐝

最近有时间了把这个坑填一填!!!

前言 🔍

  • 项目中涉及到 zmq通信协议相关内容,所以将学习、使用过程同步分享
  • 通篇以代码分享为主,且本文对底层socket不做过多叙述,以实际应用为准,希望能帮到各位!
  • Talk is cheap, Show me the code

zmq的三种模型 💦

1、Request_Reply模式(请求——应答): REP、 REQ ☎️
  • 一发一收 无缓存 断开连接数据丢失;
  • 生产中也可以一个server对应多个client;
  • 双向消息,REP(server)端必须recv到REQ(client)的消息之后,调用send返回,否则通道堵塞; 相同的 REQ(client)端负责send消息到REP(server),然后调用recv获取REP(server)的返回;
伪代码

server.py

代码语言:javascript
复制
# 1、Request_Reply模式
# server
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://*:5556')

while True:
    message = socket.recv()
    print(message)
    socket.send('server response')

Jetbrains全家桶1年46,售后保障稳定

client.py

代码语言:javascript
复制
# client import zmqimport syscontext = zmq.Context()socket = context.socket(zmq.REQ)socket.connect('tcp://localhost:5556')while True:    data = raw_input('input your data:')    if data == 'q':        sys.exit()    socket.send(data)    response = socket.recv()    print(response)
应用场景

场景说明: 我们定义一个非阻塞 的消息通道, 用作发送特定的Python结构体数据,包含三个文件如下:

Code:

server.py

代码语言:javascript
复制
import time
import zmq
from data import zmqStruct

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5656")

while True:
    try:
        message = socket.recv_pyobj(zmq.NOBLOCK)
        print(message)
        #time.sleep(1)
        socket.send_pyobj('123123123')
    except zmq.Again as e:
        if e.errno!=zmq.EAGAIN:
            print(repr(e))

    time.sleep(1)

client.py

代码语言:javascript
复制
from data import zmqStruct

def zmqREQ():
    import zmq
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://{}:5656".format('192.168.24.107'))
    return socket

sendStruct = zmqStruct()
zmqClient = zmqREQ()
zmqClient.send_pyobj(sendStruct)
print zmqClient.recv_pyobj()

data.py

代码语言:javascript
复制
class zmqStruct(onject):  # 消息结构体
	def __init__(self, cmd=0, data=None, desc=''):
        self.cmd = cmd
        self.data = data
        self.desc = desc

2、Publish-Subscribe模式(发布——订阅): PUB、SUB 🎙
  • 广播所有client,无缓存,断开连接数据丢失。(当然所有的问题都可以通过增加中间层的方式解决);
  • 发布端发布主题topic,订阅端只会收到已订阅的主题topic;
  • PUB端发送消息,SUB端接受消息;
  • SUB可以注册多个PUB;
  • 如果PUB没有任何SUB,那么消息将会被丢弃;
  • SUB端消费过慢,消息则堆积到PUB端
  • 单工-单向数据传输
伪代码

server.py

代码语言:javascript
复制
# 2、Publish-Subscribe模式
# server
import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5005")
while True:
    msg = input('input your data:').encode('utf-8')
    socket.send(msg)

client.py

代码语言:javascript
复制
# client
import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:5005')
# 使用socket.setsockopt()进行过滤
socket.setsockopt(zmq.SUBSCRIBE,b'')
while True:
    print(socket.recv_string())
应用场景

场景说明: 我们假定 有一个任务调度器 , 结构为 1个 master 对应 10个 slave, master接受任务,将任务投递给 slave.

Code:

master.py

代码语言:javascript
复制
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5000")


tasks = [i for i in range(100)]


def pub():
	# 这个延时 是为了服务端绑定 socket 后会等待200毫秒避免消息丢失; 也是为了保证服客户端环境完备的折中之举
    time.sleep(1)  
    for i in tasks:
        socket.send(str(i))


if __name__ == '__main__':
    pub()

slave.py

代码语言:javascript
复制
import time
import threading
from concurrent.futures import ThreadPoolExecutor
import zmq  
context = zmq.Context()  
socket = context.socket(zmq.SUB)  
socket.connect("tcp://127.0.0.1:5000")  
socket.setsockopt(zmq.SUBSCRIBE, '')
threadpool = ThreadPoolExecutor(10)
def submsg():
""" socket 接受消息使用 `zmq.NOBLOCK` 非阻塞模式来进行,可以保证保证循环体内的其他功能正常使用 :return: """
while 1:
try:
msg = socket.recv(flags=zmq.NOBLOCK)
except zmq.Again as e:
if e.errno != zmq.EAGAIN:
print(repr(e))
else:
print '接收到广播消息,线程池投递任务 msg={}'.format(msg)
threadpool.submit(work, msg)
def work(msg):
print '开始工作 参数{}'.format(msg)
time.sleep(2)  # 模拟功能执行时间
print '结束工作'
if __name__ == '__main__':
submsg()

3、Parallel Pipeline模式(push——pull): PUSH、PULL 🔗
  • 管道模式(单工) – 单向通道;
  • 可以由三部分组成:push推送数据,work缓存数据,pull竞争数据,断开连接数据不丢失,重连继续发送。work中间件可以去掉;
伪代码

server.py

代码语言:javascript
复制
# 3、Parallel Pipeline模式
# server
import zmq
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind('tcp://*:5566')
while True:
data = socket.recv()
print(data)

work.py

代码语言:javascript
复制
# work 无work push 会阻塞掉
import zmq
context = zmq.Context()
recive = context.socket(zmq.PULL)
recive.connect('tcp://127.0.0.1:5565')
sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5566')
while True:
data = recive.recv()
sender.send(data)

client.py

代码语言:javascript
复制
# client
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind('tcp://*:5565')
while True:
data = raw_input('input your data:')
socket.send(data)
应用场景

场景说明:

Code:

代码语言:javascript
复制

Error:

代码语言:javascript
复制

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/203674.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022年10月23日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • ZMQ 通信协议小结 🐝
    • 前言 🔍
      • zmq的三种模型 💦
        • 1、Request_Reply模式(请求——应答): REP、 REQ ☎️
        • 2、Publish-Subscribe模式(发布——订阅): PUB、SUB 🎙
        • 3、Parallel Pipeline模式(push——pull): PUSH、PULL 🔗
    相关产品与服务
    消息队列 TDMQ
    消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档