MCP TypeScript SDK添加福利群:解决AI开发者的「MCP实战痛点」模型上下文协议(Model Context Protocol, MCP)允许应用程序以标准化的方式为大型语言模型(LLM)提供上下文,将提供上下文与实际 LLM 交互的关注点分离。这个 TypeScript SDK 实现了完整的 MCP 规范,使得以下操作变得简单:
npm install @modelcontextprotocol/sdk
让我们创建一个简单的 MCP 服务器,暴露一个计算器工具和一些数据:
import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
// 创建一个 MCP 服务器
const server = new McpServer({
name: "Demo",
version: "1.0.0"
});
// 添加一个加法工具
server.tool("add",
{ a: z.number(), b: z.number() },
async ({ a, b }) => ({
content: [{ type: "text", text: String(a + b) }]
})
);
// 添加一个动态问候资源
server.resource(
"greeting",
new ResourceTemplate("greeting://{name}", { list: undefined }),
async (uri, { name }) => ({
contents: [{
uri: uri.href,
text: `Hello, ${name}!`
}]
})
);
// 开始在 stdin 上接收消息并在 stdout 上发送消息
const transport = new StdioServerTransport();
await server.connect(transport);

模型上下文协议 (MCP) 让你能够以安全、标准化的方式构建服务器,向 LLM 应用程序暴露数据和功能。你可以把它想象成一个专门为 LLM 交互设计的 Web API。MCP 服务器可以:
McpServer 是你与 MCP 协议的核心接口。它处理连接管理、协议合规性和消息路由:
const server = new McpServer({
name: "My App",
version: "1.0.0"
});

资源是你向 LLM 暴露数据的方式。它们类似于 REST API 中的 GET 端点——它们提供数据,但不应该执行大量计算或产生副作用:
// 静态资源
server.resource(
"config",
"config://app",
async (uri) => ({
contents: [{
uri: uri.href,
text: "App configuration here"
}]
})
);
// 带参数的动态资源
server.resource(
"user-profile",
new ResourceTemplate("users://{userId}/profile", { list: undefined }),
async (uri, { userId }) => ({
contents: [{
uri: uri.href,
text: `Profile data for user ${userId}`
}]
})
);

工具让 LLM 通过你的服务器执行操作。与资源不同,工具期望执行计算并产生副作用:
// 带参数的简单工具
server.tool(
"calculate-bmi",
{
weightKg: z.number(),
heightM: z.number()
},
async ({ weightKg, heightM }) => ({
content: [{
type: "text",
text: String(weightKg / (heightM * heightM))
}]
})
);
// 带有外部 API 调用的异步工具
server.tool(
"fetch-weather",
{ city: z.string() },
async ({ city }) => {
const response = await fetch(`https://api.weather.com/${city}`);
const data = await response.text();
return {
content: [{ type: "text", text: data }]
};
}
);

提示是可重用的模板,帮助 LLM 有效地与你的服务器交互:
server.prompt(
"review-code",
{ code: z.string() },
({ code }) => ({
messages: [{
role: "user",
content: {
type: "text",
text: `Please review this code:\n\n${code}`
}
}]
})
);

TypeScript 中的 MCP 服务器需要连接到传输层以与客户端通信。你如何启动服务器取决于传输方式的选择:
对于命令行工具和直接集成:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
const server = new McpServer({
name: "example-server",
version: "1.0.0"
});
// ... 设置服务器资源、工具和提示 ...
const transport = new StdioServerTransport();
await server.connect(transport);

