首页
学习
活动
专区
圈层
工具
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

基于node.js实现MCP Server

今天,我们要深入一个引人入胜的网络协议世界,具体来说,就是使用我们值得信赖的好朋友Node.js来构建一个MCP (模型上下文协议 - Model Context Protocol)服务器。暂时忘掉那些标准的 HTTP 吧;有时候,你需要一些更定制化、更高效,或者仅仅是……与众不同的东西!

MCP 是啥玩意儿?我为啥要在乎?

首先。MCP (Model Context Protocol)这名字不像 HTTP 或 WebSocket 那样家喻户晓。你找不到定义它的通用 RFC 文档(尽管在特定社区,比如 MUDs - 多用户地牢游戏 - 中存在变体)。

你可以把 MCP 看作是一种网络通信的概念模式,它专注于交换关于共享模型上下文的信息。想象一下:

实时协作:多个用户编辑同一个文档。MCP 可以高效地同步更改(“模型”)和用户状态(“上下文”,比如光标位置)。

物联网设备:设备上报传感器数据(“模型”的一部分)及其运行状态(“上下文”)。

游戏服务器:同步玩家位置、库存(“模型”)和动作(“上下文”)。

为啥不用 HTTP/WebSocket?

开销:HTTP(S) 带有请求头、方法等,对于频繁、小型的消息来说可能过于臃肿。

结构:WebSocket 给你的是原始的管道,而 MCP 从一开始就隐含了一种结构化的方式来讨论模型和上下文。

定制需求:有时,你需要对通信流、错误处理或消息类型进行精细控制,而标准协议无法优雅地提供这些。

我们今天的目标:我们将使用 Node.js 基于原始 TCP 套接字设计并实现一个简化版的 MCP 服务器。它将处理多个客户端连接,解析结构化消息(我们用 JSON),并管理一些基本的模型/上下文信息。

准备工作:磨刀不误砍柴工!

开始编码前,请确保你已具备:

Node.js 和 npm/yarn:已安装在你的系统上。(用node -v和npm -v检查)。

基础 JavaScript/Node.js 知识:理解 async/await、模块、回调、基本数据结构。

文本编辑器/IDE:VS Code, WebStorm, Sublime Text 等。

一颗好奇心!

设计我们的简易 MCP

要构建服务器,我们首先需要定义我们自己版本的 MCP。保持简单:

传输层:原始TCP 套接字 (Sockets)。可靠的、面向连接的。

消息格式 :JSON。在 Node.js 中易于解析。

消息分隔符:每个通过套接字发送的 JSON 消息都将以换行符 (\n)结束。这一点至关重要,因为 TCP 是一个流协议,而不是消息协议。数据可能分块到达,所以我们需要一种方法来知道何时收到了一个完整的消息。

核心消息结构:

