我想实现分而治之的模式http://zguide.zeromq.org/page:all#Divide-and-Conquer,但不同的是,接收器将以特定的顺序获得结果。更详细地说,呼吸机将接收带有时间戳的事件,然后将它们分派给工人。水槽应该能够按照呼吸机接收的确切顺序对工人处理的事件在其输出中进行排队。我不介意由此产生的额外延迟,因为我有多个事件几乎同时到达呼吸机,并且工人的过程时间实际上是固定的。我计划随机地将事件分发给工人,或者使用背压来保持每个工人的消息长度较小。
我想将时间戳/assigned_worker_id对从呼吸器直接发布到接收器,然后循环接收器,直到我想要的对准备好被拉出。以类似的方式,我可以在工人和接收器之间建立一个REQ/REP (但还不确定哪个会成为REQ )。还有什么我看不到的更简单的想法吗?非常感谢,filimon
发布于 2015-12-07 21:26:55
对此的一般解决方案可以在TCP协议中找到,TCP协议还必须从无序流重新创建数据顺序。这里的方法使用滑动窗口来存储和排序数据。此外,它还可以处理丢失的数据。
如果您不需要处理丢失的输入数据,您可以只对呼吸器中的请求进行编号,并在接收器中对它们进行排队,直到下一个请求到达将其发送出去。在C++/zeroMQ伪代码中,如下所示
class Ventilator {
public:
void newRequest(...) {
// create multi-part message [counter, message]
// and send it on the push socket
out.send( counter++ );
out.send( ... );
}
private:
zmq::socket_t out;
int counter = 0;
};
// the workers receive the multi-part message, produce a result
// and send it with the counter to the sink
class Worker {
void run() {
auto counter = in.recv();
auto request = in.recv();
// process and create result message
auto result = process(request);
out.send( counter, ZMQ_SNDMORE);
out.send( result );
}
private:
zmq::socket_t in;
zmq::socket_t out;
};
class Sink {
void run() {
auto counter = in.recv();
auto data = in.recv();
auto const c = static_cast<int*>(counter.data());
auto cmp = [](auto x, auto y) {return x.first < y.first;};
q.push_back( make_pair(( c, data) );
std::push_heap( q.begin(), q.end(), cmp );
if (lastCounter == -1 || lastCounter+1 == c)
{
// this is the next message in order
std::pop_heap( q.begin(), q.end(), cmp );
out.send( q.back() );
q.pop_back();
lastCounter = c;
}
}
zmq::socket_t in;
zmq::socket_t out;
std::vector< std::pair<int, zmq::message_t>> q;
int lastCounter = -1; // to handle first answer
}
https://stackoverflow.com/questions/34133226
复制相似问题