对于远程服务器,设置一个可流式 HTTP 传输层,处理客户端请求和服务器到客户端的通知。
在某些情况下,服务器需要保持状态。这通过会话管理实现。
import express from "express";
import { randomUUID } from "node:crypto";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"
const app = express();
app.use(express.json());
// 按会话 ID 存储传输层的映射
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};
// 处理客户端到服务器通信的 POST 请求
app.post('/mcp', async (req, res) => {
// 检查现有会话 ID
const sessionId = req.headers['mcp-session-id'] as string | undefined;
let transport: StreamableHTTPServerTransport;
if (sessionId && transports[sessionId]) {
// 重用现有传输层
transport = transports[sessionId];
} else if (!sessionId && isInitializeRequest(req.body)) {
// 新的初始化请求
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sessionId) => {
// 按会话 ID 存储传输层
transports[sessionId] = transport;
}
});
// 传输层关闭时清理
transport.onclose = () => {
if (transport.sessionId) {
delete transports[transport.sessionId];
}
};
const server = new McpServer({
name: "example-server",
version: "1.0.0"
});
// ... 设置服务器资源、工具和提示 ...
// 连接到 MCP 服务器
await server.connect(transport);
} else {
// 无效请求
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided',
},
id: null,
});
return;
}
// 处理请求
await transport.handleRequest(req, res, req.body);
});
// 处理 GET 和 DELETE 请求的通用处理程序
const handleSessionRequest = async (req: express.Request, res: express.Response) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;
if (!sessionId || !transports[sessionId]) {
res.status(400).send('Invalid or missing session ID');
return;
}
const transport = transports[sessionId];
await transport.handleRequest(req, res);
};
// 处理通过 SSE 进行服务器到客户端通知的 GET 请求
app.get('/mcp', handleSessionRequest);
// 处理会话终止的 DELETE 请求
app.delete('/mcp', handleSessionRequest);
app.listen(3000);

对于不需要会话管理的简单用例:
const app = express();
app.use(express.json());
app.post('/mcp', async (req: Request, res: Response) => {
// 在无状态模式下,为每个请求创建新的传输层和服务器实例
// 以确保完全隔离。单个实例会导致多个客户端并发连接时请求 ID 冲突。
try {
const server = getServer();
const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
});
res.on('close', () => {
console.log('Request closed');
transport.close();
server.close();
});
await server.connect(transport);
await transport.handleRequest(req, res, req.body);
} catch (error) {
console.error('Error handling MCP request:', error);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal server error',
},
id: null,
});
}
}
});
app.get('/mcp', async (req: Request, res: Response) => {
console.log('Received GET MCP request');
res.writeHead(405).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Method not allowed."
},
id: null
}));
});
app.delete('/mcp', async (req: Request, res: Response) => {
console.log('Received DELETE MCP request');
res.writeHead(405).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Method not allowed."
},
id: null
}));
});
// 启动服务器
const PORT = 3000;
app.listen(PORT, () => {
console.log(`MCP Stateless Streamable HTTP Server listening on port ${PORT}`);
});

这种无状态方法适用于:
要测试你的服务器,可以使用 MCP Inspector。请参阅其 README 获取更多信息。
一个简单的服务器,展示资源、工具和提示:
import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "Echo",
version: "1.0.0"
});
server.resource(
"echo",
new ResourceTemplate("echo://{message}", { list: undefined }),
async (uri, { message }) => ({
contents: [{
uri: uri.href,
text: `Resource echo: ${message}`
}]
})
);
server.tool(
"echo",
{ message: z.string() },
async ({ message }) => ({
content: [{ type: "text", text: `Tool echo: ${message}` }]
})
);
server.prompt(
"echo",
{ message: z.string() },
({ message }) => ({
messages: [{
role: "user",
content: {
type: "text",
text: `Please process this message: ${message}`
}
}]
})
);

