首页
学习
活动
专区
圈层
工具
发布
MCP广场 >详情页
MCP TypeScript SDK2026-04-02162分享添加福利群:解决AI开发者的「MCP实战痛点」
MCP TypeScript SDK 是一个实现模型上下文协议(Model Context Protocol, MCP)的工具,旨在为大型语言模型(LLM)提供标准化的上下文支持。通过该 SDK,开发者可以轻松构建 MCP 客户端和服务器,支持资源、工具和提示的暴露,处理协议消息和生命周期事件。核心功能包括通过资源提供数据、通过工具执行操作、通过提示定义交互模式,并支持 stdio 和可流式 HTTP 等传输方式。该 SDK 适用于构建与 LLM 交互的标准化服务,简化了上下文管理与模型交互的分离。
By modelcontextprotocol
2026-04-02162
github
详情内容

MCP TypeScript SDK NPM 版本 MIT 许可证

目录

概述

模型上下文协议(Model Context Protocol, MCP)允许应用程序以标准化的方式为大型语言模型(LLM)提供上下文,将提供上下文与实际 LLM 交互的关注点分离。这个 TypeScript SDK 实现了完整的 MCP 规范,使得以下操作变得简单:

  • 构建可以连接到任何 MCP 服务器的 MCP 客户端
  • 创建暴露资源、提示和工具的 MCP 服务器
  • 使用像 stdio 和可流式 HTTP 这样的标准传输方式
  • 处理所有 MCP 协议消息和生命周期事件

安装

npm install @modelcontextprotocol/sdk

快速开始

让我们创建一个简单的 MCP 服务器,暴露一个计算器工具和一些数据:

import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

// 创建一个 MCP 服务器
const server = new McpServer({
  name: "Demo",
  version: "1.0.0"
});

// 添加一个加法工具
server.tool("add",
  { a: z.number(), b: z.number() },
  async ({ a, b }) => ({
    content: [{ type: "text", text: String(a + b) }]
  })
);

// 添加一个动态问候资源
server.resource(
  "greeting",
  new ResourceTemplate("greeting://{name}", { list: undefined }),
  async (uri, { name }) => ({
    contents: [{
      uri: uri.href,
      text: `Hello, ${name}!`
    }]
  })
);

// 开始在 stdin 上接收消息并在 stdout 上发送消息
const transport = new StdioServerTransport();
await server.connect(transport);

什么是 MCP?

模型上下文协议 (MCP) 让你能够以安全、标准化的方式构建服务器,向 LLM 应用程序暴露数据和功能。你可以把它想象成一个专门为 LLM 交互设计的 Web API。MCP 服务器可以:

  • 通过资源暴露数据(类似于 GET 端点;它们用于将信息加载到 LLM 的上下文中)
  • 通过工具提供功能(类似于 POST 端点;它们用于执行代码或产生副作用)
  • 通过提示定义交互模式(LLM 交互的可重用模板)
  • 以及更多!

核心概念

服务器

McpServer 是你与 MCP 协议的核心接口。它处理连接管理、协议合规性和消息路由:

const server = new McpServer({
  name: "My App",
  version: "1.0.0"
});

资源

资源是你向 LLM 暴露数据的方式。它们类似于 REST API 中的 GET 端点——它们提供数据,但不应该执行大量计算或产生副作用:

// 静态资源
server.resource(
  "config",
  "config://app",
  async (uri) => ({
    contents: [{
      uri: uri.href,
      text: "App configuration here"
    }]
  })
);

// 带参数的动态资源
server.resource(
  "user-profile",
  new ResourceTemplate("users://{userId}/profile", { list: undefined }),
  async (uri, { userId }) => ({
    contents: [{
      uri: uri.href,
      text: `Profile data for user ${userId}`
    }]
  })
);

工具

工具让 LLM 通过你的服务器执行操作。与资源不同,工具期望执行计算并产生副作用:

// 带参数的简单工具
server.tool(
  "calculate-bmi",
  {
    weightKg: z.number(),
    heightM: z.number()
  },
  async ({ weightKg, heightM }) => ({
    content: [{
      type: "text",
      text: String(weightKg / (heightM * heightM))
    }]
  })
);

// 带有外部 API 调用的异步工具
server.tool(
  "fetch-weather",
  { city: z.string() },
  async ({ city }) => {
    const response = await fetch(`https://api.weather.com/${city}`);
    const data = await response.text();
    return {
      content: [{ type: "text", text: data }]
    };
  }
);

提示

提示是可重用的模板,帮助 LLM 有效地与你的服务器交互:

server.prompt(
  "review-code",
  { code: z.string() },
  ({ code }) => ({
    messages: [{
      role: "user",
      content: {
        type: "text",
        text: `Please review this code:\n\n${code}`
      }
    }]
  })
);

运行你的服务器

TypeScript 中的 MCP 服务器需要连接到传输层以与客户端通信。你如何启动服务器取决于传输方式的选择:

stdio

对于命令行工具和直接集成:

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";

const server = new McpServer({
  name: "example-server",
  version: "1.0.0"
});

// ... 设置服务器资源、工具和提示 ...

const transport = new StdioServerTransport();
await server.connect(transport);

可流式 HTTP

对于远程服务器,设置一个可流式 HTTP 传输层,处理客户端请求和服务器到客户端的通知。

带会话管理

在某些情况下,服务器需要保持状态。这通过会话管理实现。

import express from "express";
import { randomUUID } from "node:crypto";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"

const app = express();
app.use(express.json());

// 按会话 ID 存储传输层的映射
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};

// 处理客户端到服务器通信的 POST 请求
app.post('/mcp', async (req, res) => {
  // 检查现有会话 ID
  const sessionId = req.headers['mcp-session-id'] as string | undefined;
  let transport: StreamableHTTPServerTransport;

  if (sessionId && transports[sessionId]) {
    // 重用现有传输层
    transport = transports[sessionId];
  } else if (!sessionId && isInitializeRequest(req.body)) {
    // 新的初始化请求
    transport = new StreamableHTTPServerTransport({
      sessionIdGenerator: () => randomUUID(),
      onsessioninitialized: (sessionId) => {
        // 按会话 ID 存储传输层
        transports[sessionId] = transport;
      }
    });

    // 传输层关闭时清理
    transport.onclose = () => {
      if (transport.sessionId) {
        delete transports[transport.sessionId];
      }
    };
    const server = new McpServer({
      name: "example-server",
      version: "1.0.0"
    });

    // ... 设置服务器资源、工具和提示 ...

    // 连接到 MCP 服务器
    await server.connect(transport);
  } else {
    // 无效请求
    res.status(400).json({
      jsonrpc: '2.0',
      error: {
        code: -32000,
        message: 'Bad Request: No valid session ID provided',
      },
      id: null,
    });
    return;
  }

  // 处理请求
  await transport.handleRequest(req, res, req.body);
});

// 处理 GET 和 DELETE 请求的通用处理程序
const handleSessionRequest = async (req: express.Request, res: express.Response) => {
  const sessionId = req.headers['mcp-session-id'] as string | undefined;
  if (!sessionId || !transports[sessionId]) {
    res.status(400).send('Invalid or missing session ID');
    return;
  }
  
  const transport = transports[sessionId];
  await transport.handleRequest(req, res);
};

// 处理通过 SSE 进行服务器到客户端通知的 GET 请求
app.get('/mcp', handleSessionRequest);

// 处理会话终止的 DELETE 请求
app.delete('/mcp', handleSessionRequest);

app.listen(3000);

不带会话管理(无状态)

对于不需要会话管理的简单用例:

const app = express();
app.use(express.json());

