前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >mongodb:实时数据同步(一)

mongodb:实时数据同步(一)

作者头像
cosmozhu
发布2020-06-15 06:45:06
5.3K0
发布2020-06-15 06:45:06
举报
文章被收录于专栏:cosmozhu技术篇cosmozhu技术篇

关于mongodb数据实时同步,如果只是做数据备份的话,直接搭建一个replica set集群或者shard集群就可以实现目的了。但这样的话作为备份库的节点都是secondery,你没法往备份库上写数据上去。

不幸的是我最近就遇到了这样的需求,一个云上mongodb和一个云下机房的mongodb。云上的数据需要实时同步到云下,但云下的数据库会写入一些其它业务。

这样的话我只能将数据实时从云上采集到云下库。

本文介绍的是基于kafka-connector的一种解决方案。

环境准备

已有搭建好的kafka集群,可以参考cosmo这篇《Kafka集群搭建》快速搭建一个开发用的kafka集群。

debezium提供的 connector 插件:debezium-connector-mongodb

mongodb官方提供的connector插件:mongo-kafka-connect-1.0.1-all.jar

两个概念

kafka-connector 由两个重要的部分组成source和sink。source用来从数据源采集数据,sink用来将数据保存到目标数据源。

为什么要使用两个connector?

本文将使用debezium提供的变更数据事件采集器来采集数据,使用 mongodb 官方提供的connector中的sink将数据推送给下游数据源。

插件安装

将下载下来的两个压缩包放在kafka插件目录下

代码语言:javascript
复制
/usr/local/share/kafka/plugins

如果目录不存在请新建

解压 debezium-connector-mongodbmongo-kafka-connect-1.0.1-all.jar

启动kafka-connect

kafka-connector启动分为单机版和集群版,我们这里不讨论单机版。

代码语言:javascript
复制
#在所有kafka brokers上执行下面命令,启动connector
bin/connect-distributed.sh -daemon config/connect-distributed.properties

因为kafka-connect的意图是以服务的方式去运行,所以它提供了REST API去管理connectors,默认的端口是8083。

代码语言:javascript
复制
GET /connectors – 返回所有正在运行的connector名

POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。

GET /connectors/{name} – 获取指定connetor的信息

GET /connectors/{name}/config – 获取指定connector的配置信息

PUT /connectors/{name}/config – 更新指定connector的配置信息

GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。

GET /connectors/{name}/tasks – 获取指定connector正在运行的task。

GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息

PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。

PUT /connectors/{name}/resume – 恢复一个被暂停的connector

POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用

POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。

DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。
 
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
 GET /connectors/{name} – 获取指定connetor的信息
 GET /connectors/{name}/config – 获取指定connector的配置信息
 PUT /connectors/{name}/config – 更新指定connector的配置信息
 GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
 GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
 GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息
 PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
 PUT /connectors/{name}/resume – 恢复一个被暂停的connector
 POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
 POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
 DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。 

Debezium Mongodb Connector配置

Property

Default

Description

name

connector的名称(唯一指定)

connector.class

Mongodb connector 具体实现类,默认值为 io.debezium.connector.mongodb.MongoDbConnector

mongodb.hosts

mongodb 链接信息host:port,host:port,如果 mongodb.members.auto.discover 是false,需要指定具体的副本集名称例如 rs0/host:port。如果是shard 集群请配置config server的地址。

mongodb.name

采集好的数据会推送到kafka消息队列,topics为db.collection。如果配置了这个name,将在topics前加此name作为前缀。

mongodb.user

mongodb 用户名

mongodb.password

mongodb 密码

mongodb.authsource

admin

mongodb 鉴权库

mongodb.ssl.enabled

false

链接是否使用ssl

mongodb.ssl.invalid.hostname.allowed

false

是否严格检查主机名

database.whitelist

empty string

监听数据变更的db库白名单,与黑名单不能同时使用

database.blacklist

empty string

监听数据变更的db库黑名单, 与白名单不能同时使用

collection.whitelist

empty string

监听数据变更的collection库白名单, 与黑名单不能同时使用 。逗号分隔

collection.blacklist

empty string

监听数据变更的collection库黑名单, 与白名单不能同时使用 。逗号分隔

snapshot.mode

initial

