Kafka AI消息服务添加福利群:解决AI开发者的「MCP实战痛点」这是一个与Apache Kafka集成的消息上下文协议(MCP)服务器,为LLM(大语言模型)和智能体应用提供发布与消费功能。
本项目实现了一个服务器,允许AI模型通过标准化接口与Kafka主题交互。它支持:
克隆代码仓库:
git clone <仓库地址>
cd <仓库目录>

创建并激活虚拟环境:
python -m venv venv
source venv/bin/activate # Windows系统使用:venv\Scripts\activate

安装依赖项:
pip install -r requirements.txt
如果不存在requirements.txt文件,请安装以下包:
pip install aiokafka python-dotenv pydantic-settings mcp-server
在项目根目录创建.env文件,包含以下变量:
# Kafka配置 KAFKA_BOOTSTRAP_SERVERS=localhost:9092 TOPIC_NAME=你的主题名称 IS_TOPIC_READ_FROM_BEGINNING=False DEFAULT_GROUP_ID_FOR_CONSUMER=kafka-mcp-group # 可选:自定义工具描述 # TOOL_PUBLISH_DESCRIPTION="发布工具的自定义描述" # TOOL_CONSUME_DESCRIPTION="消费工具的自定义描述"
使用提供的main.py脚本运行服务器:
python main.py --transport stdio
可用传输选项:
stdio:标准输入/输出(默认)sse:服务器发送事件要在Claude Desktop中使用此Kafka MCP服务器,请在Claude Desktop配置文件中添加以下配置:
{
"mcpServers": {
"kafka": {
"command": "python",
"args": [
"<项目路径>/main.py"
]
}
}
}

将<项目路径>替换为你的项目目录的绝对路径。
main.py:应用程序入口点kafka.py:Kafka连接器实现server.py:包含Kafka交互工具的MCP服务器实现settings.py:使用Pydantic的配置管理向配置的Kafka主题发布信息。
从配置的Kafka主题消费信息。
创建具有指定参数的新Kafka主题。
--topic:要创建的主题名称--partitions:分配的分区数量--replication-factor:跨代理的复制因子--config(可选):主题级别的配置覆盖(例如,retention.ms=604800000)删除现有的Kafka主题。
--topic:要删除的主题名称--timeout(可选):等待删除完成的时间列出集群中的所有主题(或按模式过滤)。
--bootstrap-server:代理地址--pattern(可选):用于过滤主题名称的正则表达式--exclude-internal(可选):排除内部主题(默认:true)显示或更改一个或多个主题的配置。
--describe:显示主题的当前配置--alter:修改配置(例如,--add-config retention.ms=86400000,--delete-config cleanup.policy)--topic:主题名称获取主题或集群的元数据。
--topic(如果提供):仅获取此主题的元数据--bootstrap-server:代理地址--include-offline(可选):包括离线代理或分区