首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >带有zeromq (jeromq)的路由文件

带有zeromq (jeromq)的路由文件
EN

Stack Overflow用户
提问于 2013-12-02 19:01:07
回答 1查看 1.2K关注 0票数 1

我试图在zmq上实现一个“文件分发程序”(实际上,我希望避免jni)。

我需要的是将传入的文件加载到处理器上:

  • 每个文件只由一个处理器处理。
  • 文件可能很大,所以我需要管理文件传输。

理想情况下,我想要像https://github.com/zeromq/filemq这样的东西,但是

  • 使用推/拉行为,而不是发布/订阅
  • 能够处理接收到的文件,而不是将其写入磁盘

我的想法是混合使用taskvent/task接收器和异步and示例。

客户端:

  • 一个拉套接字将被通知要处理的文件
  • 一个交易商套接字来处理(异步)文件逐块传输

服务器端:

  • 一个推送套接字来分派传入文件(名称)
  • 一个处理文件请求的路由器套接字
  • 一些交易商工作人员管理客户端的文件传输,并通过inproc代理连接到路由器。

我的第一个问题是:这似乎是正确的方式吗?可能还有更简单的吗?

我的第二个问题是:我当前的暗示是在发送实际的文件数据时陷入困境。

  • 客户端由服务器通知,并发出请求。
  • 服务器工作人员获得请求,并将响应写回inproc队列,但是响应似乎永远不会离开服务器(在wireshark中看不到),客户端被卡在poller.poll上等待响应。

这不是套接字填满和丢弃数据的问题,我从一次发送的非常小的文件开始。

有洞察力吗?

谢谢!

==================

按照raffian的建议,我简化了我的代码,删除了推拉额外的套接字(既然你这么说了,这是有意义的)。

剩下的是“不工作”插座!

这是我目前的密码。它有许多目前超出范围的缺陷(客户机ID、下一个块等等)

现在,我只是想让两个人按顺序说话

  • 服务器 对象FileDispatcher扩展应用程序{ val context = ZMQ.context(1) // server是将文件名推送给客户端并接收请求的前端,val server = context.socket(ZMQ.ROUTER) server.bind(“tcp:/*:5565”) //后端处理客户端请求val后端= context.socket(ZMQ.DEALER) backend.bind(“inproc: //后端”)//文件在参数args.toList.foreach {filepath=>println(s“发布$filepath")中给出的分派。server.send("newfile".getBytes() ),( ZMQ.SNDMORE) server.send(filepath.getBytes(),0) } //多线程服务器:路由器通过inproc队列val NB_WORKERS =1 val workers =List.fill(NB_WORKERS)(新线程(新ServerWorker(上下文)ZMQ.proxy(服务器、后端)向经销商工作人员发出请求。)}类ServerWorker(ctx: ZMQ.Context)扩展Runnable { override (){ServerWorker= ctx.socket(ZMQ.DEALER) worker.connect("inproc: // backend"),而(true) { val = ZMsg.recvMsg( worker ) zmsg.pop / drop内队列信封(?)val = zmsg.pop //cmd用于继续/停止cmd.toString匹配{ case "get“=> val file = zmsg.pop.toString println(s"clientReq: cmd:$cmd,file:$ file ") //1-蛮力:忽略cmd并一次性发送完整文件!io.Source.fromFile(file).mkString("").getBytes (“eof”.getBytes,ZMQ.SNDMORE) //header表示这是最后一块val字节=.getBytes//脏读,仅用于测试!Worker.send(字节,0) println(s"${bytes.size} $file:“+新字符串(字节))大小写x => println("cmd "+x+”未实现!)}
  • 客户端 对象FileHandler扩展App{ val context = ZMQ.context(1) // client收到新文件的通知,然后从服务器val client = context.socket(ZMQ.DEALER) client.connect("tcp://*:5565")获取文件(“tcp://*:5565”)ZMQ.Poller=新ZMQ.Poller(1) //“轮询”响应poller.register(客户端),( ZMQ.Poller.POLLIN) { poller.poll val zmsg = ZMsg.recvMsg(client) val cmd = zmsg.pop val data = zmsg.pop // header是命令/动作cmd.toString match { case "newfile“=> startDownload(data.toString)//消息内容是获取大小写的文件名-- => gotChunk(data.toString,zmsg.pop.getData) // filename,块大小写"eof”=> endDownload(data.toString ),// zmsg.pop.getData) //filename,最后一批} def startDownload(文件名:字符串){println(“获取通知:开始下载”+文件名) client.send("get".getBytes,ZMQ.SNDMORE) //命令头client.send(filename.getBytes,0) } def gotChunk(文件名: String,字节: ArrayByte) {println(“+filename+:+新字符串(字节)”)//回调这里的用户client.send(“下一步”.getBytes,ZMQ.SNDMORE) client.send(filename.getBytes,0) } def endDownload(文件名: String,字节: ArrayByte) {println(为"+filename+":“+新字符串(字节)”)//回调用户}}
EN

Stack Overflow用户

回答已采纳

发布于 2013-12-02 22:20:16

在客户机上,您不需要PULLDEALER。经销商是PUSHPULL的结合,所以只使用经销商,您的代码将更简单。

服务器也是如此,除非您正在做一些特殊的事情,否则您不需要PUSHROUTER,路由器是双向的。

服务器工作人员获得请求,并将响应写回inproc队列,但是响应似乎永远不会离开服务器(在wireshark中看不到),客户端被卡在poller.poll上等待响应。

代码问题

在服务器中,您在启动代理之前使用args.toList.foreach发送文件,这可能就是为什么没有任何东西离开服务器。首先启动代理,然后使用它;而且,一旦调用ZMQProxy(..),代码就会无限期地阻塞,因此您需要一个单独的线程来发送文件。

客户端可能与计票人有问题。轮询的典型模式是:

代码语言:javascript
复制
ZMQ.Poller items = new ZMQ.Poller (1);
items.register(receiver, ZMQ.Poller.POLLIN);
while (true) {

  items.poll(TIMEOUT);
  if (items.pollin(0)) {
    message = receiver.recv(0);

在上面的代码中,1)轮询直到超时,2)然后检查消息,如果可用,3)使用receiver.recv(0)。但是在您的代码中,您可以在不检查的情况下投票,然后进入recv()。在调用recv()之前,您需要检查轮询器是否有轮询套接字的消息,否则,如果没有消息,接收方将挂起。

票数 3
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/20335810

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档