一个更复杂的示例,展示数据库集成:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import sqlite3 from "sqlite3";
import { promisify } from "util";
import { z } from "zod";
const server = new McpServer({
name: "SQLite Explorer",
version: "1.0.0"
});
// 创建数据库连接的辅助函数
const getDb = () => {
const db = new sqlite3.Database("database.db");
return {
all: promisify<string, any[]>(db.all.bind(db)),
close: promisify(db.close.bind(db))
};
};
server.resource(
"schema",
"schema://main",
async (uri) => {
const db = getDb();
try {
const tables = await db.all(
"SELECT sql FROM sqlite_master WHERE type='table'"
);
return {
contents: [{
uri: uri.href,
text: tables.map((t: {sql: string}) => t.sql).join("\n")
}]
};
} finally {
await db.close();
}
}
);
server.tool(
"query",
{ sql: z.string() },
async ({ sql }) => {
const db = getDb();
try {
const results = await db.all(sql);
return {
content: [{
type: "text",
text: JSON.stringify(results, null, 2)
}]
};
} catch (err: unknown) {
const error = err as Error;
return {
content: [{
type: "text",
text: `Error: ${error.message}`
}],
isError: true
};
} finally {
await db.close();
}
}
);

如果你想提供初始的工具/提示/资源集,但稍后根据用户操作或外部状态更改添加更多内容,你可以在服务器连接后添加/更新/移除它们。这将自动发出相应的 listChanged 通知:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "Dynamic Example",
version: "1.0.0"
});
const listMessageTool = server.tool(
"listMessages",
{ channel: z.string() },
async ({ channel }) => ({
content: [{ type: "text", text: await listMessages(channel) }]
})
);
const putMessageTool = server.tool(
"putMessage",
{ channel: z.string(), message: z.string() },
async ({ channel, message }) => ({
content: [{ type: "text", text: await putMessage(channel, string) }]
})
);
// 在我们升级认证之前,`putMessage` 是禁用的(不会出现在工具列表中)
putMessageTool.disable()
const upgradeAuthTool = server.tool(
"upgradeAuth",
{ permission: z.enum(["write', admin"])},
// 这里的任何更改都会自动发出 `listChanged` 通知
async ({ permission }) => {
const { ok, err, previous } = await upgradeAuthAndStoreToken(permission)
if (!ok) return {content: [{ type: "text", text: `Error: ${err}` }]}
// 如果我们之前只有只读权限,现在 `putMessage` 可用了
if (previous === "read") {
putMessageTool.enable()
}
if (permission === 'write') {
// 如果我们刚刚升级到 'write' 权限,我们仍然可以调用 'upgradeAuth'
// 但只能升级到 'admin'。
upgradeAuthTool.update({
paramSchema: { permission: z.enum(["admin"]) }, // 更改验证规则
})
} else {
// 如果我们是管理员,我们就没有升级的空间了,所以完全移除该工具
upgradeAuthTool.remove()
}
}
)
// 正常连接
const transport = new StdioServerTransport();
await server.connect(transport);

为了更精细的控制,你可以直接使用低级 Server 类:
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
ListPromptsRequestSchema,
GetPromptRequestSchema
} from "@modelcontextprotocol/sdk/types.js";
const server = new Server(
{
name: "example-server",
version: "1.0.0"
},
{
capabilities: {
prompts: {}
}
}
);
server.setRequestHandler(ListPromptsRequestSchema, async () => {
return {
prompts: [{
name: "example-prompt",
description: "An example prompt template",
arguments: [{
name: "arg1",
description: "Example argument",
required: true
}]
}]
};
});
server.setRequestHandler(GetPromptRequestSchema, async (request) => {
if (request.params.name !== "example-prompt") {
throw new Error("Unknown prompt");
}
return {
description: "Example prompt",
messages: [{
role: "user",
content: {
type: "text",
text: "Example prompt text"
}
}]
};
});
const transport = new StdioServerTransport();
await server.connect(transport);

SDK提供了高级客户端接口:
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
const transport = new StdioClientTransport({
command: "node",
args: ["server.js"]
});
const client = new Client(
{
name: "example-client",
version: "1.0.0"
}
);
await client.connect(transport);
// List prompts
const prompts = await client.listPrompts();
// Get a prompt
const prompt = await client.getPrompt({
name: "example-prompt",
arguments: {
arg1: "value"
}
});
// List resources
const resources = await client.listResources();
// Read a resource
const resource = await client.readResource({
uri: "file:///example.txt"
});
// Call a tool
const result = await client.callTool({
name: "example-tool",
arguments: {
arg1: "value"
}
});

