有奖捉虫:办公协同&微信生态&物联网文档专题 HOT

操作场景

MongoDB 内有 Change Stream 作为其追踪变更的解决方案,但为了更好地对变更记录进行搜索,往往需要将变更记录同步到 Elasticsearch、日志服务(CLS) 等。



本文以 MongoDB 接入 CKafka 并从 CKafka 流出到 CLS 为例,讲解如何使用 DIP 数据转储服务实现 Mongo Stream 数据变更记录分析。

运行原理

MongoDB 的数据流入配置项可参见官方 MongoDB Kafka source connector,通过设置不同的配置项,能够对接入的变更记录做相应的数据处理后再写入到 CKafka 实例的 Topic。

前提条件

需开启云数据库 MongoDB 服务或使用负载均衡 CLB 监听自建 MongoDB。

并且设置的安全组需要开放 TCP:27017 端口。


需开启 CKafka 服务。
需开启 CLS 服务。

操作步骤

步骤1:创建数据接入连接

1. 登录 DIP 控制台
2. 在左侧导航栏点击连接列表,选择好地域后,点击新建连接
3. 连接类型选择 MongoDB,然后点击下一步


4. 填写 MongoDB 连接配置,然后单击下一步


5. 等待连接校验成功后,可以在连接列表看到创建好的连接。

步骤2:创建数据接入任务

1. 在左侧导航栏单击任务管理 > 任务列表,选择好地域后,单击新建任务
2. 任务类型选择数据接入,接入方式选择 MongoDB数据订阅,单击下一步


3. 数据源选择刚刚新建的 MySQL 连接database 留空表示监听所有数据库的变更。table 留空表示监听某个数据库的所有表的变更。复制存量数据可以根据业务需求选择是否打开,然后单击下一步


4. 根据业务需求配置数据目标然后点击提交。等待任务显示健康,即表示创建成功。


5. 当 MongoDB 数据发生变更时,可以在目标 Topic 中查看到新增的消息。


数据目标为 CKafka 实例的 Topic,可以在侧边栏点击消息查询进行查看;
数据目标为单独 Topic 时,可以在侧边栏点击 Topic 列表,然后点击 Topic 进入详情页,再点击查看消息

步骤3:创建数据流出任务

1. 在左侧导航栏点击任务管理 > 任务列表,选择好地域后,点击新建任务
2. 任务类型选择数据流出,接入方式选择日志服务(CLS),点击下一步


3. 填写任务详情,选取与数据接入任务相同的 CKafka 实例和 Topic,保证在消息生产后能直接进行消费。


4. 单击提交,等待任务显示健康,即表示创建成功。
说明
当任务在健康的状态时, Topic 有新增的消息写入,会直接被消费到指定的 CLS 日志主题中。

步骤4:查看流出数据

1. 登录 日志服务 控制台。
2. 在左侧导航栏选择检索分析,选择流出时填写的日志集与日志主题的“ID”,即可看到 MongoDB 的变更记录。


3. 通过关键字检索等操作,能直观得到所需要的记录。