Databus Evaluation: Pitfalls and Notes

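Introduction:

Databus is a low-latency, reliable change data capture system that supports transactions and preserves consistency. LinkedIn open-sourced it in 2013. Databus mines the database logs to pull changes out of the database reliably and in real time; applications can use a customized client to receive those changes as they happen and run their own business logic on them.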

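Goals:

The plan is to build binlog-based synchronization pipelines, DB to DB and DB to ES, so that data is decoupled at the business level. What follows covers the installation and testing steps encountered during the initial evaluation.

Below is a lightly anonymized sketch; comments pointing out problems are welcome:

The following are the errors encountered during installation and debugging.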

Error 1:

Could not resolve all files for configuration ':databus2-relay:databus2-event-producer-mock:compileClasspath'.
> Could not find ojdbc6.jar (com.oracle:ojdbc6:11.2.0.2.0).
  Searched in the following locations:
      file:/Users/wenba/Desktop/tools/databus/databus/sandbox-repo/com/oracle/ojdbc6/11.2.0.2.0/ojdbc6-11.2.0.2.0.jar

Fix: download ojdbc6-11.2.0.2.0.jar and place it in the /Users/wenba/Desktop/tools/databus/databus/sandbox-repo/com/oracle/ojdbc6/11.2.0.2.0/ directory.
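The location Gradle searched follows the usual group/artifact/version directory layout, so the target path can be derived from the dependency coordinates. The sketch below is only an illustration of that mapping; the class name SandboxRepoPath and the default "sandbox-repo" argument are made up for this example and are not part of Databus.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class SandboxRepoPath {
    /** Builds <repoRoot>/<group as directories>/<artifact>/<version>/<artifact>-<version>.jar. */
    static Path jarPath(Path repoRoot, String group, String artifact, String version) {
        return repoRoot
                .resolve(group.replace('.', '/'))
                .resolve(artifact)
                .resolve(version)
                .resolve(artifact + "-" + version + ".jar");
    }

    public static void main(String[] args) {
        // Hypothetical default; pass your own sandbox-repo directory as the first argument.
        Path repoRoot = Paths.get(args.length > 0 ? args[0] : "sandbox-repo");
        Path jar = jarPath(repoRoot, "com.oracle", "ojdbc6", "11.2.0.2.0");
        System.out.println(jar + (Files.exists(jar) ? " exists" : " is missing"));
    }
}
```

Running it with the sandbox-repo directory as the argument shows whether the jar is where the build expects it.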

Error 2:

Cannot bind to URL rmi://localhost:1099 ServiceUnavailableException

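Fix: in the initializeContainerJmx() method of com.linkedin.databus2.core.container.netty.ServerContainer, add the following line so that an RMI registry is created on the configured port before the JMX connector server is set up:

LocateRegistry.createRegistry(_containerStaticConfig.getJmx().getRmiRegistryPort());

The method after the change: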
protected void initializeContainerJmx() {
    if (_containerStaticConfig.getJmx().isRmiEnabled()) {
        try {
            JMXServiceURL jmxServiceUrl = new JMXServiceURL(
                    "service:jmx:rmi://" + _containerStaticConfig.getJmx().getJmxServiceHost() + ":"
                            + _containerStaticConfig.getJmx().getJmxServicePort() + "/jndi/rmi://"
                            + _containerStaticConfig.getJmx().getRmiRegistryHost() + ":"
                            + _containerStaticConfig.getJmx().getRmiRegistryPort() + "/jmxrmi"
                            + _containerStaticConfig.getJmx().getJmxServicePort());
            // Added line: create the RMI registry on the configured port
            LocateRegistry.createRegistry(_containerStaticConfig.getJmx().getRmiRegistryPort());
            _jmxConnServer = JMXConnectorServerFactory.newJMXConnectorServer(jmxServiceUrl, null, getMbeanServer());
        } catch (Exception e) {
            LOG.warn("Unable to instantiate JMX server", e);
        }
    }
}
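
The same idea can be tried outside Databus with only JDK classes. The sketch below is an assumption-laden illustration, not Databus code: the class JmxRegistrySketch, the use of the platform MBean server, and the simplified service URL are all choices made for this example. It creates the registry first and then starts a JMX connector server that binds into it; without the createRegistry call, start() would fail because nothing is listening on the registry port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.net.ServerSocket;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;

final class JmxRegistrySketch {
    /** Asks the OS for a TCP port that is free at this moment. */
    static int freePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates the registry, starts a connector that binds into it, then cleans up. */
    static boolean startAndStop(int registryPort) {
        Registry registry = null;
        JMXConnectorServer server = null;
        try {
            registry = LocateRegistry.createRegistry(registryPort);
            JMXServiceURL url = new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://localhost:" + registryPort + "/jmxrmi");
            server = JMXConnectorServerFactory.newJMXConnectorServer(
                    url, null, ManagementFactory.getPlatformMBeanServer());
            server.start(); // throws IOException if it cannot bind into a registry
            return server.isActive();
        } catch (Exception e) {
            throw new IllegalStateException("JMX connector could not start", e);
        } finally {
            try {
                if (server != null) server.stop();
                if (registry != null) UnicastRemoteObject.unexportObject(registry, true);
            } catch (Exception ignored) {
                // best-effort cleanup for this sketch
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("connector active: " + startAndStop(freePort()));
    }
}
```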

Error 3:

com.linkedin.databus.core.DatabusRuntimeException: com.linkedin.databus2.core.DatabusException: pk is assigned to key but fieldList is id,firstName,lastName,birthDate,deleted,

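Fix: the database name and table structure in the configuration must match the real database. In the message above, the schema declares pk=key while the field list contains id rather than key, so the primary key named in the schema has to be one of the declared fields.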

sources-person.json

{
    "name" : "person",
    "id"  : 1,
    "uri" : "mysql://root%2Fwangyu123@10.1.58.111:3306/1001/mysql-bin",
    "slowSourceQueryThreshold" : 2000,
    "sources" :
    [
        {
            "id" : 40,
            "name" : "com.linkedin.events.example.person.Person",
            "uri": "test.person",
            "partitionFunction" : "constant:1"
        }
    ]
}

Field definitions: com.linkedin.events.example.person.Person.1.avsc

{
  "name" : "Person_V1",
  "doc" : "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST",
  "type" : "record",
  "meta" : "dbFieldName=sy$person;pk=key;",
  "namespace" : "com.linkedin.events.example.test",
  "fields" : [ {
    "name" : "key",
    "type" : [ "long", "null" ],
    "meta" : "dbFieldName=KEY;dbFieldPosition=0;"  // primary key ID
  }, {
    "name" : "firstName",
    "type" : [ "string", "null" ],
    "meta" : "dbFieldName=FIRST_NAME;dbFieldPosition=1;"
  }, {
    "name" : "lastName",
    "type" : [ "string", "null" ],
    "meta" : "dbFieldName=LAST_NAME;dbFieldPosition=2;"
  }, {
    "name" : "birthDate",
    "type" : [ "long", "null" ],
    "meta" : "dbFieldName=BIRTH_DATE;dbFieldPosition=3;"
  }, {
    "name" : "deleted",
    "type" : [ "string", "null" ],
    "meta" : "dbFieldName=DELETED;dbFieldPosition=4;"
  } ]
}
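
The condition behind the Error 3 message can be checked before starting the relay. The sketch below is a hypothetical pre-flight check, not the validation Databus itself performs: it parses the record-level meta string and verifies that the field named by pk is among the schema's field names. The class SchemaPkCheck and its methods are invented for this example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SchemaPkCheck {
    /** Parses a "k1=v1;k2=v2;" meta string into an ordered map. */
    static Map<String, String> parseMeta(String meta) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String pair : meta.split(";")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                result.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return result;
    }

    /** Returns true when the pk named in the record meta is one of the schema fields. */
    static boolean pkMatchesFields(String recordMeta, List<String> fieldNames) {
        String pk = parseMeta(recordMeta).get("pk");
        return pk != null && fieldNames.contains(pk);
    }

    public static void main(String[] args) {
        String meta = "dbFieldName=sy$person;pk=key;";
        List<String> fromError = Arrays.asList("id", "firstName", "lastName", "birthDate", "deleted");
        List<String> fromSchema = Arrays.asList("key", "firstName", "lastName", "birthDate", "deleted");
        System.out.println("field list from the error message: " + pkMatchesFields(meta, fromError));
        System.out.println("field list from the schema above: " + pkMatchesFields(meta, fromSchema));
    }
}
```

The first check reports false (the failing case) and the second reports true.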

Error 4: port 9001 conflict when several instances are running:

databus2-example/databus2-example-client-pkg/conf/client_person.properties

# Set the ports explicitly
databus.client.container.httpPort=9111
databus.relay.container.httpPort=11125
databus.relay.container.jmx.rmiEnabled=false
.....

databus2-example/databus2-example-client-pkg/conf/client_user.properties

# Set the ports explicitly
databus.client.container.httpPort=9112
databus.relay.container.httpPort=11126
databus.relay.container.jmx.rmiEnabled=false
databus.relay.eventBuffer.allocationPolicy=DIRECT_MEMORY

Everything else can be created by copying the person example. For details of the error see: https://github.com/linkedin/databus/issues/26
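
Since the fix amounts to giving every client its own httpPort, a quick comparison of the properties files can catch a collision before startup. The sketch below is a hypothetical helper (ClientPortCheck is not part of Databus); it treats a key that is missing from both files as a collision, on the assumption that both processes would then fall back to the same default port.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Objects;
import java.util.Properties;

final class ClientPortCheck {
    /** Parses properties-file text into a Properties object. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** True when both configs resolve the key to the same value (or both omit it). */
    static boolean collide(String key, Properties a, Properties b) {
        return Objects.equals(a.getProperty(key), b.getProperty(key));
    }

    public static void main(String[] args) {
        String key = "databus.client.container.httpPort";
        Properties person = parse("databus.client.container.httpPort=9111\n");
        Properties user = parse("databus.client.container.httpPort=9112\n");
        Properties unset = parse("# no port configured\n");
        System.out.println("person vs user collide: " + collide(key, person, user));
        System.out.println("two unconfigured clients collide: " + collide(key, unset, unset));
    }
}
```

For real files, the text passed to parse would come from reading client_person.properties and client_user.properties.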

Error 5: adding the MySQL connector dependency

In build.gradle:

subprojects {
    apply from: rootProject.file("subprojects.gradle")
    apply plugin:'java'

    dependencies {
        //runtime externalDependency.log4j
        // Force easymock to version 3.1. One of the espresso dependencies changes it to 2.4
        // and v2.4 does not support mocking of classes, causing our espresso unit tests
        // to break.
        //runtime(externalDependency.easymock) {
        //    force = true
        //}
        //compile(externalDependency.easymock) {
        //    force = true
        //}
        compile ("mysql:mysql-connector-java:5.1.24")

    }
}

task wrapper(type: Wrapper) {
    gradleVersion = '1.8'
}

tasks.withType(JavaCompile) { options.encoding = "UTF-8" }

Writing the business logic

/Users/wenba/Desktop/tools/databus/databus/databus2-example/databus2-example-client/src/main/java/com/linkedin/databus/client/example/PersonConsumer.java
      // Read the fields
      Utf8 firstName = (Utf8) decodedEvent.get("firstName");
      Utf8 lastName = (Utf8) decodedEvent.get("lastName");
      Long birthDate = (Long) decodedEvent.get("birthDate");
      Utf8 deleted = (Utf8) decodedEvent.get("deleted");
      // Source ID of the event (40, the "id" set under "sources" in the JSON config)
      LOG.info("id :" + event.getSourceId());
      // Primary key
      LOG.info(decodedEvent.get("key"));

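Getting the operation type

// Possible values: DbusOpcode.UPSERT, DbusOpcode.DELETE
LOG.info(event.getOpcode());  // logs UPSERT or DELETE

// True when the change is an insert or an update
boolean isUpsert = event.getOpcode().equals(DbusOpcode.UPSERT);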
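
Because every field in the schema is declared as a union with null, the decoded values may be absent, and string fields arrive as character sequences rather than java.lang.String. The sketch below illustrates one way to handle both and to branch on the operation type. It is self-contained on purpose: the Opcode enum is only a stand-in for DbusOpcode, the Map stands in for the decoded Avro record, and PersonEventSketch is a made-up name, so none of this is the Databus API itself.

```java
import java.util.HashMap;
import java.util.Map;

final class PersonEventSketch {
    /** Stand-in for DbusOpcode; only the two values mentioned above. */
    enum Opcode { UPSERT, DELETE }

    /** Converts a decoded string-like field to a String, tolerating null. */
    static String text(Map<String, Object> record, String field) {
        Object value = record.get(field);
        return value == null ? null : value.toString();
    }

    /** Builds a one-line description of what a consumer would do with the change. */
    static String describe(Opcode opcode, Map<String, Object> record) {
        Object key = record.get("key");
        if (opcode == Opcode.UPSERT) {
            return "upsert key=" + key + " name="
                    + text(record, "firstName") + " " + text(record, "lastName");
        }
        return "delete key=" + key;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("key", 1L);
        // A StringBuilder plays the role of a non-String character sequence here.
        record.put("firstName", new StringBuilder("Ada"));
        record.put("lastName", new StringBuilder("Lovelace"));
        System.out.println(describe(Opcode.UPSERT, record));
        System.out.println(describe(Opcode.DELETE, record));
    }
}
```

In a real consumer the same branching would sit inside the event-processing callback, with the target store updated or the row removed instead of a string being built.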

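Step by step: monitoring one more table

Relay (data source) side:

Step 1:

Create a new source definition under databus2-example/databus2-example-relay-pkg/conf/.

Step 2:

Create the table's field definition under databus2-example/databus2-example-relay-pkg/schemas_registry, and add the new file's name to index.schemas_registry.

Step 3:

Create a start script under databus2-example/databus2-example-relay-pkg/script, and change every source_name in the script to the name of the package you just added.

Step 4 (code):

Create a new launcher class under databus2-example/databus2-example-relay/src/main/java/com/linkedin/databus/relay/example/, and change the package name and the JSON config it references to the new ones.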

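Client side:

Step 6:

Create a new configuration file under databus2-example/databus2-example-client-pkg/conf/ and adjust the settings that differ.

Step 7:

Create another start script under databus2-example/databus2-example-client-pkg/script/ (for example start-user-client.sh) and change it to the new source_name.

Step 8 (code):

Under databus2-example/databus2-example-client/src/main/java/com/linkedin/databus/client/example/, create a new client listener class. Change the package name in it, set the port it connects to so that it matches the port configured on the relay side, and register the consumer class.

Step 9 (code):

Under databus2-example/databus2-example-client/src/main/java/com/linkedin/databus/client/example/, create a new consumer class. The main code lives in processEvent; the change event and its source are wrapped in the DbusEvent and DbusEventDecoder objects.

Step 10 (code):

Implement the concrete consumer class that processes the data.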


Build:

Compile:

Databus is built with gradle, so a gradle environment has to be installed first. Once it is installed, go to the databus root directory and run:

gradle -Dopen_source=true assemble

This completes the build. On success, a folder named build is created in the databus root directory.

Start the Relay:

cd build/databus2-example-relay-pkg/distributions
tar -zxvf databus2-example-relay-pkg.tar.gz
Then run the start script.
For the person table:
./bin/start-example-relay.sh person -Y ./conf/sources-person.json
For the user table:
./bin/start-user-relay.sh user -Y ./conf/sources-user.json



To check that the relay is up, run: curl -s http://localhost:11115/sources

Start the Client:

cd build/databus2-example-client-pkg/distributions
tar -zxvf databus2-example-client-pkg.tar.gz
Then run the start scripts:
./bin/start-example-client.sh person
./bin/start-user-client.sh user

To check that the clients have connected to the relay, run: curl http://localhost:11115/relayStats/outbound/http/clients
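
The two curl checks can also be scripted. The sketch below is a minimal probe using only the JDK; the class RelayProbe, the two-second timeouts, and the convention of returning -1 when the relay is unreachable are assumptions made for this example. The host, port, and paths are the ones from the curl commands above.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;

final class RelayProbe {
    /** Builds the URL for a relay REST path. */
    static String endpoint(String host, int port, String path) {
        return "http://" + host + ":" + port + path;
    }

    /** Returns the HTTP status code, or -1 if the relay cannot be reached. */
    static int status(String url) {
        try {
            HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
            conn.setConnectTimeout(2000);
            conn.setReadTimeout(2000);
            conn.setRequestMethod("GET");
            try {
                return conn.getResponseCode();
            } finally {
                conn.disconnect();
            }
        } catch (IOException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        String[] paths = { "/sources", "/relayStats/outbound/http/clients" };
        for (String path : paths) {
            String url = endpoint("localhost", 11115, path);
            System.out.println(url + " -> " + status(url));
        }
    }
}
```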

Testing:

Once the Relay and the Client have started successfully, change capture on the person table is already running. Check the logs:

relay.log under databus2-example-relay-pkg/distributions/logs

client.log under databus2-example-client-pkg/distributions/logs

Hopefully these notes help you avoid a few pitfalls if you get the chance to use Databus in your own work.

References: https://blog.csdn.net/feng12345zi/article/details/80843554 https://www.jianshu.com/p/9df54eb1ec35

Originally published on the WeChat public account 呆呆熊一点通 (gh_93f28f51010a)

Original publication date: 2018-11-11