默认为: initial ,在启动时如果在oplog中找不到偏移量,会创建一个快照进行初始化同步。如果不需要请设置为never。

field.blacklist

empty string

字段映射黑名单,配置的字段将不会同步 ,用逗号分隔

field.renames

empty string

字段重命名old:new,用逗号分隔

tasks.max

1

如果是副本集集群,默认值1是可以接收的。如果是shard cluster 最好大于等于分片数量

initial.sync.max.threads

1

初始化同步任务数

tombstones.on.delete

true

是否在delete之后推送 tombstone  事件

snapshot.delay.ms

connector启动后拍摄快照之前等待的时间,单位为(毫秒)避免集群中多个connector启动时中断快照。

snapshot.fetch.size

0

拍摄快照时每次拉取的最大数

启动debezium-connector数据采集任务

代码语言:javascript
复制
{
     "name" : "debezium",
     "config" : {
         "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
         "mongodb.hosts" : "vm4:27100",
         "mongodb.user" : "root",
         "mongodb.password" : "root",
         "mongodb.authsource" : "admin",
         "database.whitelist" : "test",
         "tasks.max" : "4",
         "mongodb.name" : "debezium"
     }
 }
代码语言:javascript
复制
curl -H "Content-Type: application/json" -X POST -d '{     "name" : "debezium",    "config" : {        "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",       "mongodb.hosts" : "vm4:27100",      "mongodb.user" : "root",        "mongodb.password" : "root",        "mongodb.authsource" : "admin",         "database.whitelist" : "test",      "tasks.max" : "4",      "mongodb.name" : "debezium"     } }' http://vm2:8083/connectors

Kafka-Connector-Mongodb-Sink配置

代码语言:javascript
复制
{

    "name" : "mongo-sink", #sink名称

    "config" : {

        "topics" : "debezium.sync.realtime_air", #监听的topics,多个用逗号分隔

        "max.batch.size" : "50", #每次处理的最大数量

        "connection.uri" : "mongodb://192.168.4.49:27017/sync", #目标mongodb链接

        "connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector", #mongodb链接实现类

        "change.data.capture.handler" : "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler", #CDC实现类

        "key.converter" : "org.apache.kafka.connect.json.JsonConverter", #键序列化类

        "key.converter.schemas.enable" : "true", #键转化是否包含架构

        "value.converter" : "org.apache.kafka.connect.json.JsonConverter", #值序列化类

        "value.converter.schemas.enable" : "true",#值转化是否包含架构

        "database" : "sync", #写入的数据库名称

        "collection" : "mongosink", #写入的集合名称

        "topic.override.debezium.sync.realtime_air.collection" : "realtime_air" #覆盖配置,设置debezium.sync.realtime_air 写入的集合名称为realtime_air

    }

}

启动数据落库任务

代码语言:javascript
复制
curl -H "Content-Type: application/json" -X POST -d '{     "name" : "mongo-sink",  "config" : {        "topics" : "debezium.sync.realtime_air",        "max.batch.size" : "50",        "connection.uri" : "mongodb://192.168.4.49:27017/sync",         "connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector",         "change.data.capture.handler" : "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",       "key.converter" : "org.apache.kafka.connect.json.JsonConverter",        "key.converter.schemas.enable" : "true",        "value.converter" : "org.apache.kafka.connect.json.JsonConverter",      "value.converter.schemas.enable" : "true",      "database" : "sync",        "collection" : "mongosink",         "topic.override.debezium.sync.realtime_air.collection" : "realtime_air"     } }' http://vm2:8083/connectors

好了,这样一个实时mongodb同步任务就搭建完成了。

文章参考:

https://docs.mongodb.com/kafka-connector/current/kafka-sink-properties/

https://debezium.io/documentation/reference/1.1/connectors/mongodb.html


作者:cosmozhu --90后的老父亲,专注于保护地球的程序员

个人网站:https://www.cosmozhu.fun

欢迎转载,转载时请注明出处。

相关文章

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 环境准备
  • 两个概念
  • 为什么要使用两个connector?
  • 插件安装
  • 启动kafka-connect
  • Debezium Mongodb Connector配置
  • Kafka-Connector-Mongodb-Sink配置
    • 相关文章
    相关产品与服务
    数据库
    云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档