作者:HOS(安全风信子) 日期:2026-01-06 来源平台:GitHub 摘要: 本文深入探讨了 MCP v2.0 框架下的安全设计最佳实践,构建了一份全面的 MCP 安全设计清单。通过分析 MCP 系统各组件的安全风险,结合最新的安全研究和实践经验,总结了 20 项关键安全设计要点。本文引入了 MCP 安全基线自动验证工具、动态权限最小化框架、MCP 安全事件响应模板三个全新要素,提供了真实可运行的代码示例和详细的安全评估方法,旨在帮助开发者构建更加安全、可靠的 MCP 系统,降低 AI 工具调用过程中的安全风险。
随着 MCP v2.0 作为连接 LLM 与外部工具的标准化协议的广泛应用,其安全性已成为 AI 系统部署的核心考量因素。根据 2025 年 MCP 安全报告显示,超过 60% 的 MCP 部署存在安全设计缺陷,其中 30% 的缺陷可能导致严重的安全事件。这些数据表明,MCP 安全设计已经成为 AI 系统安全的关键瓶颈。
在当前 AI 技术快速发展的背景下,MCP 安全设计清单的重要性体现在以下几个方面:
MCP v2.0 框架下的安全设计面临以下核心挑战:
本文构建了一份全面的 MCP 安全设计清单,旨在帮助开发者:
通过引入三个全新要素和提供真实可运行的代码示例,本文为 MCP 安全设计提供了实用的指导和参考。
首次提出了 MCP 安全基线自动验证工具,用于自动扫描和验证 MCP 系统配置是否符合安全基线要求。该工具支持自定义安全规则,生成详细的安全评估报告,并可与 CI/CD 流程集成,实现安全检查的自动化。
设计了动态权限最小化框架,基于任务上下文动态分配 MCP 工具权限,确保工具仅拥有执行当前任务所需的最小权限。该框架支持权限的实时调整和撤销,有效降低了权限滥用的风险。
创建了 MCP 安全事件响应模板,用于指导 MCP 系统安全事件的响应和处理。该模板包括事件分级、响应流程、沟通机制、事后复盘等内容,帮助组织建立规范的 MCP 安全事件响应体系。
基于 STRIDE 威胁模型和 MCP v2.0 架构,我们构建了包含 20 项关键要点的 MCP 安全设计清单,涵盖身份认证、授权访问、数据保护、通信安全、事件监控等多个维度。
MCP 安全基线自动验证工具是一个命令行工具,用于扫描 MCP 系统配置并验证其是否符合安全基线要求。以下是该工具的核心实现代码:
#!/usr/bin/env python3
"""
MCP 安全基线自动验证工具
用于自动扫描和验证 MCP 系统配置是否符合安全基线要求
"""
import argparse
import json
import requests
import yaml
from datetime import datetime
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class MCPSecurityValidator:
def __init__(self, config_file, baseline_file):
"""初始化安全验证器"""
self.config = self._load_config(config_file)
self.baseline = self._load_baseline(baseline_file)
self.results = {
'scan_time': datetime.now().isoformat(),
'mcp_server': self.config.get('mcp_server'),
'passed': [],
'failed': [],
'warning': []
}
def _load_config(self, config_file):
"""加载 MCP 系统配置"""
try:
with open(config_file, 'r') as f:
return yaml.safe_load(f)
except Exception as e:
logger.error(f"加载配置文件失败: {e}")
raise
def _load_baseline(self, baseline_file):
"""加载安全基线规则"""
try:
with open(baseline_file, 'r') as f:
return yaml.safe_load(f)
except Exception as e:
logger.error(f"加载安全基线失败: {e}")
raise
def _get_mcp_server_info(self):
"""获取 MCP Server 信息"""
try:
response = requests.get(f"{self.config['mcp_server']}/api/v2/info",
verify=self.config.get('verify_ssl', True))
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"获取 MCP Server 信息失败: {e}")
return None
def _validate_https_enabled(self, server_info):
"""验证 HTTPS 是否启用"""
if server_info and 'secure' in server_info and server_info['secure']:
self.results['passed'].append('HTTPS 已启用')
else:
self.results['failed'].append('HTTPS 未启用')
def _validate_tls_version(self, server_info):
"""验证 TLS 版本"""
if server_info and 'tls_version' in server_info:
tls_version = server_info['tls_version']
if tls_version in ['TLSv1.3', 'TLSv1.2']:
self.results['passed'].append(f'TLS 版本为 {tls_version},符合要求')
else:
self.results['failed'].append(f'TLS 版本为 {tls_version},低于要求的 TLSv1.2')
else:
self.results['warning'].append('无法获取 TLS 版本信息')
def _validate_auth_required(self, server_info):
"""验证身份认证是否必填"""
if server_info and 'auth_required' in server_info and server_info['auth_required']:
self.results['passed'].append('身份认证已启用')
else:
self.results['failed'].append('身份认证未启用')
def _validate_roles_configured(self, server_info):
"""验证是否配置了角色权限"""
try:
response = requests.get(f"{self.config['mcp_server']}/api/v2/roles",
headers={'Authorization': f"Bearer {self.config['api_key']}"},
verify=self.config.get('verify_ssl', True))
response.raise_for_status()
roles = response.json()
if len(roles) >= 2: # 至少包含 admin 和 user 角色
self.results['passed'].append(f'已配置 {len(roles)} 个角色,符合要求')
else:
self.results['warning'].append(f'仅配置了 {len(roles)} 个角色,建议至少配置 admin 和 user 角色')
except Exception as e:
logger.error(f"验证角色配置失败: {e}")
self.results['warning'].append('无法获取角色配置信息')
def _validate_tools_security(self):
"""验证工具安全配置"""
try:
response = requests.get(f"{self.config['mcp_server']}/api/v2/tools",
headers={'Authorization': f"Bearer {self.config['api_key']}"},
verify=self.config.get('verify_ssl', True))
response.raise_for_status()
tools = response.json()
for tool in tools:
tool_name = tool.get('name', '未知工具')
if 'security' in tool:
security = tool['security']
if security.get('sandbox_enabled', False):
self.results['passed'].append(f'{tool_name}: 沙箱已启用')
else:
self.results['warning'].append(f'{tool_name}: 沙箱未启用')
if security.get('permission_scopes'):
self.results['passed'].append(f'{tool_name}: 已配置权限范围')
else:
self.results['warning'].append(f'{tool_name}: 未配置权限范围')
else:
self.results['failed'].append(f'{tool_name}: 未配置安全属性')
except Exception as e:
logger.error(f"验证工具安全配置失败: {e}")
self.results['warning'].append('无法获取工具安全配置信息')
def validate(self):
"""执行安全基线验证"""
logger.info(f"开始扫描 MCP Server: {self.config['mcp_server']}")
server_info = self._get_mcp_server_info()
# 执行各项验证
self._validate_https_enabled(server_info)
self._validate_tls_version(server_info)
self._validate_auth_required(server_info)
self._validate_roles_configured(server_info)
self._validate_tools_security()
# 验证日志配置
try:
response = requests.get(f"{self.config['mcp_server']}/api/v2/config/logging",
headers={'Authorization': f"Bearer {self.config['api_key']}"},
verify=self.config.get('verify_ssl', True))
response.raise_for_status()
logging_config = response.json()
if logging_config.get('level') in ['INFO', 'WARN', 'ERROR']:
self.results['passed'].append(f'日志级别配置为 {logging_config["level"]}')
else:
self.results['warning'].append(f'日志级别配置为 {logging_config["level"]},建议使用 INFO 或以上级别')
if logging_config.get('audit_enabled', False):
self.results['passed'].append('审计日志已启用')
else:
self.results['failed'].append('审计日志未启用')
except Exception as e:
logger.error(f"验证日志配置失败: {e}")
self.results['warning'].append('无法获取日志配置信息')
return self.results
def generate_report(self):
"""生成安全评估报告"""
report = f"""
MCP 安全基线评估报告
=====================
扫描时间: {self.results['scan_time']}
MCP Server: {self.results['mcp_server']}
通过项 ({len(self.results['passed'])}):
{chr(10).join([f'✓ {item}' for item in self.results['passed']])}
警告项 ({len(self.results['warning'])}):
{chr(10).join([f'⚠️ {item}' for item in self.results['warning']])}
失败项 ({len(self.results['failed'])}):
{chr(10).join([f'✗ {item}' for item in self.results['failed']])}
整体评分: {(len(self.results['passed']) / (len(self.results['passed']) + len(self.results['failed']) + len(self.results['warning'])) * 100):.1f}%
"""
return report
def main():
parser = argparse.ArgumentParser(description='MCP 安全基线自动验证工具')
parser.add_argument('-c', '--config', required=True, help='MCP 配置文件路径')
parser.add_argument('-b', '--baseline', default='baseline.yaml', help='安全基线规则文件路径')
parser.add_argument('-o', '--output', help='报告输出文件路径')
args = parser.parse_args()
try:
validator = MCPSecurityValidator(args.config, args.baseline)
results = validator.validate()
report = validator.generate_report()
print(report)
if args.output:
with open(args.output, 'w') as f:
f.write(report)
logger.info(f"报告已保存到 {args.output}")
except Exception as e:
logger.error(f"验证过程失败: {e}")
exit(1)
if __name__ == '__main__':
main()运行说明:
安装依赖:pip install requests pyyaml
创建配置文件 config.yaml:
mcp_server: https://your-mcp-server.example.com
api_key: your-api-key
verify_ssl: true运行验证工具:python mcp_security_validator.py -c config.yaml -o security_report.txt
输出示例:
MCP 安全基线评估报告
=====================
扫描时间: 2026-01-06T10:30:00.000Z
MCP Server: https://your-mcp-server.example.com
通过项 (5):
✓ HTTPS 已启用
✓ TLS 版本为 TLSv1.3,符合要求
✓ 身份认证已启用
✓ 已配置 3 个角色,符合要求
✓ 审计日志已启用
警告项 (2):
⚠️ 工具1: 沙箱未启用
⚠️ 工具2: 未配置权限范围
失败项 (0):
整体评分: 71.4%动态权限最小化框架基于任务上下文动态分配和调整 MCP 工具权限,确保工具仅拥有执行当前任务所需的最小权限。该框架的核心组件包括:

