文档中心>消息队列 CKafka 版>连接器使用指南>配置 Kafka 数据同步至 Elasticsearch Service

配置 Kafka 数据同步至 Elasticsearch Service

最近更新时间:2025-08-28 17:40:12

我的收藏

操作场景

CKafka 连接器支持将流式数据转化为可搜索、可分析的结构化存储,适用于实时日志监控、用户行为检索或 IoT 设备状态分析等多种场景。例如:
业务日志通过 Kafka 同步至 Elasticsearch 后,并借助 Kibana 实现运维监控可视化。
实时写入用户单击流或订单数据至 Elasticsearch Service,支持复杂条件的快速检索与查询。
该连接器提供近实时写入(数据秒级可见)、自动索引管理(动态适配 JSON 结构)及数据预处理能力(JSON 解析、字段裁剪),并通过死信队列机制有效隔离异常数据,结合重试策略保障传输可靠性。广泛应用于在线教育平台中的用户学习行为分析,或智能硬件场景下传感器异常数据的快速定位与排查。

约束与限制

只支持7.0以上版本的 Elasticsearch Service。

前提条件

该功能目前依赖 Elasticsearch Service 服务,使用时需开通相关产品功能,并准备好 ES 集群。

新建 Elasticsearch Service 连接

1. 登录 CKafka 控制台
2. 在左侧导航栏选择连接器 > 连接列表,选择好地域后单击新建连接
3. 选择好地域后,连接类型选择 Elasticsearch Service,单击下一步,在设置连接配置页面设置连接信息。
连接名称:填写连接名称,用于区分不同的 ES 连接
描述:选填,填写连接说明,不能超过128字符。
ES 实例集群:选取腾讯云 Elasticsearch Service 实例集群。
实例用户名:输入 Elasticsearch 实例用户名,腾讯云 Elasticsearch 默认用户名为 elastic,且不可更改。
实例密码:输入 Elasticsearch 实例密码。
4. 单击下一步,开始进行连接校验,校验成功后,连接创建完成,在连接列表可以看到创建好的连接。

创建数据同步任务

1. 在左侧导航栏单击连接器 > 任务列表,选择好地域后,单击新建任务
2. 在新建任务弹窗中设置任务基本信息。
任务名称:填写任务名称,用于区分不同的数据同步任务,只能包含字母、数字、下划线、“-”、“.”。
描述:选填,填写任务说明信息,不超过128个字符。
任务类型:选择数据流出
数据目标类型:选择 Elasticsearch Service。
3. 单击下一步,配置数据源信息。

配置数据源

1. 在数据源配置页面设置好数据源信息。
参数
说明
数据源类型
仅支持 CKafka 实例内 Topic
CKafka 实例
在下拉列表中,选择已准备好的数据源 CKafka 实例。
源 Topic
在下拉列表中,选择已准备好的数据源 Topic。
若数据源实例设置了 ACL 策略,请确保具备选中的数据源 Topic 的读写权限。
起始位置
配置 Topic offset(偏移量),用于设置转储时对历史消息的处理策略。支持如下三种方式:
从最新位置开始消费:即最大偏移量,从当前最新的数据开始消费(跳过历史消息)。
从最开始位置开始消费:即最小偏移量,从最早的数据开始消费(处理全部历史消息)。
从时间点位置开始消费:从用户自定义的时间点开始消费。
2. 信息确认无误后,单击下一步,配置数据处理规则。

配置数据处理规则

1. 在“数据处理规则”页面,在源数据处单击预览 Topic 消息,将会选取源 Topic 中的第一条消息进行解析。
说明
目前解析消息需要满足以下条件:消息为 JSON 字符串结构,且源数据必须为单层 JSON 格式,嵌套 JSON 格式可使用数据处理进行简单的消息格式转换。
2. 若需要对源数据进行清洗处理,请开启对源数据进行数据处理按钮,并执行步骤3;若无需清洗数据,只需要完成数据同步,则可以跳过后续步骤,直接进入下个环节配置数据目标。
3. (可选)数据处理规则配置支持从本地导入模板,若已有提前准备好的规则模板,直接导入即可;若无则继续执行步骤4配置数据处理规则,后续配置完数据处理规则后,您也可以将其导出存储为模板,在其他任务中复制使用。
4. 在“原始数据”选择数据来源,支持从源 Topic 拉取或者自定义,此处以从源 Topic 拉取为例。

5. 在“解析模式”选择对应的数据解析模式并确认,可以看到数据解析结果。此处以 JSON 模式为例,单击左侧的解析结果可以在右侧生成结构化预览。
解析模式
说明
JSON
解析标准 JSON 格式的数据,支持嵌套字段,输出格式为键值对。
分隔符
按指定分隔符解析非结构化文本,分隔符支持 空格制表符,;|:自定义
正则提取
适用于对长数组类型的消息进行特定字段的提取,您可以手动填写正则表达式,也可以使用正则表达式自动生成功能进行自动提取。详细介绍请参考正则提取解析模式说明
说明:
当输入的正则表达式中含有类似 (?<name>expr) 或者 (?P<name>expr) 捕获组时,会将正则表达式视为模式串进行匹配,当消息成功匹配模式串时将解析捕获组内容;否则,会将整个输入的正则表达式视为捕获组,并提取消息中所有匹配的内容。
JSON 对象数组-单行输出
数组内每个对象的格式一致,解析时仅解析第一个对象,输出结果为单条的 JSON,是 map 类型。
JSON 对象数组-多行输出
数组内每个对象的格式一致,解析时仅解析第一个对象,输出结果为数组类型。