app.post('/mcp', async (req: Request, res: Response) => {
  // 在无状态模式下,为每个请求创建新的传输层和服务器实例
  // 以确保完全隔离。单个实例会导致多个客户端并发连接时请求 ID 冲突。
  
  try {
    const server = getServer(); 
    const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({
      sessionIdGenerator: undefined,
    });
    res.on('close', () => {
      console.log('Request closed');
      transport.close();
      server.close();
    });
    await server.connect(transport);
    await transport.handleRequest(req, res, req.body);
  } catch (error) {
    console.error('Error handling MCP request:', error);
    if (!res.headersSent) {
      res.status(500).json({
        jsonrpc: '2.0',
        error: {
          code: -32603,
          message: 'Internal server error',
        },
        id: null,
      });
    }
  }
});

app.get('/mcp', async (req: Request, res: Response) => {
  console.log('Received GET MCP request');
  res.writeHead(405).end(JSON.stringify({
    jsonrpc: "2.0",
    error: {
      code: -32000,
      message: "Method not allowed."
    },
    id: null
  }));
});

app.delete('/mcp', async (req: Request, res: Response) => {
  console.log('Received DELETE MCP request');
  res.writeHead(405).end(JSON.stringify({
    jsonrpc: "2.0",
    error: {
      code: -32000,
      message: "Method not allowed."
    },
    id: null
  }));
});


// 启动服务器
const PORT = 3000;
app.listen(PORT, () => {
  console.log(`MCP Stateless Streamable HTTP Server listening on port ${PORT}`);
});

这种无状态方法适用于:

  • 简单的 API 包装器
  • 每个请求独立的 RESTful 场景
  • 没有共享会话状态的水平扩展部署

测试与调试

要测试你的服务器,可以使用 MCP Inspector。请参阅其 README 获取更多信息。

示例

回显服务器

一个简单的服务器,展示资源、工具和提示:

import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";

const server = new McpServer({
  name: "Echo",
  version: "1.0.0"
});

server.resource(
  "echo",
  new ResourceTemplate("echo://{message}", { list: undefined }),
  async (uri, { message }) => ({
    contents: [{
      uri: uri.href,
      text: `Resource echo: ${message}`
    }]
  })
);

server.tool(
  "echo",
  { message: z.string() },
  async ({ message }) => ({
    content: [{ type: "text", text: `Tool echo: ${message}` }]
  })
);

server.prompt(
  "echo",
  { message: z.string() },
  ({ message }) => ({
    messages: [{
      role: "user",
      content: {
        type: "text",
        text: `Please process this message: ${message}`
      }
    }]
  })
);

SQLite 浏览器

一个更复杂的示例,展示数据库集成:

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import sqlite3 from "sqlite3";
import { promisify } from "util";
import { z } from "zod";

const server = new McpServer({
  name: "SQLite Explorer",
  version: "1.0.0"
});

// 创建数据库连接的辅助函数
const getDb = () => {
  const db = new sqlite3.Database("database.db");
  return {
    all: promisify<string, any[]>(db.all.bind(db)),
    close: promisify(db.close.bind(db))
  };
};

server.resource(
  "schema",
  "schema://main",
  async (uri) => {
    const db = getDb();
    try {
      const tables = await db.all(
        "SELECT sql FROM sqlite_master WHERE type='table'"
      );
      return {
        contents: [{
          uri: uri.href,
          text: tables.map((t: {sql: string}) => t.sql).join("\n")
        }]
      };
    } finally {
      await db.close();
    }
  }
);

server.tool(
  "query",
  { sql: z.string() },
  async ({ sql }) => {
    const db = getDb();
    try {
      const results = await db.all(sql);
      return {
        content: [{
          type: "text",
          text: JSON.stringify(results, null, 2)
        }]
      };
    } catch (err: unknown) {
      const error = err as Error;
      return {
        content: [{
          type: "text",
          text: `Error: ${error.message}`
        }],
        isError: true
      };
    } finally {
      await db.close();
    }
  }
);

高级用法

动态服务器