ounter(lineounter(lineounter(lineounter(lineounter(line{ "id": "unique_message_id", // 可选,用于追踪请求/响应 "command": "command_name", // 例如 'register', 'getModel', 'updateContext', 'subscribe' "payload": { ... } // 特定于命令的数据}

示例命令:

register: 客户端自我介绍。Payload:{ "clientId": "user_abc" }

getModel: 客户端请求数据。Payload:{ "modelName": "document_xyz" }

updateContext: 客户端发送其状态。Payload:{ "cursorPosition": 123 }

broadcast: 服务器(或客户端)向其他人发送消息。Payload:{ "message": "Hello all!" }

开建!手把手实现

好了,打开你的终端和编辑器。开始写代码!

第一步:项目设置

ounter(lineounter(lineounter(lineounter(lineounter(linemkdir node-mcp-servercd node-mcp-servernpm init -y# 核心逻辑暂时不需要外部依赖touch server.js

第二步:基础 TCP 服务器结构

打开server.js,使用 Node.js 内建的net模块。

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line// server.jsconst net = require('net'); // 引入 Node.js 的 net 模块

const PORT = 3000; // 为我们的服务器选择一个端口const HOST = '127.0.0.1'; // 监听本地主机

// 存储连接的客户端(后面会用到)const clients = new Map(); // 使用 Map 可以方便地存储每个 socket 的更多信息

const server = net.createServer((socket) => {   // 这个回调函数在每次有 *新* 客户端连接时运行   console.log(` 客户端连接: ${socket.remoteAddress}:${socket.remotePort}`);

  // 为此 socket 分配一个唯一的 ID,便于管理   const clientId = `${socket.remoteAddress}:${socket.remotePort}-${Date.now()}`;   clients.set(clientId, socket); // 将 socket 存入 Map   console.log(`分配 clientId: ${clientId}. 当前客户端总数: ${clients.size}`);

  // --- 数据处理 (第 3 & 4 步) ---   let buffer = ''; // 用于累积数据块的缓冲区   socket.on('data', (data) => {       // 当从 *这个* 客户端接收到数据时运行       buffer += data.toString('utf8'); // 将新数据追加到缓冲区 (确保使用 UTF-8 解码)

      // 处理缓冲区中完整的消息(以 '\n' 分隔)       let boundary = buffer.indexOf('\n');       while (boundary !== -1) {           const messageJson = buffer.substring(0, boundary); // 提取一条完整的消息           buffer = buffer.substring(boundary + 1); // 从缓冲区移除已处理的消息

          // 避免处理空字符串(例如,如果只收到一个换行符)           if (messageJson.trim().length > 0) {               console.log(`[${clientId}] 收到原始数据: ${messageJson}`);               handleMessage(socket, clientId, messageJson); // 处理这条消息           } else {               console.log(`[${clientId}] 收到并忽略了空消息或只有换行符。`);           }

          boundary = buffer.indexOf('\n'); // 检查缓冲区中是否还有完整的消息       }   });

  // --- 错误处理 ---   socket.on('error', (err) => {       console.error(`[${clientId}] Socket 错误: ${err.message}`);       // 出错时清理       clients.delete(clientId);       console.log(`客户端 ${clientId} 因错误移除. 当前客户端总数: ${clients.size}`);   });

  // --- 关闭处理 ---   socket.on('close', (hadError) => {       console.log(` 客户端 ${clientId} 断开连接 ${hadError ? '因错误' : '正常'}.`);       clients.delete(clientId); // 清理断开连接的客户端       console.log(`当前客户端总数: ${clients.size}`);       // (可选)通知其他客户端此用户已离开       // broadcast({ command: 'userLeft', payload: { clientId } }, socket);   });

  // (可选)发送欢迎消息   // sendMessage(socket, { command: 'welcome', payload: { message: '已连接到 MCP 服务器!' } });

});

// 处理解析后的消息的函数 (第 5 步)function handleMessage(socket, clientId, messageJson) {   try {       const message = JSON.parse(messageJson); // 解析 JSON       console.log(`[${clientId}] 处理命令: ${message.command}`, message.payload || {});

      // 简单的命令路由器       switch (message.command) {           case 'register':               // 如果需要,存储更具体的客户端信息               clients.set(clientId, { socket, details: message.payload }); // 更新 Map 中的值               console.log(`[${clientId}] 已注册,详情:`, message.payload);               sendMessage(socket, { command: 'register_ack', payload: { status: 'success', clientId } });               break;

          case 'getModel':               // 模拟根据 payload 获取数据               const modelName = message.payload?.modelName || 'default';               const modelData = { name: modelName, data: `模型 ${modelName} 的一些数据 - ${Date.now()}` };               // 如果消息有 id,在响应中包含它,便于客户端对应               const response = { command: 'modelData', payload: modelData };               if (message.id) {                   response.respondsTo = message.id;               }               sendMessage(socket, response);               break;

          case 'updateContext':               console.log(`[${clientId}] 更新上下文:`, message.payload);               // 这里你可以存储这个上下文,或者广播给其他人               // 示例: 广播给其他人 (不包括发送者)               broadcast({ command: 'contextUpdated', payload: { from: clientId, ...message.payload } }, socket);               // 发送确认回执               const ack = { command: 'updateContext_ack', payload: { status: 'received' } };                if (message.id) {                   ack.respondsTo = message.id;               }               sendMessage(socket, ack);               break;

           case 'broadcast':                console.log(`[${clientId}] 广播消息:`, message.payload);                // 转发给其他所有连接的客户端                broadcast({ command: 'broadcast', payload: { from: clientId, ...message.payload } }, socket); // 发送给其他人                // 可以给发送者一个确认                const broadcastAck = { command: 'broadcast_ack', payload: { status: 'sent' } };                if (message.id) {                   broadcastAck.respondsTo = message.id;                }                sendMessage(socket, broadcastAck);                break;

          default:               console.warn(`[${clientId}] 未知命令: ${message.command}`);               // 发送错误信息给客户端               const errorMsg = { command: 'error', payload: { message: `未知命令: ${message.command}` } };                if (message.id) {                   errorMsg.respondsTo = message.id;                }               sendMessage(socket, errorMsg);       }

  } catch (error) {       console.error(`[${clientId}] 解析 JSON 或处理消息失败: ${messageJson}`, error);       // 如果 socket 仍然可写,尝试发送错误回执       try {           sendMessage(socket, { command: 'error', payload: { message: '无效的消息格式' } });       } catch (sendError) {           console.error(`[${clientId}] 发送错误消息回执失败:`, sendError);       }   }}

// 发送消息的辅助函数 (第 6 步)function sendMessage(socket, messageObject) {   // 确保 socket 仍然是可写的   if (!socket.writable) {       console.warn(`尝试向一个不可写的 socket 发送消息 (${socket.remoteAddress}:${socket.remotePort})`);       return;   }   try {       const messageString = JSON.stringify(messageObject) + '\n'; // 转换为 JSON 字符串并添加换行符       socket.write(messageString);       // 调试日志:记录发送的消息内容(去掉末尾换行符以便阅读)       // console.log(`发送至 ${socket.remoteAddress}:${socket.remotePort}: ${messageString.trim()}`);   } catch (error) {      console.error(`向 ${socket.remoteAddress}:${socket.remotePort} 发送消息失败:`, error);      // 这里可能需要处理错误,比如关闭这个 socket      // socket.destroy(error);   }}

// 广播消息的辅助函数 (第 6 步)function broadcast(messageObject, senderSocket = null) {   const messageString = JSON.stringify(messageObject) + '\n'; // 准备好要广播的消息字符串   console.log(`广播消息: ${messageString.trim()}`);

  clients.forEach((clientInfo, clientId) => {       // 从 Map 中获取 socket 对象(我们可能存了 {socket, details} 或直接存 socket)       const targetSocket = clientInfo.socket || clientInfo;

      // 确保目标 socket 不是发送者自己,并且是可写的       if (targetSocket !== senderSocket && targetSocket.writable) {           try {               targetSocket.write(messageString);           } catch(error) {                console.error(`向客户端 ${clientId} 广播失败:`, error);                // 考虑在这里移除有问题的客户端                // targetSocket.destroy(); // 强制关闭                // clients.delete(clientId); // 从 Map 中移除 (注意:在 forEach 中直接删除可能有问题,最好是标记后删除)           }       }   });}

// 开始监听server.listen(PORT, HOST, () => {   console.log(` MCP 服务器正在监听 ${HOST}:${PORT}`);});

// 处理服务器本身的错误server.on('error', (err) => {   console.error('服务器错误:', err);   // 处理特定的监听错误,比如端口被占用   if (err.code === 'EADDRINUSE') {       console.error(`端口 ${PORT} 已被占用,请尝试其他端口。`);       process.exit(1); // 退出程序   }});

代码分解说明:

net.createServer:创建 TCP 服务器实例。回调函数为每个新连接执行,提供一个socket对象代表该唯一连接。

clientsMap:我们用Map来存储连接的 sockets(或者包含 socket 及其他信息的对象)。这使我们能管理它们、广播消息等。Map结构更灵活。

socket.on('data', ...):这是接收数据的核心。关键在于我们缓冲 (buffer)传入的数据 (buffer += data.toString('utf8')),因为 TCP 流不保证客户端的一次write对应服务器的一次data事件,也不保证一条消息不会被分割到多个data事件中。确保使用utf8解码!

消息分隔 (\n): 我们反复检查缓冲区是否有换行符分隔符 (buffer.indexOf('\n'))。如果找到,就提取完整的 JSON 消息,用handleMessage处理它,并从缓冲区移除。while循环处理一次性收到多条消息的情况。增加了空消息检查。

handleMessage函数:解析 JSON 字符串。成功后,用switch语句将命令分发给相应的逻辑。包含了register,getModel,updateContext,broadcast的基础示例。添加了respondsTo逻辑用于关联请求和响应。

错误/关闭处理:对健壮的服务器至关重要。我们监听每个 socket 的error和close事件,记录问题并清理clientsMap,防止内存泄漏。

sendMessage辅助函数:封装了向特定客户端发送消息的操作。它将 JS 对象字符串化,并附加关键的\n分隔符。增加了writable检查。

broadcast辅助函数:遍历clientsMap,向所有连接的客户端发送消息,除了原始发送者(对聊天类功能或通知很有用)。包含了基本的 socket 可写性检查。改进了从 Map 获取 socket 的逻辑。

server.listen:启动服务器。

服务器错误处理:捕获与服务器自身相关的错误(例如端口已被占用)。

运行与测试

运行服务器:

ounter(linenode server.js

你应该看到: ` MCP 服务器正在监听 127.0.0.1:3000`

使用telnet或nc(Netcat) 测试:打开另一个终端窗口。

使用telnet:

ounter(linetelnet 127.0.0.1 3000

(某些系统可能需要先安装 telnet)。

使用 `nc` (Netcat):

ounter(linenc 127.0.0.1 3000

连接成功后,你不会立刻看到太多东西。现在,**精确地输入** 下面的 JSON 字符串,**每行输完后按回车键** (回车发送 `\n` 分隔符):

ounter(line{"id": "msg1", "command": "register", "payload": {"clientId": "test_user_1"}}

_服务器日志应显示注册信息。你的 telnet/nc 窗口可能会收到 `register_ack` 回执。_

ounter(line{"id": "msg2", "command": "getModel", "payload": {"modelName": "player_stats"}}

_服务器日志应显示处理中,你应该会收到包含 `modelData` 和 `respondsTo: "msg2"` 的响应。_

ounter(line{"id": "msg3", "command": "updateContext", "payload": {"status": "active", "location": "lobby"}}

_服务器日志记录上下文更新。你会收到一个 ack。如果有其他客户端连接,它们会收到 `contextUpdated` 消息。_

ounter(line{"id": "msg4", "command": "broadcast", "payload": {"message": "来自 Telnet 的问候!"}}

_服务器日志记录广播。你会收到一个 ack。其他连接的客户端会收到广播消息。_

ounter(line{"command": "invalidCommand"}

_服务器应记录 "未知命令" 并发回错误消息。_

ounter(line这不是JSON

超越基础:功能增强

我们简单的服务器能跑了,但对于真实世界的应用,还需要考虑:

健壮的错误处理:更具体的错误码,更好的报告机制。

安全:

认证:实现一个正式的登录命令,之后才允许执行其他操作。

加密:使用 Node.js 的tls模块替代net来创建安全的 TCP 服务器 (TCPS)。

状态管理:对于复杂的共享模型,使用合适的数据库或内存数据库(如 Redis)。

扩展性:使用 Node.js 的cluster模块或 PM2 等工具支持多核 CPU。对于大规模应用,考虑在实例间使用消息队列(如 Redis Pub/Sub, RabbitMQ, Kafka)。

协议定义:正式化你的命令集、版本控制和 payload 结构。

客户端库:为不同平台(JS, Python 等)创建库,以便轻松与你的 MCP 服务器交互。

节流/速率限制:防止滥用。

总结

刚刚使用 Node.js 和原始 TCP 套接字构建了一个能工作的(尽管简单)模型上下文协议服务器!

我们揭开了 MCP 概念的神秘面纱,设计了一个基于 JSON 和换行符分隔的基础协议,并实现了一个能处理连接、正确解析流数据、分发命令并与客户端通信的服务器。

虽然像 HTTP/WebSocket 这样的框架和标准协议自动处理了很多事情,但理解使用 TCP 套接字进行网络编程的基础知识是非常宝贵的。它赋予你在需要时构建高效、定制化通信系统的能力。

愉快!

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OXDLzBY6v5cOnRjY7vNZdjTQ0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券