首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >使用fork传递和访问子进程中的请求响应对象

使用fork传递和访问子进程中的请求响应对象
EN

Stack Overflow用户
提问于 2016-03-01 15:06:19
回答 3查看 4.6K关注 0票数 4

我想知道如何使用fork将请求响应对象从父进程传递到子进程。我所做的是

代码语言:javascript
复制
var child = cp.fork(__dirname + '/childProcess.js', req, res);
child.on('message', function(m) 
{
    console.log("child process returned data " + m);
});
child.send("hello");

childProcess.js

代码语言:javascript
复制
var req = process.argv[2];
var res = process.argv[3];
process.on('message', (msg) => 
{
    console.log("req object :-" + req );
    console.log("res object :-" + res);
}
process.send("callparent");

这使我在子进程中没有定义。我也试过child.send("hello",req.socket )。但是我不能访问子进程中请求方法。它表现为环形结构。

EN

回答 3

Stack Overflow用户

发布于 2016-06-07 01:41:14

其他答案是告诉您将套接字发送给子进程,并让子进程监听和处理请求,这并不是我们要问的问题(如何将请求/资源发送到子进程)。如果您的目标是将套接字传递给孩子,并让孩子监听和处理请求,那么这些答案是可以接受的。如果您的目标是监听和预处理主进程中的请求,并将这些请求传递给子进程进行进一步处理,请参见以下内容:

简而言之,您必须实际显式地将请求和响应发送给子进程(它们不会神奇地出现在子进程中)。但这其中也存在一些挑战。

在我们的应用程序中,我们遇到了类似的问题。我们的主请求处理器检查请求,看看哪个子进程应该处理每个请求,然后将请求发送给子进程(通过child.send())。

在父对象和子对象之间传递请求/响应的问题是,您必须能够序列化使用send来回发送的对象,特别是请求对象不能被序列化(它充满了循环引用,具有对许多其他对象的引用,等等)。

因此,我们要做的是从子进程需要的请求中提取数据的子集,并通过child.send()将“轻量级请求”对象发送给子进程,同时在父进程的队列中保留对原始请求的引用。当子进程完成处理时,它通过parent.send()将其响应数据发送回父进程,父进程将该响应数据与挂起队列中的原始请求进行匹配,并使用响应数据满足请求。

这种方法的优点(与仅在父/子之间创建自定义协议相反)是,子请求处理代码仍然对请求/响应对象(只是它们的轻量级版本)进行操作。在我们的例子中,这意味着我们可以使用相同的请求处理代码,无论我们是在子进程中调用它,还是让进程直接侦听和处理请求。

例如,下面是如何创建一个轻量级的、可序列化的请求对象并将其发送给子流程:

代码语言:javascript
复制
var requestData = 
{
    httpVersion: request.httpVersion,
    httpVersionMajor: request.httpVersionMajor,
    httpVersionMinor: request.httpVersionMinor,
    method: request.method,
    url: request.url,
    headers: request.headers,
    body: request.body
}
child.send(requestData);

您也可以只使用lodash pick之类的东西来获取您想要的字段。

这个解决方案的棘手之处在于,如果您需要在请求/响应时调用任何方法,则必须在父对象中调用(其中实际对象是-所有子对象都是选定的数据)。

票数 1
EN

Stack Overflow用户

发布于 2018-05-31 17:07:12

下面的例子可能会更好地说明这一点。这有一个父进程和一个子进程,父进程创建一个服务器,在客户端连接时,套接字句柄被传递给子进程。

子进程只有一个消息循环,在该循环中它接收句柄,并且能够通过该消息循环到达客户端。

$ cat foo.js

代码语言:javascript
复制
const cp = require('child_process')
const n = require('net')

if (process.argv[2] === 'child') {
  process.on('message', (m, h) =>  {
    h.end('ok')
  })
}
else {
  const c = cp.fork(process.argv[1], ['child'])
  const s = n.createServer();
  s.on('connection', (h) => {
    c.send({}, h)
  }).listen(12000, () => {
  const m = n.connect(12000)
  m.on('data', (d) => {
    console.log(d.toString())
  })
 })
}

在握手过程中使用相关部分。

代码语言:javascript
复制
[pid 12055] sendmsg(11, {msg_name(0)=NULL, msg_iov(1)=[{"{\"cmd\":\"NODE_HANDLE\",\"type\":\"net.Socket\",\"msg\":{},\"key\":\"6::::12000\"}\n", 70}], msg_controllen=24, {cmsg_len=20, cmsg_level=SOL_SOCKET, cmsg_type=SCM_RIGHTS, {14}}, msg_flags=0}, 0) = 70

[pid 12061] recvmsg(3, {msg_name(0)=NULL, msg_iov(1)=[{"{\"cmd\":\"NODE_HANDLE\",\"type\":\"net.Socket\",\"msg\":{},\"key\":\"6::::12000\"}\n", 65536}], msg_controllen=24, {cmsg_len=20, cmsg_level=SOL_SOCKET, cmsg_type=SCM_RIGHTS, {12}}, msg_flags=MSG_CMSG_CLOEXEC}, MSG_CMSG_CLOEXEC) = 70

[pid 12061] getsockname(12, {sa_family=AF_INET6, sin6_port=htons(12000), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28]) = 0

[pid 12061] getsockopt(12, SOL_SOCKET, SO_TYPE, [1], [4]) = 0

[pid 12061] write(3, "{\"cmd\":\"NODE_HANDLE_ACK\"}\n", 26) = 26

[pid 12055] <... epoll_wait resumed> {{EPOLLIN, {u32=11, u64=11}}}, 1024, -1) = 1

