专栏首页cosmozhu技术篇mongodb:实时数据同步(一)

mongodb:实时数据同步(一)

关于mongodb数据实时同步,如果只是做数据备份的话,直接搭建一个replica set集群或者shard集群就可以实现目的了。但这样的话作为备份库的节点都是secondery,你没法往备份库上写数据上去。

不幸的是我最近就遇到了这样的需求,一个云上mongodb和一个云下机房的mongodb。云上的数据需要实时同步到云下,但云下的数据库会写入一些其它业务。

这样的话我只能将数据实时从云上采集到云下库。

本文介绍的是基于kafka-connector的一种解决方案。

环境准备

已有搭建好的kafka集群,可以参考cosmo这篇《Kafka集群搭建》快速搭建一个开发用的kafka集群。 debezium提供的 connector 插件:debezium-connector-mongodb mongodb官方提供的connector插件:mongo-kafka-connect-1.0.1-all.jar

两个概念

kafka-connector 由两个重要的部分组成source和sink。source用来从数据源采集数据,sink用来将数据保存到目标数据源。

为什么要使用两个connector?

本文将使用debezium提供的变更数据事件采集器来采集数据,使用 mongodb 官方提供的connector中的sink将数据推送给下游数据源。

插件安装

将下载下来的两个压缩包放在kafka插件目录下

/usr/local/share/kafka/plugins

如果目录不存在请新建

解压 debezium-connector-mongodbmongo-kafka-connect-1.0.1-all.jar

启动kafka-connect

kafka-connector启动分为单机版和集群版,我们这里不讨论单机版。

#在所有kafka brokers上执行下面命令,启动connector
bin/connect-distributed.sh -daemon config/connect-distributed.properties

因为kafka-connect的意图是以服务的方式去运行,所以它提供了REST API去管理connectors,默认的端口是8083。

GET /connectors – 返回所有正在运行的connector名

POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。

GET /connectors/{name} – 获取指定connetor的信息

GET /connectors/{name}/config – 获取指定connector的配置信息

PUT /connectors/{name}/config – 更新指定connector的配置信息

GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。

GET /connectors/{name}/tasks – 获取指定connector正在运行的task。

GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息

PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。

PUT /connectors/{name}/resume – 恢复一个被暂停的connector

POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用

POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。

DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。
 
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
 GET /connectors/{name} – 获取指定connetor的信息
 GET /connectors/{name}/config – 获取指定connector的配置信息
 PUT /connectors/{name}/config – 更新指定connector的配置信息
 GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
 GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
 GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息
 PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
 PUT /connectors/{name}/resume – 恢复一个被暂停的connector
 POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
 POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
 DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。 

Debezium Mongodb Connector配置

Property

Default

Description

name

connector的名称(唯一指定)

connector.class

Mongodb connector 具体实现类,默认值为 io.debezium.connector.mongodb.MongoDbConnector

mongodb.hosts

mongodb 链接信息host:port[,host:port],如果 mongodb.members.auto.discover 是false,需要指定具体的副本集名称例如 rs0/host:port。如果是shard 集群请配置config server的地址。

mongodb.name

采集好的数据会推送到kafka消息队列,topics为[db].[collection]。如果配置了这个name,将在topics前加此name作为前缀。

mongodb.user

mongodb 用户名

mongodb.password

mongodb 密码

mongodb.authsource

admin

mongodb 鉴权库

mongodb.ssl.enabled

false

链接是否使用ssl

mongodb.ssl.invalid.hostname.allowed

false

是否严格检查主机名

database.whitelist

empty string

监听数据变更的db库白名单,与黑名单不能同时使用

database.blacklist

empty string

监听数据变更的db库黑名单, 与白名单不能同时使用

collection.whitelist

empty string

监听数据变更的collection库白名单, 与黑名单不能同时使用 。逗号分隔

collection.blacklist

empty string

监听数据变更的collection库黑名单, 与白名单不能同时使用 。逗号分隔

snapshot.mode

initial

默认为: initial ,在启动时如果在oplog中找不到偏移量,会创建一个快照进行初始化同步。如果不需要请设置为never。

field.blacklist

empty string

字段映射黑名单,配置的字段将不会同步 ,用逗号分隔

field.renames

empty string

字段重命名[old]:[new],用逗号分隔

tasks.max

1

如果是副本集集群,默认值1是可以接收的。如果是shard cluster 最好大于等于分片数量

initial.sync.max.threads

1

初始化同步任务数

tombstones.on.delete

true

是否在delete之后推送 tombstone  事件

snapshot.delay.ms

connector启动后拍摄快照之前等待的时间,单位为(毫秒)避免集群中多个connector启动时中断快照。

snapshot.fetch.size

0

拍摄快照时每次拉取的最大数

启动debezium-connector数据采集任务

{
     "name" : "debezium",
     "config" : {
         "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
         "mongodb.hosts" : "vm4:27100",
         "mongodb.user" : "root",
         "mongodb.password" : "root",
         "mongodb.authsource" : "admin",
         "database.whitelist" : "test",
         "tasks.max" : "4",
         "mongodb.name" : "debezium"
     }
 }