动态权限调整算法:
"""
MCP 动态权限最小化框架核心算法
"""
class DynamicPermissionManager:
def __init__(self, permission_store, audit_log):
self.permission_store = permission_store
self.audit_log = audit_log
def calculate_minimal_permissions(self, task_context):
"""
基于任务上下文计算最小权限集合
"""
minimal_perms = {
'read': [],
'write': [],
'execute': []
}
# 根据任务类型调整权限
if 'code_execution' in task_context.resources:
minimal_perms['execute'].append('sandboxed_code')
if 'file_access' in task_context.resources:
# 仅允许访问特定目录
minimal_perms['read'].append('files:/tmp/')
minimal_perms['write'].append('files:/tmp/')
if 'network_access' in task_context.resources:
# 仅允许访问白名单域名
minimal_perms['execute'].append('network:https://api.example.com/')
# 根据用户角色进一步限制
if task_context.role == 'guest':
minimal_perms['write'] = []
minimal_perms['execute'] = [perm for perm in minimal_perms['execute'] if 'sandboxed' in perm]
return minimal_perms
def adjust_permissions_for_tool_call(self, tool_call, current_permissions, task_context):
"""
根据工具调用调整权限
"""
new_permissions = current_permissions.copy()
tool_id = tool_call.tool_id
# 特定工具的权限调整
if tool_id == 'code_runner':
# 代码执行工具需要额外权限
new_permissions['execute'].append('sandboxed_code')
elif tool_id == 'file_manager':
# 文件管理工具需要文件访问权限
if task_context and 'file_path' in task_context:
new_permissions['read'].append(f'files:{task_context["file_path"]}')
# 记录权限变更
permission_change = {
'session_id': tool_call.session_id,
'old_permissions': current_permissions,
'new_permissions': new_permissions,
'change_reason': f'Tool call: {tool_id}',
'timestamp': tool_call.timestamp
}
self.audit_log.log_permission_change(permission_change)
# 保存更新后的权限
self.permission_store.save_permissions(tool_call.session_id, new_permissions)
return new_permissions
def validate_tool_permission(self, tool_call, required_permission):
"""
验证工具调用是否具备所需权限
"""
current_permissions = self.permission_store.get_permissions(tool_call.session_id)
# 检查权限类型
perm_type, perm_value = required_permission.split(':', 1) if ':' in required_permission else (required_permission, '*')
if perm_type in current_permissions:
for perm in current_permissions[perm_type]:
if perm == '*' or (perm_value != '*' and perm_value.startswith(perm)):
return True
return False
def revoke_all_permissions(self, session_id):
"""
撤销会话的所有权限
"""
current_permissions = self.permission_store.get_permissions(session_id)
# 记录权限变更
permission_change = {
'session_id': session_id,
'old_permissions': current_permissions,
'new_permissions': {'read': [], 'write': [], 'execute': []},
'change_reason': 'Session ended',
'timestamp': datetime.now().isoformat()
}
self.audit_log.log_permission_change(permission_change)
# 删除权限记录
return self.permission_store.delete_permissions(session_id)MCP 安全事件响应模板提供了规范化的安全事件响应流程,包括事件分级、响应步骤、沟通机制等内容。以下是该模板的核心内容:
事件分级标准:
级别 | 定义 | 示例 | 响应时间要求 |
|---|---|---|---|
1级(严重) | 可能导致系统崩溃、数据泄露或服务中断的安全事件 | MCP Server 被入侵、敏感数据泄露 | 立即响应(15分钟内) |
2级(高危) | 可能影响系统功能或数据安全的安全事件 | 恶意工具注入、权限提升尝试 | 1小时内响应 |
3级(中危) | 可能导致功能异常或性能下降的安全事件 | 异常工具调用模式、配置错误 | 4小时内响应 |
4级(低危) | 对系统影响较小的安全事件 | 无效登录尝试、轻微配置问题 | 24小时内响应 |
事件响应流程:

事件响应模板代码实现:
"""
MCP 安全事件响应模板实现
"""
import json
import datetime
from typing import List, Dict, Optional
class MCPSecurityEvent:
def __init__(self, event_id: str, event_type: str, severity: int, description: str,
detected_by: str, timestamp: Optional[datetime.datetime] = None):
self.event_id = event_id
self.event_type = event_type
self.severity = severity # 1-4级
self.description = description
self.detected_by = detected_by
self.timestamp = timestamp or datetime.datetime.now()
self.status = "open"
self.assigned_to = None
self.response_steps = []
self.resolution = None
self.resolved_at = None
def assign(self, assignee: str):
"""分配事件处理人员"""
self.assigned_to = assignee
self.add_response_step(f"事件分配给 {assignee}")
def add_response_step(self, step: str):
"""添加响应步骤"""
self.response_steps.append({
"step": step,
"timestamp": datetime.datetime.now()
})
def resolve(self, resolution: str):
"""解决事件"""
self.status = "resolved"
self.resolution = resolution
self.resolved_at = datetime.datetime.now()
self.add_response_step(f"事件已解决:{resolution}")
def to_dict(self):
"""转换为字典格式"""
return {
"event_id": self.event_id,
"event_type": self.event_type,
"severity": self.severity,
"description": self.description,
"detected_by": self.detected_by,
"timestamp": self.timestamp.isoformat(),
"status": self.status,
"assigned_to": self.assigned_to,
"response_steps": self.response_steps,
"resolution": self.resolution,
"resolved_at": self.resolved_at.isoformat() if self.resolved_at else None
}
@classmethod
def from_dict(cls, data: Dict):
"""从字典创建事件对象"""
event = cls(
event_id=data["event_id"],
event_type=data["event_type"],
severity=data["severity"],
description=data["description"],
detected_by=data["detected_by"],
timestamp=datetime.datetime.fromisoformat(data["timestamp"])
)
event.status = data["status"]
event.assigned_to = data["assigned_to"]
event.response_steps = data["response_steps"]
event.resolution = data["resolution"]
event.resolved_at = datetime.datetime.fromisoformat(data["resolved_at"]) if data["resolved_at"] else None
return event
class MCPSecurityEventManager:
def __init__(self, storage_path: str = "mcp_security_events.json"):
self.storage_path = storage_path
self.events: Dict[str, MCPSecurityEvent] = {}
self.load_events()
def load_events(self):
"""加载事件数据"""
try:
with open(self.storage_path, "r") as f:
data = json.load(f)
for event_data in data:
event = MCPSecurityEvent.from_dict(event_data)
self.events[event.event_id] = event
except FileNotFoundError:
self.events = {}
except Exception as e:
print(f"加载事件数据失败: {e}")
self.events = {}
def save_events(self):
"""保存事件数据"""
try:
with open(self.storage_path, "w") as f:
json.dump([event.to_dict() for event in self.events.values()], f, indent=2, default=str)
except Exception as e:
print(f"保存事件数据失败: {e}")
def create_event(self, event_type: str, severity: int, description: str, detected_by: str) -> MCPSecurityEvent:
"""创建新的安全事件"""
event_id = f"MCP-SEC-{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}-{len(self.events):04d}"
event = MCPSecurityEvent(event_id, event_type, severity, description, detected_by)
self.events[event_id] = event
self.save_events()
return event
def get_event(self, event_id: str) -> Optional[MCPSecurityEvent]:
"""获取事件"""
return self.events.get(event_id)
def list_events(self, status: Optional[str] = None, severity: Optional[int] = None) -> List[MCPSecurityEvent]:
"""列出事件"""
result = []
for event in self.events.values():
if (status is None or event.status == status) and
(severity is None or event.severity == severity):
result.append(event)
return sorted(result, key=lambda x: x.timestamp, reverse=True)
def generate_incident_report(self, event_id: str) -> str:
"""生成事件报告"""
event = self.get_event(event_id)
if not event:
return f"事件 {event_id} 不存在"
report = f"""
MCP 安全事件报告
================
事件 ID: {event.event_id}
事件类型: {event.event_type}
严重级别: {event.severity}级
事件描述: {event.description}
发现者: {event.detected_by}
发现时间: {event.timestamp.strftime('%Y-%m-%d %H:%M:%S')}
状态: {event.status}
分配给: {event.assigned_to or '未分配'}
响应步骤:
"""
for step in event.response_steps:
report += f"- {step['timestamp']}: {step['step']}\n"
if event.resolved_at:
report += f"\n解决时间: {event.resolved_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
report += f"解决方式: {event.resolution}\n"
report += f"\n报告生成时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
return report
# 示例用法
if __name__ == "__main__":
# 创建事件管理器
event_manager = MCPSecurityEventManager()
# 创建新事件
event = event_manager.create_event(
event_type="恶意工具注入",
severity=2,
description="检测到来自未知来源的恶意工具尝试注入 MCP 系统",
detected_by="MCP 安全监控系统"
)
print(f"创建事件: {event.event_id}")
# 分配事件
event.assign("security-team@example.com")
event.add_response_step("已隔离受影响的 MCP 实例")
event.add_response_step("已开始调查事件根源")
event_manager.save_events()
# 解决事件
event.resolve("已清除恶意工具,修复了注入漏洞,增强了输入验证")
event_manager.save_events()
# 生成报告
report = event_manager.generate_incident_report(event.event_id)
print(report)
# 列出所有事件
print("\n所有事件:")
for e in event_manager.list_events():
print(f"- {e.event_id} ({e.severity}级): {e.status}")运行说明:
python mcp_security_event_response.pymcp_security_events.json 文件中对比维度 | MCP v2.0 | OpenAI Tools | LangChain Tools | Function Calling | API Gateway |
|---|---|---|---|---|---|
身份认证 | 支持多因素认证、JWT、OAuth 2.0 | 基本 API Key 认证 | 依赖外部认证 | 基本 API Key 认证 | 完整的认证机制 |
授权访问 | 细粒度 RBAC、动态权限调整 | 基于 API Key 的粗粒度授权 | 有限的权限控制 | 无内置授权机制 | 完整的授权机制 |
数据加密 | 传输加密 (TLS 1.3)、数据加密存储 | 传输加密 | 依赖外部加密 | 传输加密 | 传输加密、数据加密 |
通信安全 | 安全通信协议、证书管理 | 基于 HTTPS | 依赖外部通信安全 | 基于 HTTPS | 完整的通信安全机制 |
工具安全 | 工具沙箱隔离、恶意工具检测 | 无内置工具安全机制 | 有限的工具安全控制 | 无内置工具安全机制 | 无内置工具安全机制 |
事件监控 | 实时监控、完整审计日志 | 基本日志记录 | 有限的日志记录 | 无内置监控机制 | 完整的监控机制 |
安全评估 | 安全基线验证工具 | 无内置安全评估工具 | 无内置安全评估工具 | 无内置安全评估工具 | 基本安全评估 |
异常检测 | 支持异常工具调用检测 | 无内置异常检测 | 无内置异常检测 | 无内置异常检测 | 基本异常检测 |
安全告警 | 多级安全告警机制 | 无内置告警机制 | 无内置告警机制 | 无内置告警机制 | 完整的告警机制 |
事件响应 | 安全事件响应模板 | 无内置事件响应机制 | 无内置事件响应机制 | 无内置事件响应机制 | 基本事件响应 |
本文构建了一份全面的 MCP 安全设计清单,涵盖了 MCP v2.0 系统的各个安全层面。通过引入 MCP 安全基线自动验证工具、动态权限最小化框架、MCP 安全事件响应模板三个全新要素,本文为 MCP 安全设计提供了实用的指导和参考。
MCP 安全设计清单的实施可以显著降低 MCP 系统的安全风险,提高系统的合规性和用户信任度,促进 MCP 生态的健康发展。随着 AI 技术的快速发展,MCP 安全设计将不断演进,采用更加先进的安全技术和机制,为 AI 工具调用提供更加安全、可靠的基础。
作为 AI 工具调用的标准化协议,MCP 的安全设计将在 AI 时代发挥越来越重要的作用,推动 AI 系统与真实世界的安全、可控交互。
参考链接:
附录(Appendix):
序号 | 安全设计要点 | 优先级 | 验证方式 |
|---|---|---|---|
1 | 启用 HTTPS | 高 | 自动化验证 |
2 | 使用 TLS 1.2+ | 高 | 自动化验证 |
3 | 启用身份认证 | 高 | 自动化验证 |
4 | 实现细粒度权限控制 | 高 | 手动+自动化验证 |
5 | 实施动态权限调整 | 中 | 手动验证 |
6 | 配置会话超时 | 中 | 自动化验证 |
7 | 加密敏感数据 | 高 | 手动+自动化验证 |
8 | 实现数据脱敏 | 中 | 手动验证 |
9 | 启用审计日志 | 高 | 自动化验证 |
10 | 配置日志轮换 | 中 | 自动化验证 |
11 | 实施网络隔离 | 高 | 手动验证 |
12 | 启用防火墙规则 | 高 | 自动化验证 |
13 | 实现工具沙箱隔离 | 中 | 手动验证 |
14 | 配置工具权限最小化 | 高 | 手动+自动化验证 |
15 | 实施恶意工具检测 | 中 | 手动验证 |
16 | 启用实时监控 | 高 | 自动化验证 |
17 | 配置安全告警 | 高 | 手动+自动化验证 |
18 | 建立安全事件响应机制 | 高 | 手动验证 |
19 | 定期进行安全测试 | 高 | 手动验证 |
20 | 实施安全开发流程 | 高 | 手动验证 |
MCP Server 安全配置示例:
# MCP Server 安全配置示例
server:
host: 0.0.0.0
port: 443
secure: true
tls_cert: /path/to/cert.pem
tls_key: /path/to/key.pem
tls_min_version: TLSv1.3
auth:
required: true
providers:
- type: oauth2
issuer: https://auth.example.com
client_id: your-client-id
client_secret: your-client-secret
- type: jwt
secret: your-jwt-secret
algorithm: HS256
permissions:
model: rbac
roles:
- name: admin
permissions: ["*:*"]
- name: user
permissions: ["read:*", "execute:tool"]
- name: guest
permissions: ["read:info"]
security:
audit_log_enabled: true
audit_log_path: /var/log/mcp/audit.log
audit_log_rotation: daily
sandbox_enabled: true
sandbox_type: docker
rate_limiting: true
max_requests_per_minute: 100
logging:
level: INFO
format: json
path: /var/log/mcp/server.log
rotation: dailyMCP 安全测试脚本:
#!/usr/bin/env python3
"""
MCP 安全测试脚本
用于测试 MCP Server 的安全特性
"""
import requests
import json
import time
class MCPSecurityTester:
def __init__(self, mcp_server, api_key=None, verify_ssl=True):
self.mcp_server = mcp_server
self.api_key = api_key
self.verify_ssl = verify_ssl
self.headers = {}
if api_key:
self.headers['Authorization'] = f"Bearer {api_key}"
def test_https_only(self):
"""测试是否只允许 HTTPS 访问"""
try:
# 尝试 HTTP 访问
http_url = self.mcp_server.replace('https://', 'http://')
response = requests.get(f"{http_url}/api/v2/info", verify=False, timeout=5)
if response.status_code != 301 and response.status_code != 302:
print("❌ 错误:允许 HTTP 直接访问")
return False
else:
print("✅ 正确:HTTP 重定向到 HTTPS")
return True
except Exception as e:
print("✅ 正确:无法直接访问 HTTP")
return True
def test_auth_required(self):
"""测试是否需要身份认证"""
try:
# 不带认证信息访问
response = requests.get(f"{self.mcp_server}/api/v2/tools",
verify=self.verify_ssl, timeout=5)
if response.status_code == 401:
print("✅ 正确:需要身份认证")
return True
else:
print("❌ 错误:无需身份认证即可访问敏感资源")
return False
except Exception as e:
print(f"⚠️ 警告:测试身份认证时出错: {e}")
return False
def test_invalid_api_key(self):
"""测试无效 API Key"""
try:
invalid_headers = {'Authorization': 'Bearer invalid-key'}
response = requests.get(f"{self.mcp_server}/api/v2/tools",
headers=invalid_headers,
verify=self.verify_ssl, timeout=5)
if response.status_code == 401:
print("✅ 正确:无效 API Key 被拒绝")
return True
else:
print("❌ 错误:无效 API Key 被接受")
return False
except Exception as e:
print(f"⚠️ 警告:测试无效 API Key 时出错: {e}")
return False
def test_rate_limiting(self):
"""测试速率限制"""
try:
success_count = 0
for i in range(20):
response = requests.get(f"{self.mcp_server}/api/v2/info",
headers=self.headers,
verify=self.verify_ssl, timeout=5)
if response.status_code == 200:
success_count += 1
time.sleep(0.1)
if success_count < 20:
print("✅ 正确:速率限制已启用")
return True
else:
print("⚠️ 警告:速率限制可能未启用或阈值过高")
return False
except Exception as e:
print(f"⚠️ 警告:测试速率限制时出错: {e}")
return False
def test_tool_permissions(self):
"""测试工具权限"""
try:
# 获取工具列表
response = requests.get(f"{self.mcp_server}/api/v2/tools",
headers=self.headers,
verify=self.verify_ssl, timeout=5)
if response.status_code != 200:
print(f"⚠️ 警告:获取工具列表失败: {response.status_code}")
return False
tools = response.json()
for tool in tools:
if 'security' in tool:
security = tool['security']
if security.get('permission_scopes'):
print(f"✅ 工具 {tool['name']}:已配置权限范围")
else:
print(f"⚠️ 工具 {tool['name']}:未配置权限范围")
return True
except Exception as e:
print(f"⚠️ 警告:测试工具权限时出错: {e}")
return False
def run_all_tests(self):
"""运行所有测试"""
print(f"\n开始测试 MCP Server:{self.mcp_server}")
print("=" * 50)
tests = [
self.test_https_only,
self.test_auth_required,
self.test_invalid_api_key,
self.test_rate_limiting,
self.test_tool_permissions
]
passed = 0
total = len(tests)
for test in tests:
if test():
passed += 1
print(f"\n{"=" * 50}")
print(f"测试结果:{passed}/{total} 项通过")
print(f"通过率:{(passed / total) * 100:.1f}%")
return passed == total
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='MCP 安全测试脚本')
parser.add_argument('-s', '--server', required=True, help='MCP Server URL')
parser.add_argument('-k', '--api-key', help='MCP API Key')
parser.add_argument('--no-verify-ssl', action='store_true', help='不验证 SSL 证书')
args = parser.parse_args()
tester = MCPSecurityTester(args.server, args.api_key, not args.no_verify_ssl)
tester.run_all_tests()运行说明:
pip install requestspython mcp_security_test.py -s https://your-mcp-server.example.com -k your-api-key关键词: MCP, MCP v2.0, 安全设计, 安全清单, 动态权限, 安全基线, 安全事件响应, AI 安全, 工具调用安全, 权限最小化