[pid 12055] recvmsg(11, {msg_name(0)=NULL, msg_iov(1)=[{"{\"cmd\":\"NODE_HANDLE_ACK\"}\n", 65536}], msg_controllen=0, msg_flags=MSG_CMSG_CLOEXEC}, MSG_CMSG_CLOEXEC) = 26

如您所见,父进程和子进程参与了一个协议,通过该协议传输套接字句柄,子进程根据收到的句柄信息重新创建一个net.Socket对象。随后,子对象能够处理此连接下的客户端。

该协议以及跨进程传输句柄和工作负载的方式是cluster模块的核心和灵魂。

票数 0
EN

Stack Overflow用户

发布于 2021-04-06 02:05:19

您不能(至少在Node.js <= 15.13.0中不能)将HTTP请求或响应对象传递给子进程。因此,我想出了一个使用IPC的解决方案,我将在这篇文章中分享。

首先创建一个包含以下内容的package.json文件:

代码语言:javascript
复制
{
  "type": "module"
}

然后创建包含以下内容的文件server.js

代码语言:javascript
复制
import * as http from 'http'
import * as child_process from 'child_process'
import * as net from 'net'
import * as fs from 'fs'
import * as crypto from 'crypto'

const ipcPrefix = (process.platform != 'win32' ? '/tmp/' : '\\\\.\\pipe\\') 
                + crypto.randomBytes(8).toString('hex')
const subprocess = child_process.fork('subprocess.js', {
  stdio: ['pipe', 'pipe', 'inherit', 'ipc']
})
let requestNumber = 0

const server = http.createServer()
.listen(8080)
.on('request', (request, response) => {
  if (!subprocess.connected) {
    console.error('Subprocess not connected for: '+request.url)
    response.statusCode = 500
    response.end()
    return
  }
  const {headers, url, method} = request
  const ipcPath = ipcPrefix + requestNumber++ // a unique IPC path
  try {fs.unlinkSync(ipcPath)} catch{} // delete the socket file if it exist already
  let headerChunks = [], headerSize, sizeGotten = 0
  net.createServer() // the IPC server
  .listen(ipcPath, () => subprocess.send({cmd: 'req', headers, url, method, ipcPath}))
  .on('connection', socket => {
    socket.on('close', () => {
      response.end()
      fs.unlinkSync(ipcPath) // clean it up
    })
    socket.once('data', async chunk => {
      headerSize = chunk.readUInt32LE()
      headerChunks.push(chunk)
      sizeGotten += chunk.byteLength
      while (sizeGotten < headerSize) {
        let timer
        const timedOut = await Promise.race([
          // race next packet or timeout timer
          new Promise(resolve => {
            socket.once('data', chunk => {
              headerChunks.push(chunk)
              sizeGotten += chunk.byteLength
              resolve(false)
            })
          }),
          new Promise(resolve => {timer = setTimeout(resolve, 2000, true)})
        ])
        clearTimeout(timer) // to not leave it hanging
        if (timedOut) {
          response.statusCode = 500
          // response.write('lol, bye') // optional error page
          socket.destroy() // this causes the close event which ends the response
          console.error('Subprocess response timed out...')
          return // break the loop and exit function
        }
      }
      const data = Buffer.concat(headerChunks)
      const headerData = data.slice(4, 4+headerSize)
      const header = JSON.parse(headerData.toString())
      response.writeHead(header.statusCode, header.headers)
      if (data.byteLength - 4 - headerSize > 0) {
        response.write(data.slice(4 + headerSize))
      }
      socket.pipe(response) // just pipe it through
      //socket.on('data', response.write.bind(response)) // or do this
      // but set a timeout on the socket to close it if inactive
      socket.setTimeout(2000)
      socket.on('timeout', () => {
        socket.destroy()
        console.log('Subprocess response timed out...')
      })
    })
  })
})

然后创建文件subprocess.js。包含以下内容:

代码语言:javascript
复制
import * as net from 'net'

process.on('message', msg => {
  switch (msg.cmd) {
    case 'req': {
      const socket = net.createConnection({path: msg.ipcPath})
      // depending on {headers, url, method} in msg do your thing below
      writeHead(socket, 200, {'testHeader': 'something'})
      socket.end('hello world '+msg.url)
    } break
  }
})

function writeHead(socket, statusCode, headers) {
  const headerBuffer = new TextEncoder().encode(JSON.stringify({statusCode, headers}))
  const headerSize = Buffer.allocUnsafe(4)
  headerSize.writeUInt32LE(headerBuffer.byteLength)
  socket.write(headerSize)
  socket.write(headerBuffer)
}

现在运行服务器node server并在浏览器中转到localhost:8080查看它的运行情况。

如果你喜欢我的答案,请随时在GitHub上赞助我。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35716675

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档