curl -H "Content-Type: application/json" -X POST -d '{     "name" : "debezium",    "config" : {        "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",       "mongodb.hosts" : "vm4:27100",      "mongodb.user" : "root",        "mongodb.password" : "root",        "mongodb.authsource" : "admin",         "database.whitelist" : "test",      "tasks.max" : "4",      "mongodb.name" : "debezium"     } }' http://vm2:8083/connectors

Kafka-Connector-Mongodb-Sink配置

{

    "name" : "mongo-sink", #sink名称

    "config" : {

        "topics" : "debezium.sync.realtime_air", #监听的topics,多个用逗号分隔

        "max.batch.size" : "50", #每次处理的最大数量

        "connection.uri" : "mongodb://192.168.4.49:27017/sync", #目标mongodb链接

        "connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector", #mongodb链接实现类

        "change.data.capture.handler" : "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler", #CDC实现类

        "key.converter" : "org.apache.kafka.connect.json.JsonConverter", #键序列化类

        "key.converter.schemas.enable" : "true", #键转化是否包含架构

        "value.converter" : "org.apache.kafka.connect.json.JsonConverter", #值序列化类

        "value.converter.schemas.enable" : "true",#值转化是否包含架构

        "database" : "sync", #写入的数据库名称

        "collection" : "mongosink", #写入的集合名称

        "topic.override.debezium.sync.realtime_air.collection" : "realtime_air" #覆盖配置,设置debezium.sync.realtime_air 写入的集合名称为realtime_air

    }

}

启动数据落库任务

curl -H "Content-Type: application/json" -X POST -d '{     "name" : "mongo-sink",  "config" : {        "topics" : "debezium.sync.realtime_air",        "max.batch.size" : "50",        "connection.uri" : "mongodb://192.168.4.49:27017/sync",         "connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector",         "change.data.capture.handler" : "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",       "key.converter" : "org.apache.kafka.connect.json.JsonConverter",        "key.converter.schemas.enable" : "true",        "value.converter" : "org.apache.kafka.connect.json.JsonConverter",      "value.converter.schemas.enable" : "true",      "database" : "sync",        "collection" : "mongosink",         "topic.override.debezium.sync.realtime_air.collection" : "realtime_air"     } }' http://vm2:8083/connectors

好了,这样一个实时mongodb同步任务就搭建完成了。

文章参考: https://docs.mongodb.com/kafka-connector/current/kafka-sink-properties/ https://debezium.io/documentation/reference/1.1/connectors/mongodb.html


作者:cosmozhu --90后的老父亲,专注于保护地球的程序员 个人网站:https://www.cosmozhu.fun 欢迎转载,转载时请注明出处。

相关文章

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink-1.9流计算开发:十三、min、minBy、max、maxBy函数

    Flink是下一代大数据计算平台,可处理流计算和批量计算。《Flink-1.9流计算开发:十三、min、minBy、max、maxBy函数》cosmozhu写的...

    cosmozhu
  • Kafka:MirrorMaker-V1搭建步骤

    通过上一篇文章Kafka:MirrorMaker-V1我们已经知道了MirrorMaker-V1的基本概念,这篇文章我们来给Kafka-cluster搭建一个m...

    cosmozhu
  • java代码薄:常用的缓存

    现在的项目中应该基本都用redis做缓存了,本文提供一个简单的线程安全缓存类,提供超时淘汰策略。方便没必要引进第三方缓存时使用。

    cosmozhu
  • java获取指定文件夹下的所有文件名

    http://blog.csdn.net/tomorrowzm/article/details/3693653

    bear_fish
  • [C#]使用Costura.Fody将源DLL合并到目标EXE

    本文为原创文章,如转载,请在网页明显位置标明原文名称、作者及网址,谢谢! 一、本文主要是使用Costura.Fody工具将源DLL合并到目标EXE,因此,需要...

    CNXY
  • [C#]使用Costura.Fody将源DLL合并到目标EXE

    本文为原创文章,如转载,请在网页明显位置标明原文名称、作者及网址,谢谢![http://www.cnc6.cn]

    CNXY
  • Python学习-hashlib

    Python的hashlib提供了常见的摘要算法,主要提供 SHA1, SHA224, SHA256, SHA384, SHA512 ,MD5 算法。

    py3study
  • JoomScan:一款开源的OWASP Joomla漏洞扫描器

    OWASPJoomla!漏洞扫描器(JoomScan)是一个开源项目,其主要目的是实现漏洞检测的自动化,以增强Joomla CMS开发的安全性。该工具基于Per...

    FB客服
  • ubuntu16.04安装mongodb教程

    因为最近需要用到mongodb,就在腾讯云的主机上搭建了一个mongodb服务,使用主机为ubuntu 16.04(腾讯云主机的用户名为ubuntu…),这里记...

    用户1188347
  • 超级简单POI多sheet导出Excel实战

    这里导出两个sheet为例进行讲解,第一个sheet导出学生基本信息,表结构和数据参考上一章节,第二个sheet导出区域基本信息,具体数据和脚本如下

    sunny1009

扫码关注云+社区

领取腾讯云代金券