如果你想提供初始的工具/提示/资源集,但稍后根据用户操作或外部状态更改添加更多内容,你可以在服务器连接后添加/更新/移除它们。这将自动发出相应的 listChanged 通知:

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";

const server = new McpServer({
  name: "Dynamic Example",
  version: "1.0.0"
});

const listMessageTool = server.tool(
  "listMessages",
  { channel: z.string() },
  async ({ channel }) => ({
    content: [{ type: "text", text: await listMessages(channel) }]
  })
);

const putMessageTool = server.tool(
  "putMessage",
  { channel: z.string(), message: z.string() },
  async ({ channel, message }) => ({
    content: [{ type: "text", text: await putMessage(channel, string) }]
  })
);
// 在我们升级认证之前,`putMessage` 是禁用的(不会出现在工具列表中)
putMessageTool.disable()

const upgradeAuthTool = server.tool(
  "upgradeAuth",
  { permission: z.enum(["write', admin"])},
  // 这里的任何更改都会自动发出 `listChanged` 通知
  async ({ permission }) => {
    const { ok, err, previous } = await upgradeAuthAndStoreToken(permission)
    if (!ok) return {content: [{ type: "text", text: `Error: ${err}` }]}

    // 如果我们之前只有只读权限,现在 `putMessage` 可用了
    if (previous === "read") {
      putMessageTool.enable()
    }

    if (permission === 'write') {
      // 如果我们刚刚升级到 'write' 权限,我们仍然可以调用 'upgradeAuth' 
      // 但只能升级到 'admin'。 
      upgradeAuthTool.update({
        paramSchema: { permission: z.enum(["admin"]) }, // 更改验证规则
      })
    } else {
      // 如果我们是管理员,我们就没有升级的空间了,所以完全移除该工具
      upgradeAuthTool.remove()
    }
  }
)

// 正常连接
const transport = new StdioServerTransport();
await server.connect(transport);

低级服务器

为了更精细的控制,你可以直接使用低级 Server 类:

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  ListPromptsRequestSchema,
  GetPromptRequestSchema
} from "@modelcontextprotocol/sdk/types.js";

const server = new Server(
  {
    name: "example-server",
    version: "1.0.0"
  },
  {
    capabilities: {
      prompts: {}
    }
  }
);

server.setRequestHandler(ListPromptsRequestSchema, async () => {
  return {
    prompts: [{
      name: "example-prompt",
      description: "An example prompt template",
      arguments: [{
        name: "arg1",
        description: "Example argument",
        required: true
      }]
    }]
  };
});

server.setRequestHandler(GetPromptRequestSchema, async (request) => {
  if (request.params.name !== "example-prompt") {
    throw new Error("Unknown prompt");
  }
  return {
    description: "Example prompt",
    messages: [{
      role: "user",
      content: {
        type: "text",
        text: "Example prompt text"
      }
    }]
  };
});

const transport = new StdioServerTransport();
await server.connect(transport);

编写MCP客户

SDK提供了高级客户端接口:

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

const transport = new StdioClientTransport({
  command: "node",
  args: ["server.js"]
});

const client = new Client(
  {
    name: "example-client",
    version: "1.0.0"
  }
);

await client.connect(transport);

// List prompts
const prompts = await client.listPrompts();

// Get a prompt
const prompt = await client.getPrompt({
  name: "example-prompt",
  arguments: {
    arg1: "value"
  }
});

// List resources
const resources = await client.listResources();

// Read a resource
const resource = await client.readResource({
  uri: "file:///example.txt"
});

// Call a tool
const result = await client.callTool({
  name: "example-tool",
  arguments: {
    arg1: "value"
  }
});

代理授权请求上游

您可以代理OAUTH请求向外部授权提供商:

import express from 'express';
import { ProxyOAuthServerProvider } from '@modelcontextprotocol/sdk/server/auth/providers/proxyProvider.js';
import { mcpAuthRouter } from '@modelcontextprotocol/sdk/server/auth/router.js';

