我想知道如何使用fork将请求响应对象从父进程传递到子进程。我所做的是
var child = cp.fork(__dirname + '/childProcess.js', req, res);
child.on('message', function(m)
{
console.log("child process returned data " + m);
});
child.send("hello");
childProcess.js
var req = process.argv[2];
var res = process.argv[3];
process.on('message', (msg) =>
{
console.log("req object :-" + req );
console.log("res object :-" + res);
}
process.send("callparent");
这使我在子进程中没有定义。我也试过child.send("hello",req.socket )。但是我不能访问子进程中请求方法。它表现为环形结构。
发布于 2016-06-07 01:41:14
其他答案是告诉您将套接字发送给子进程,并让子进程监听和处理请求,这并不是我们要问的问题(如何将请求/资源发送到子进程)。如果您的目标是将套接字传递给孩子,并让孩子监听和处理请求,那么这些答案是可以接受的。如果您的目标是监听和预处理主进程中的请求,并将这些请求传递给子进程进行进一步处理,请参见以下内容:
简而言之,您必须实际显式地将请求和响应发送给子进程(它们不会神奇地出现在子进程中)。但这其中也存在一些挑战。
在我们的应用程序中,我们遇到了类似的问题。我们的主请求处理器检查请求,看看哪个子进程应该处理每个请求,然后将请求发送给子进程(通过child.send()
)。
在父对象和子对象之间传递请求/响应的问题是,您必须能够序列化使用send
来回发送的对象,特别是请求对象不能被序列化(它充满了循环引用,具有对许多其他对象的引用,等等)。
因此,我们要做的是从子进程需要的请求中提取数据的子集,并通过child.send()
将“轻量级请求”对象发送给子进程,同时在父进程的队列中保留对原始请求的引用。当子进程完成处理时,它通过parent.send()
将其响应数据发送回父进程,父进程将该响应数据与挂起队列中的原始请求进行匹配,并使用响应数据满足请求。
这种方法的优点(与仅在父/子之间创建自定义协议相反)是,子请求处理代码仍然对请求/响应对象(只是它们的轻量级版本)进行操作。在我们的例子中,这意味着我们可以使用相同的请求处理代码,无论我们是在子进程中调用它,还是让进程直接侦听和处理请求。
例如,下面是如何创建一个轻量级的、可序列化的请求对象并将其发送给子流程:
var requestData =
{
httpVersion: request.httpVersion,
httpVersionMajor: request.httpVersionMajor,
httpVersionMinor: request.httpVersionMinor,
method: request.method,
url: request.url,
headers: request.headers,
body: request.body
}
child.send(requestData);
您也可以只使用lodash pick之类的东西来获取您想要的字段。
这个解决方案的棘手之处在于,如果您需要在请求/响应时调用任何方法,则必须在父对象中调用(其中实际对象是-所有子对象都是选定的数据)。
发布于 2018-05-31 17:07:12
下面的例子可能会更好地说明这一点。这有一个父进程和一个子进程,父进程创建一个服务器,在客户端连接时,套接字句柄被传递给子进程。
子进程只有一个消息循环,在该循环中它接收句柄,并且能够通过该消息循环到达客户端。
$ cat foo.js
const cp = require('child_process')
const n = require('net')
if (process.argv[2] === 'child') {
process.on('message', (m, h) => {
h.end('ok')
})
}
else {
const c = cp.fork(process.argv[1], ['child'])
const s = n.createServer();
s.on('connection', (h) => {
c.send({}, h)
}).listen(12000, () => {
const m = n.connect(12000)
m.on('data', (d) => {
console.log(d.toString())
})
})
}
在握手过程中使用相关部分。
[pid 12055] sendmsg(11, {msg_name(0)=NULL, msg_iov(1)=[{"{\"cmd\":\"NODE_HANDLE\",\"type\":\"net.Socket\",\"msg\":{},\"key\":\"6::::12000\"}\n", 70}], msg_controllen=24, {cmsg_len=20, cmsg_level=SOL_SOCKET, cmsg_type=SCM_RIGHTS, {14}}, msg_flags=0}, 0) = 70
[pid 12061] recvmsg(3, {msg_name(0)=NULL, msg_iov(1)=[{"{\"cmd\":\"NODE_HANDLE\",\"type\":\"net.Socket\",\"msg\":{},\"key\":\"6::::12000\"}\n", 65536}], msg_controllen=24, {cmsg_len=20, cmsg_level=SOL_SOCKET, cmsg_type=SCM_RIGHTS, {12}}, msg_flags=MSG_CMSG_CLOEXEC}, MSG_CMSG_CLOEXEC) = 70
[pid 12061] getsockname(12, {sa_family=AF_INET6, sin6_port=htons(12000), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28]) = 0
[pid 12061] getsockopt(12, SOL_SOCKET, SO_TYPE, [1], [4]) = 0
[pid 12061] write(3, "{\"cmd\":\"NODE_HANDLE_ACK\"}\n", 26) = 26
[pid 12055] <... epoll_wait resumed> {{EPOLLIN, {u32=11, u64=11}}}, 1024, -1) = 1
[pid 12055] recvmsg(11, {msg_name(0)=NULL, msg_iov(1)=[{"{\"cmd\":\"NODE_HANDLE_ACK\"}\n", 65536}], msg_controllen=0, msg_flags=MSG_CMSG_CLOEXEC}, MSG_CMSG_CLOEXEC) = 26
如您所见,父进程和子进程参与了一个协议,通过该协议传输套接字句柄,子进程根据收到的句柄信息重新创建一个net.Socket对象。随后,子对象能够处理此连接下的客户端。
该协议以及跨进程传输句柄和工作负载的方式是cluster
模块的核心和灵魂。
发布于 2021-04-06 02:05:19
您不能(至少在Node.js <= 15.13.0中不能)将HTTP请求或响应对象传递给子进程。因此,我想出了一个使用IPC的解决方案,我将在这篇文章中分享。
首先创建一个包含以下内容的package.json
文件:
{
"type": "module"
}
然后创建包含以下内容的文件server.js
:
import * as http from 'http'
import * as child_process from 'child_process'
import * as net from 'net'
import * as fs from 'fs'
import * as crypto from 'crypto'
const ipcPrefix = (process.platform != 'win32' ? '/tmp/' : '\\\\.\\pipe\\')
+ crypto.randomBytes(8).toString('hex')
const subprocess = child_process.fork('subprocess.js', {
stdio: ['pipe', 'pipe', 'inherit', 'ipc']
})
let requestNumber = 0
const server = http.createServer()
.listen(8080)
.on('request', (request, response) => {
if (!subprocess.connected) {
console.error('Subprocess not connected for: '+request.url)
response.statusCode = 500
response.end()
return
}
const {headers, url, method} = request
const ipcPath = ipcPrefix + requestNumber++ // a unique IPC path
try {fs.unlinkSync(ipcPath)} catch{} // delete the socket file if it exist already
let headerChunks = [], headerSize, sizeGotten = 0
net.createServer() // the IPC server
.listen(ipcPath, () => subprocess.send({cmd: 'req', headers, url, method, ipcPath}))
.on('connection', socket => {
socket.on('close', () => {
response.end()
fs.unlinkSync(ipcPath) // clean it up
})
socket.once('data', async chunk => {
headerSize = chunk.readUInt32LE()
headerChunks.push(chunk)
sizeGotten += chunk.byteLength
while (sizeGotten < headerSize) {
let timer
const timedOut = await Promise.race([
// race next packet or timeout timer
new Promise(resolve => {
socket.once('data', chunk => {
headerChunks.push(chunk)
sizeGotten += chunk.byteLength
resolve(false)
})
}),
new Promise(resolve => {timer = setTimeout(resolve, 2000, true)})
])
clearTimeout(timer) // to not leave it hanging
if (timedOut) {
response.statusCode = 500
// response.write('lol, bye') // optional error page
socket.destroy() // this causes the close event which ends the response
console.error('Subprocess response timed out...')
return // break the loop and exit function
}
}
const data = Buffer.concat(headerChunks)
const headerData = data.slice(4, 4+headerSize)
const header = JSON.parse(headerData.toString())
response.writeHead(header.statusCode, header.headers)
if (data.byteLength - 4 - headerSize > 0) {
response.write(data.slice(4 + headerSize))
}
socket.pipe(response) // just pipe it through
//socket.on('data', response.write.bind(response)) // or do this
// but set a timeout on the socket to close it if inactive
socket.setTimeout(2000)
socket.on('timeout', () => {
socket.destroy()
console.log('Subprocess response timed out...')
})
})
})
})
然后创建文件subprocess.js
。包含以下内容:
import * as net from 'net'
process.on('message', msg => {
switch (msg.cmd) {
case 'req': {
const socket = net.createConnection({path: msg.ipcPath})
// depending on {headers, url, method} in msg do your thing below
writeHead(socket, 200, {'testHeader': 'something'})
socket.end('hello world '+msg.url)
} break
}
})
function writeHead(socket, statusCode, headers) {
const headerBuffer = new TextEncoder().encode(JSON.stringify({statusCode, headers}))
const headerSize = Buffer.allocUnsafe(4)
headerSize.writeUInt32LE(headerBuffer.byteLength)
socket.write(headerSize)
socket.write(headerBuffer)
}
现在运行服务器node server
并在浏览器中转到localhost:8080
查看它的运行情况。
如果你喜欢我的答案,请随时在GitHub上赞助我。
https://stackoverflow.com/questions/35716675
复制相似问题