首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Python多线程ZeroMQ REP

Python多线程ZeroMQ REP
EN

Stack Overflow用户
提问于 2014-08-18 16:16:51
回答 1查看 6K关注 0票数 3

我希望用Python和ZeroMQ使用多线程实现REQ模式。

使用Python,当一个新客户端连接到服务器时,我可以创建一个新线程。此线程将处理与该特定客户端的所有通信,直到套接字关闭为止:

代码语言:javascript
运行
复制
# Thread that will handle client's requests
class ClientThread(threading.Thread):
    # Implementation...
    def __init__(self, socket):
        threading.Thread.__init__(self)
        self.socket = socket
    def run(self):
        while keep_alive:
            # Thread can receive from client
            data = self.socket.recv(1024)
            # Processing...
            # And send back a reply
            self.socket.send(reply)

while True:
    # The server accepts an incoming connection
    conn, addr = sock.accept()
    # And creates a new thread to handle the client's requests
    newthread = ClientThread(conn)
    # Starting the thread
    newthread.start()

使用ZeroMQ可以做同样的事情吗?我已经看到了一些用ZeroMQ和Python进行多线程处理的例子,但是在所有这些例子中,一开始都用固定数量的线程创建了一个线程池,并且似乎更倾向于负载平衡。

*请注意,我想要的是保持客户机和它的线程之间的连接是活动的,因为线程期待来自客户端的多个REQ消息,并且它将存储必须在消息之间保存的信息(即:在新的REQ消息上递增其值的变量计数器;因此每个线程都有自己的变量,任何其他客户端都不应该能够访问该线程)。新客户端=新线程。

EN

Stack Overflow用户

发布于 2014-08-18 18:11:20

是的,ZeroMQ是一个强大的可以做的工具箱。

然而,最令人惊讶的是,ZeroMQ -s的结构要比在示例中使用的普通套接字要强得多。

{ aZmqContext -> aZmqSocket -> aBehavioralPrimitive }

ZMQ-Context,是(并将继续)作为“共享”使用的唯一东西,ZeroMQ在"singleton“框架下构建了一个卓越的、丰富的抽象框架。

线程不应“共享”任何其他“派生”对象,它们的状态越少,因为实现了一个强大的分布式责任框架体系结构,这既是为了清洁设计,也是为了高性能和低延迟。

对于所有的ZMQ-Socket-s来说,我们应该想象一个更聪明、分层的子结构,其中一个人接收到对I/O活动的卸载担忧(在ZMQ-Context责任中管理--因此保持-活的问题、时间问题和公平队列缓冲/选择--轮询问题只是停止了对您来说可见的. ),有一种正式的通信模式behaviour (由所选的ZMQ-Socket-type原型提供)。

最后

ZeroMQ和类似的nanomsg库都是类似于LEGO的项目,作为一名架构师和设计师,您可以从一开始就意识到这一点。

因此,人们可以专注于分布式系统的行为,而不是浪费时间和精力来解决仅仅是另一个套接字消息传递的噩梦。

(这两本书绝对值得一看,这两本书都出自ZeroMQ的共同父亲Pieter。)在这里你可以找到很多关于这个伟大主题的-moments。)

..。就像蛋糕上的樱桃--所有这些都是不可知的、通用的环境,无论是在inproc://,上传递一些消息,还是通过ipc://,或者通过tcp://层并行地听/说。

EDIT#12014-08-19 17:00 [UTC+0000]

请查看下面的注释,并进一步检查您的--初级和高级--设计--处理<琐碎--容易失败>--派生处理、<负载平衡>--REP-worker排队、-分布式处理和<故障弹性_mode>-REP-worker二进制-启动阴影处理的选项。

没有堆的模拟SLOC(s),没有一个单一的代码样例将进行一刀切.

这在设计分布式消息传递系统时是成倍有效的。

代码语言:javascript
运行
复制
"""REQ/REP modified with QUEUE/ROUTER/DEALER add-on ---------------------------

   Multithreaded Hello World server

   Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>

"""
import time
import threading
import zmq

print "ZeroMQ version sanity-check: ", zmq.__version__

def aWorker_asRoutine( aWorker_URL, aContext = None ):
    """Worker routine"""
    #Context to get inherited or create a new one trick------------------------------
    aContext = aContext or zmq.Context.instance()
    
    # Socket to talk to dispatcher --------------------------------------------------
    socket = aContext.socket( zmq.REP )
    
    socket.connect( aWorker_URL )
    
    while True:

        string  = socket.recv()

        print( "Received request: [ %s ]" % ( string ) )
        
        # do some 'work' -----------------------------------------------------------
        time.sleep(1)

        #send reply back to client, who asked --------------------------------------
        socket.send( b"World" )

def main():
    """Server routine"""

    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"

    # Prepare our context and sockets ------------------------------------------------
    aLocalhostCentralContext = zmq.Context.instance()

    # Socket to talk to clients ------------------------------------------------------
    clients = aLocalhostCentralContext.socket( zmq.ROUTER )
    clients.bind( url_client )

    # Socket to talk to workers ------------------------------------------------------
    workers = aLocalhostCentralContext.socket( zmq.DEALER )
    workers.bind( url_worker )

    # --------------------------------------------------------------------||||||||||||--
    # Launch pool of worker threads --------------< or spin-off by one in OnDemandMODE >
    for i in range(5):
        thread = threading.Thread( target = aWorker_asRoutine, args = ( url_worker, ) )
        thread.start()

    zmq.device( zmq.QUEUE, clients, workers )

    # ----------------------|||||||||||||||------------------------< a fair practice >--
    # We never get here but clean up anyhow
    clients.close()
    workers.close()
    aLocalhostCentralContext.term()

if __name__ == "__main__":
    main()
票数 5
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/25367700

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档