首页
学习
活动
专区
圈层
工具
发布
MCP广场 >详情页
Kafka AI消息服务2026-06-02193分享添加福利群:解决AI开发者的「MCP实战痛点」
使AI模型能够通过标准化接口从Apache Kafka主题发布和消费消息,从而轻松地将Kafka消息与大语言模型和代理应用程序集成。
By pavanjava
2026-06-02193
github
详情内容

在MseeP验证通过

Kafka MCP服务器

这是一个与Apache Kafka集成的消息上下文协议(MCP)服务器,为LLM(大语言模型)和智能体应用提供发布与消费功能。

概述

本项目实现了一个服务器,允许AI模型通过标准化接口与Kafka主题交互。它支持:

  • 向Kafka主题发布消息
  • 从Kafka主题消费消息

前提条件

  • Python 3.8或更高版本
  • Apache Kafka实例
  • Python依赖项(参见安装部分)

安装

  1. 克隆代码仓库:

    git clone <仓库地址>
    cd <仓库目录>
    
  2. 创建并激活虚拟环境:

    python -m venv venv
    source venv/bin/activate  # Windows系统使用:venv\Scripts\activate
    
  3. 安装依赖项:

    pip install -r requirements.txt
    

    如果不存在requirements.txt文件,请安装以下包:

    pip install aiokafka python-dotenv pydantic-settings mcp-server
    

配置

在项目根目录创建.env文件,包含以下变量:

# Kafka配置
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
TOPIC_NAME=你的主题名称
IS_TOPIC_READ_FROM_BEGINNING=False
DEFAULT_GROUP_ID_FOR_CONSUMER=kafka-mcp-group

# 可选:自定义工具描述
# TOOL_PUBLISH_DESCRIPTION="发布工具的自定义描述"
# TOOL_CONSUME_DESCRIPTION="消费工具的自定义描述"

使用方法

运行服务器

使用提供的main.py脚本运行服务器:

python main.py --transport stdio

可用传输选项:

  • stdio:标准输入/输出(默认)
  • sse:服务器发送事件

与Claude Desktop集成

要在Claude Desktop中使用此Kafka MCP服务器,请在Claude Desktop配置文件中添加以下配置:

{
    "mcpServers": {
        "kafka": {
            "command": "python",
            "args": [
                "<项目路径>/main.py"
            ]
        }
    }
}

<项目路径>替换为你的项目目录的绝对路径。

项目结构

  • main.py:应用程序入口点
  • kafka.py:Kafka连接器实现
  • server.py:包含Kafka交互工具的MCP服务器实现
  • settings.py:使用Pydantic的配置管理

可用工具

kafka-publish

向配置的Kafka主题发布信息。

kafka-consume

从配置的Kafka主题消费信息。

  • 注意:一旦消息被读取,使用相同的groupid将无法再次读取

Create-Topic

创建具有指定参数的新Kafka主题。

  • 选项
    • --topic:要创建的主题名称
    • --partitions:分配的分区数量
    • --replication-factor:跨代理的复制因子
    • --config(可选):主题级别的配置覆盖(例如,retention.ms=604800000

Delete-Topic

删除现有的Kafka主题。

  • 选项
    • --topic:要删除的主题名称
    • --timeout(可选):等待删除完成的时间

List-Topics

列出集群中的所有主题(或按模式过滤)。

  • 选项
    • --bootstrap-server:代理地址
    • --pattern(可选):用于过滤主题名称的正则表达式
    • --exclude-internal(可选):排除内部主题(默认:true)

Topic-Configuration

显示或更改一个或多个主题的配置。

  • 选项
    • --describe:显示主题的当前配置
    • --alter:修改配置(例如,--add-config retention.ms=86400000,--delete-config cleanup.policy
    • --topic:主题名称

Topic-Metadata

获取主题或集群的元数据。

  • 选项
    • --topic(如果提供):仅获取此主题的元数据
    • --bootstrap-server:代理地址
    • --include-offline(可选):包括离线代理或分区
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档