您可以代理OAUTH请求向外部授权提供商:
import express from 'express';
import { ProxyOAuthServerProvider } from '@modelcontextprotocol/sdk/server/auth/providers/proxyProvider.js';
import { mcpAuthRouter } from '@modelcontextprotocol/sdk/server/auth/router.js';
const app = express();
const proxyProvider = new ProxyOAuthServerProvider({
endpoints: {
authorizationUrl: "https://auth.external.com/oauth2/v1/authorize",
tokenUrl: "https://auth.external.com/oauth2/v1/token",
revocationUrl: "https://auth.external.com/oauth2/v1/revoke",
},
verifyAccessToken: async (token) => {
return {
token,
clientId: "123",
scopes: ["openid", "email", "profile"],
}
},
getClient: async (client_id) => {
return {
client_id,
redirect_uris: ["http://localhost:3000/callback"],
}
}
})
app.use(mcpAuthRouter({
provider: proxyProvider,
issuerUrl: new URL("http://auth.external.com"),
baseUrl: new URL("http://mcp.example.com"),
serviceDocumentationUrl: new URL("https://docs.example.com/"),
}))

此设置使您可以:
带有streamableHTTP Tranport的客户和服务器可以维护backwards compatibility使用已弃用的HTTP+SSE传输(从协议版本2024-11-05)如下
对于需要与流式HTTP和较旧的SSE服务器一起使用的客户:
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
let client: Client|undefined = undefined
const baseUrl = new URL(url);
try {
client = new Client({
name: 'streamable-http-client',
version: '1.0.0'
});
const transport = new StreamableHTTPClientTransport(
new URL(baseUrl)
);
await client.connect(transport);
console.log("Connected using Streamable HTTP transport");
} catch (error) {
// If that fails with a 4xx error, try the older SSE transport
console.log("Streamable HTTP connection failed, falling back to SSE transport");
client = new Client({
name: 'sse-client',
version: '1.0.0'
});
const sseTransport = new SSEClientTransport(baseUrl);
await client.connect(sseTransport);
console.log("Connected using SSE transport");
}

对于需要支持流式HTTP和较旧客户端的服务器:
import express from "express";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
const server = new McpServer({
name: "backwards-compatible-server",
version: "1.0.0"
});
// ... set up server resources, tools, and prompts ...
const app = express();
app.use(express.json());
// Store transports for each session type
const transports = {
streamable: {} as Record<string, StreamableHTTPServerTransport>,
sse: {} as Record<string, SSEServerTransport>
};
// Modern Streamable HTTP endpoint
app.all('/mcp', async (req, res) => {
// Handle Streamable HTTP transport for modern clients
// Implementation as shown in the "With Session Management" example
// ...
});
// Legacy SSE endpoint for older clients
app.get('/sse', async (req, res) => {
// Create SSE transport for legacy clients
const transport = new SSEServerTransport('/messages', res);
transports.sse[transport.sessionId] = transport;
res.on("close", () => {
delete transports.sse[transport.sessionId];
});
await server.connect(transport);
});
// Legacy message endpoint for older clients
app.post('/messages', async (req, res) => {
const sessionId = req.query.sessionId as string;
const transport = transports.sse[sessionId];
if (transport) {
await transport.handlePostMessage(req, res, req.body);
} else {
res.status(400).send('No transport found for sessionId');
}
});
app.listen(3000);

注意:SSE运输现在被弃用以支持流式http。新实施应使用流式HTTP,现有的SSE实现应计划迁移。
欢迎在https://github.com/modelcontextprotocol/typescript-sdk的GitHub上的问题和拉力请求。
该项目已根据MIT许可获得许可,请参阅LICENSE文件以获取详细信息。