const app = express();

const proxyProvider = new ProxyOAuthServerProvider({
    endpoints: {
        authorizationUrl: "https://auth.external.com/oauth2/v1/authorize",
        tokenUrl: "https://auth.external.com/oauth2/v1/token",
        revocationUrl: "https://auth.external.com/oauth2/v1/revoke",
    },
    verifyAccessToken: async (token) => {
        return {
            token,
            clientId: "123",
            scopes: ["openid", "email", "profile"],
        }
    },
    getClient: async (client_id) => {
        return {
            client_id,
            redirect_uris: ["http://localhost:3000/callback"],
        }
    }
})

app.use(mcpAuthRouter({
    provider: proxyProvider,
    issuerUrl: new URL("http://auth.external.com"),
    baseUrl: new URL("http://mcp.example.com"),
    serviceDocumentationUrl: new URL("https://docs.example.com/"),
}))

此设置使您可以:

  • OAUTH向外部提供商提出要求
  • 添加自定义令牌验证逻辑
  • 管理客户注册
  • 提供自定义文档URL
  • 在委派给外部提供商时,保持对OAuth流的控制

向后兼容

带有streamableHTTP Tranport的客户和服务器可以维护backwards compatibility使用已弃用的HTTP+SSE传输(从协议版本2024-11-05)如下

客户端兼容性

对于需要与流式HTTP和较旧的SSE服务器一起使用的客户:

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
let client: Client|undefined = undefined
const baseUrl = new URL(url);
try {
  client = new Client({
    name: 'streamable-http-client',
    version: '1.0.0'
  });
  const transport = new StreamableHTTPClientTransport(
    new URL(baseUrl)
  );
  await client.connect(transport);
  console.log("Connected using Streamable HTTP transport");
} catch (error) {
  // If that fails with a 4xx error, try the older SSE transport
  console.log("Streamable HTTP connection failed, falling back to SSE transport");
  client = new Client({
    name: 'sse-client',
    version: '1.0.0'
  });
  const sseTransport = new SSEClientTransport(baseUrl);
  await client.connect(sseTransport);
  console.log("Connected using SSE transport");
}

服务器端兼容性

对于需要支持流式HTTP和较旧客户端的服务器:

import express from "express";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";

const server = new McpServer({
  name: "backwards-compatible-server",
  version: "1.0.0"
});

// ... set up server resources, tools, and prompts ...

const app = express();
app.use(express.json());

// Store transports for each session type
const transports = {
  streamable: {} as Record<string, StreamableHTTPServerTransport>,
  sse: {} as Record<string, SSEServerTransport>
};

// Modern Streamable HTTP endpoint
app.all('/mcp', async (req, res) => {
  // Handle Streamable HTTP transport for modern clients
  // Implementation as shown in the "With Session Management" example
  // ...
});

// Legacy SSE endpoint for older clients
app.get('/sse', async (req, res) => {
  // Create SSE transport for legacy clients
  const transport = new SSEServerTransport('/messages', res);
  transports.sse[transport.sessionId] = transport;
  
  res.on("close", () => {
    delete transports.sse[transport.sessionId];
  });
  
  await server.connect(transport);
});

// Legacy message endpoint for older clients
app.post('/messages', async (req, res) => {
  const sessionId = req.query.sessionId as string;
  const transport = transports.sse[sessionId];
  if (transport) {
    await transport.handlePostMessage(req, res, req.body);
  } else {
    res.status(400).send('No transport found for sessionId');
  }
});

app.listen(3000);

注意:SSE运输现在被弃用以支持流式http。新实施应使用流式HTTP,现有的SSE实现应计划迁移。

文档

贡献

欢迎在https://github.com/modelcontextprotocol/typescript-sdk的GitHub上的问题和拉力请求。

执照

该项目已根据MIT许可获得许可,请参阅LICENSE文件以获取详细信息。

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档