首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >具有多个工人的zeromq管道,但带有一个水槽,当工人到达呼吸机时,该水槽按顺序处理工人的输出

具有多个工人的zeromq管道,但带有一个水槽,当工人到达呼吸机时,该水槽按顺序处理工人的输出
EN

Stack Overflow用户
提问于 2015-12-07 20:09:13
回答 1查看 246关注 0票数 0

我想实现分而治之的模式http://zguide.zeromq.org/page:all#Divide-and-Conquer,但不同的是,接收器将以特定的顺序获得结果。更详细地说,呼吸机将接收带有时间戳的事件,然后将它们分派给工人。水槽应该能够按照呼吸机接收的确切顺序对工人处理的事件在其输出中进行排队。我不介意由此产生的额外延迟,因为我有多个事件几乎同时到达呼吸机,并且工人的过程时间实际上是固定的。我计划随机地将事件分发给工人,或者使用背压来保持每个工人的消息长度较小。

我想将时间戳/assigned_worker_id对从呼吸器直接发布到接收器,然后循环接收器,直到我想要的对准备好被拉出。以类似的方式,我可以在工人和接收器之间建立一个REQ/REP (但还不确定哪个会成为REQ )。还有什么我看不到的更简单的想法吗?非常感谢,filimon

EN

回答 1

Stack Overflow用户

发布于 2015-12-07 21:26:55

对此的一般解决方案可以在TCP协议中找到,TCP协议还必须从无序流重新创建数据顺序。这里的方法使用滑动窗口来存储和排序数据。此外,它还可以处理丢失的数据。

如果您不需要处理丢失的输入数据,您可以只对呼吸器中的请求进行编号,并在接收器中对它们进行排队,直到下一个请求到达将其发送出去。在C++/zeroMQ伪代码中,如下所示

代码语言:javascript
运行
复制
class Ventilator {
public:
    void newRequest(...) {
         // create multi-part message [counter, message]
         // and send it on the push socket
         out.send( counter++ );
         out.send( ... );
    }
private:
    zmq::socket_t out;
    int counter = 0;
};

// the workers receive the multi-part message, produce a result
// and send it with the counter to the sink
class Worker {
    void run() {
        auto counter = in.recv();
        auto request = in.recv();
        // process and create result message
        auto result = process(request);
        out.send( counter, ZMQ_SNDMORE);
        out.send( result );
    }
private:
    zmq::socket_t in;
    zmq::socket_t out;
};

class Sink {
    void run() {
        auto counter = in.recv();
        auto data = in.recv();
        auto const c = static_cast<int*>(counter.data());

        auto cmp = [](auto x, auto y) {return x.first < y.first;};
        q.push_back( make_pair(( c, data) );
        std::push_heap( q.begin(), q.end(), cmp );

        if (lastCounter == -1 || lastCounter+1 == c)
        {
            // this is the next message in order
            std::pop_heap( q.begin(), q.end(), cmp );
            out.send( q.back() );
            q.pop_back();
            lastCounter = c;
        }
    }        

    zmq::socket_t in;
    zmq::socket_t out;
    std::vector< std::pair<int, zmq::message_t>> q;
    int lastCounter = -1; // to handle first answer
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34133226

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档