6. 若开启 key-value 二次解析,将对 value 里的数据再次进行 key-value 解析。
7. 在“数据处理”区域设置数据处理规则,此处可对字段进行编辑、删减,调整时间戳格式,并新增当前系统时间字段等等。单击处理 value 可以添加处理链对处理后的单条数据进行再次处理。
操作
说明
映射
可以选择已有的 KEY,最终输出的 VALUE 值由指定的 KEY 映射而来。
JSONPATH
解析多层嵌套的 JSON 数据,用$符号开头,.符号定位到多层 JSON 的具体字段。详细介绍请参考JSONPath 说明
系统预设-当前时间
可以选择系统预设的 VALUE ,目前支持 DATE(时间戳)。
自定义
可以输入自定义 VALUE。

8. 单击测试,查看数据处理的测试结果。此时可根据实际业务需求添加处理链可对上面数据处理结果再次进行处理。
9. 在“过滤器”选择是否开启过滤器,若开启则可以仅输出符合过滤器规则的数据。过滤器的匹配模式支持前缀匹配、后缀匹配、包含匹配(contains)、除外匹配(except)、数值匹配和 IP 匹配。详情参见 过滤器规则说明
10. 在“保留 Topic 源数据”选择是否开启保留 Topic 源数据。
11. 在“输出格式”设置数据输出内容,默认为 JSON,支持 ROW 格式,若选择 ROW 格式需要选择输出行内容。
输出行内容
说明
VALUE
只输出上方测试结果中的 VALUE 值,用分隔符隔开。VALUE 间分隔符默认为“无”选项。
KEY&VALUE
输出上方测试结果中的 KEY 和 VALUE,KEY 和 VALUE 间分隔符和 VALUE 间分隔符均不能为“无”。
12. 在“失败消息处理”设置投递失败的消息处理规则,支持丢弃保留投递到死信队列三种方式。
处理方式
说明
丢弃
适合用于生产环境,任务运行失败时将会忽略当前失败消息。建议使用 "保留" 模式测试无误后,再将任务编辑成 "丢弃" 模式用于生产。
保留
适合用于测试环境,任务运行失败时将会终止任务不会重试,并且在事件中心中记录失败原因。
投递到死信队列
需指定死信队列 Topic,适合用于严格生产环境,任务运行失败时会将失败消息及元数据和失败原因投递到指定的 CKafka Topic 中。
13. 数据规则配置完成后,可以直接在顶部单击导出为模板,在后续的数据任务中复制使用,减少重复配置的操作成本。
14. 单击下一步,配置数据目标。

配置数据目标

1. 在数据配置目标页面,配置数据目标。
源数据:单击拉取源 Topic 数据。若源 Topic 暂无数据,也可以自定义数据。
数据目标:选择提前创建好的数据流出的目标 Elasticsearch Service 连接。
索引名称:填写索引名称,索引名称必须全部为小写,支持 JSONPath 语法。
按日期拆分索引名称:可选,开启后需选择日期格式,写入 ES 的索引为%(索引名称)_%(日期)。
失败消息处理:选择投递失败的消息的处理方式,支持丢弃保留投递至 CLS (需指定投递到的日志集和日志主题并授权访问日志服务 CLS)三种方式。
保留:适合用于测试环境,任务运行失败时将会终止任务不会重试,并且在事件中心中记录失败原因。
丢弃:适合用于生产环境,任务运行失败时将会忽略当前失败消息。建议使用 "保留" 模式测试无误后,再将任务编辑成 "丢弃" 模式用于生产。
投递至 CLS:适合用于严格生产环境,任务运行失败时会将失败消息及元数据和失败原因上传到指定 CLS 主题中。
死信队列:适合用于严格生产环境,任务运行失败时会将失败消息及元数据和失败原因投递到指定的 CKafka Topic 中。
数据源类型
其他数据
连接器数据订阅任务内的数据



索引时间:可以指定源数据中某一字段作为索引时间。默认为消息的投递时间。
ES 文档 ID 字段:可以指定该字段的值作为 ES 文档 ID 的值。默认为topic+kafkaPartition+kafkaOffset
保留非 JSON 数据:若开启,则对于非 JSON 数据,会指定 KEY 进行组装投递。若关闭,则会丢弃非 JSON 数据。
KEY:源 Topic 内数据不是 JSON 格式时,可以指定 key 组装为 JSON 投递到 ES 中。
本选项仅用于连接器订阅关系型数据库到 Topic 里面的数据(增删改)同步更新到 ES。会识别数据库的增删改,保持 ES 的数据与源表的数据一致。



同步模式:若选择字段逐一匹配,则可以自定义消息字段名和目标索引字段的映射关系。若选择默认字段匹配,则会在 ES 的索引中,mapping 使用消息的 key 作为 field 名称。
目标索引类型:可以选择新建索引或者从现有的 ES 索引中选择。
主键:指定数据库表的主键作为 ES 文档 ID 的值。
索引时间:可以指定源数据中的某一字段作为索引时间。默认为消息的投递时间。
2. 单击提交,完成任务创建,在任务列表页面可以看到创建好的数据同步任务。任务创建成功后,会自动根据任务设置开始数据同步,实时复制数据。


查看数据同步进度

1. 在任务列表页面,单击创建好的任务的“ID”,进入任务的基本信息页面。
2. 在页面顶部选择同步进度页签,可以查看数据同步的进度和详情。