前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Databus调研踩坑记录

Databus调研踩坑记录

作者头像
用户2825413
发布2019-07-15 17:51:16
2K0
发布2019-07-15 17:51:16
举报

简介:

Databus是一个低延迟、可靠的、支持事务的、保持一致性的数据变更抓取系统。由LinkedIn于2013年开源。Databus通过挖掘数据库日志的方式,将数据库变更实时、可靠的从数据库拉取出来,业务可以通过定制化client实时获取变更并进行其他业务逻辑。

现实期望:

计划基于binlog同步,DB到DB,DB到es的管道,实现业务层面上的数据解耦. 以下是前期调研接触到的一些安装、测试环节涉及的一些流程

以下是简单脱敏后的草图,欢迎大家留言指出问题点:

下面是安装调试具体遇到的一些错误情况

错误1:

Could not resolve all files for configuration ':databus2-relay:databus2-event-producer-mock:compileClasspath'.
> Could not find ojdbc6.jar (com.oracle:ojdbc6:11.2.0.2.0).
  Searched in the following locations:
      file:/Users/wenba/Desktop/tools/databus/databus/sandbox-repo/com/oracle/ojdbc6/11.2.0.2.0/ojdbc6-11.2.0.2.0.jar

解决方式: 下载一个ojdbc6-11.2.0.2.0.jar的jar包放到/Users/wenba/Desktop/tools/databus/databus/sandbox-repo/com/oracle/ojdbc6/11.2.0.2.0/目录下。

错误2:

Cannot bind to URL rmi://localhost:1099 ServiceUnavailableException

修改: com.linkedin.databus2.core.container.netty.ServerContainer 的 initializeContainerJmx() 方法中加上一句

LocateRegistry.createRegistry(_containerStaticConfig.getJmx().getRmiRegistryPort());
protected void initializeContainerJmx() {
    if (_containerStaticConfig.getJmx().isRmiEnabled()) {
        try {
            JMXServiceURL jmxServiceUrl = new JMXServiceURL(
                    "service:jmx:rmi://" + _containerStaticConfig.getJmx().getJmxServiceHost() + ":"
                            + _containerStaticConfig.getJmx().getJmxServicePort() + "/jndi/rmi://"
                            + _containerStaticConfig.getJmx().getRmiRegistryHost() + ":"
                            + _containerStaticConfig.getJmx().getRmiRegistryPort() + "/jmxrmi"
                            + _containerStaticConfig.getJmx().getJmxServicePort());
          LocateRegistry.createRegistry(_containerStaticConfig.getJmx().getRmiRegistryPort());
            _jmxConnServer = JMXConnectorServerFactory.newJMXConnectorServer(jmxServiceUrl, null, getMbeanServer());
        } catch (Exception e) {
            LOG.warn("Unable to instantiate JMX server", e);
        }
    }
}

错误3

com.linkedin.databus.core.DatabusRuntimeException: com.linkedin.databus2.core.DatabusException: pk is assigned to key but fieldList is id,firstName,lastName,birthDate,deleted,

解决:数据库名字 表结构一定要和实际对应

source-person.json

{
    "name" : "person",
    "id"  : 1,
    "uri" : "mysql://root%2Fwangyu123@10.1.58.111:3306/1001/mysql-bin",
    "slowSourceQueryThreshold" : 2000,
    "sources" :
    [
        {
        "id" : 40,
        "name" : "com.linkedin.events.example.person.Person",
        "uri": "test.person",
        "partitionFunction" : "constant:1"
         }
    ]
}

字段定义 com.linkedin.events.example.person.Person.1.avsc

{
  "name" : "Person_V1",
  "doc" : "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST",
  "type" : "record",
  "meta" : "dbFieldName=sy$person;pk=key;",
  "namespace" : "com.linkedin.events.example.test",
  "fields" : [ {
    "name" : "key",
    "type" : [ "long", "null" ],
    "meta" : "dbFieldName=KEY;dbFieldPosition=0;"  //主键ID
  }, {
    "name" : "firstName",
    "type" : [ "string", "null" ],
    "meta" : "dbFieldName=FIRST_NAME;dbFieldPosition=1;"
  }, {
    "name" : "lastName",
    "type" : [ "string", "null" ],
    "meta" : "dbFieldName=LAST_NAME;dbFieldPosition=2;"
  }, {
    "name" : "birthDate",
    "type" : [ "long", "null" ],
    "meta" : "dbFieldName=BIRTH_DATE;dbFieldPosition=3;"
  }, {
    "name" : "deleted",
    "type" : [ "string", "null" ],
    "meta" : "dbFieldName=DELETED;dbFieldPosition=4;"
  } ]
}

错误4 多个端口监控报9001端口冲突:

databus2-example/databus2-example-client-pkg/conf/client_person.properties

#指定端口
databus.client.container.httpPort=9111
databus.relay.container.httpPort=11125
databus.relay.container.jmx.rmiEnabled=false
.....

databus2-example/databus2-example-client-pkg/conf/client_user.properties

#指定端口
databus.client.container.httpPort=9112
databus.relay.container.httpPort=11126
databus.relay.container.jmx.rmiEnabled=false
databus.relay.eventBuffer.allocationPolicy=DIRECT_MEMORY

剩下的模仿person example新建就可以了, 具体错误见:https://github.com/linkedin/databus/issues/26

错误5 安装mysql依赖拓展包

build.gradle中:

subprojects {
    apply from: rootProject.file("subprojects.gradle")
    apply plugin:'java'

    dependencies {
        //runtime externalDependency.log4j
        // Force easymock to version 3.1. One of the espresso dependencies changes it to 2.4
        // and v2.4 does not support mocking of classes, causing our espresso unit tests
        // to break.
        //runtime(externalDependency.easymock) {
        //    force = true
        //}
        //compile(externalDependency.easymock) {
        //    force = true
        //}
        compile ("mysql:mysql-connector-java:5.1.24")

    }
}

task wrapper(type: Wrapper) {
    gradleVersion = '1.8'
}

tasks.withType(JavaCompile) { options.encoding = "UTF-8" }

编写业务逻辑

/Users/wenba/Desktop/tools/databus/databus/databus2-example/databus2-example-client/src/main/java/com/linkedin/databus/client/example/PersonConsumer.java
     //获取字段
     Utf8 firstName = (Utf8) decodedEvent.get("firstName");
      Utf8 lastName = (Utf8) decodedEvent.get("lastName");
      Long birthDate = (Long) decodedEvent.get("birthDate");
      Utf8 deleted = (Utf8) decodedEvent.get("deleted");
      //获取业务标示ID  40
      LOG.info("id :"+event.getSourceId());
      //获取主键
      LOG.info(decodedEvent.get("key")); 

获取操作事件

//DbusOpcode.UPSERT  DbusOpcode.DELETE
LOG.info(event.getOpcode());  //UPSERT  DELETE 

//如果是更新或添加操作
event.getOpcode().equals(DbusOpcode.UPSERT)
一步步再监控一张表
数据源端:

第一步:

databus2-example/databus2-example-relay-pkg/conf/下面创建一个source源

第二步:

databus2-example/databus2-example-relay-pkg/schemas_registry下创建个表字段定义。把新加的文件名加到 index.schemas_registry

第三步:

databus2-example/databus2-example-relay-pkg/script下创建启动脚本,脚本中的涉及到的source_name都要改成你新加的这个包名。

第四步(代码部分):

databus2-example/databus2-example-relay/src/main/java/com/linkedin/databus/relay/example/下新创建启动类。将包名、配置json改成新创建的

客户端:

第六步:

databus2-example/databus2-example-client-pkg/conf/下新建配置文件,修改不同配置

第七步:

databus2-example/databus2-example-client-pkg/script/start-user-client.sh下再新建个启动脚本,改成新的source_name

第八步(代码部分):

databus2-example/databus2-example-client/src/main/java/com/linkedin/databus/client/example/ 新建client监听类,将里面的包名、监听端口改成数据源端设置的端口号。注册消费类

第九步(代码部分)

databus2-example/databus2-example-client/src/main/java/com/linkedin/databus/client/example/ 新建消费类,主要代码在processEvent下。数据事件和源封装在DbusEvent和DbusEventDecoder对象中。

第十步(代码部分)

创建具体消费类,对数据进行处理


构建:

编译:

Build:Databus采用gradle进行编译,所以需要安装gradle环境,安装安成后进入databus根目录执行命令

gradle -Dopen_source=true assemble

即可完成build,成功后在databus根目录下生成名为build的文件夹

启动Relay:

cd build/databus2-example-relay-pkg/distributions
tar -zxvf databus2-example-relay-pkg.tar.gz解压
执行启动脚本  
person表
./bin/start-example-relay.sh person -Y ./conf/sources-person.json 
user表
 ./bin/start-user-relay.sh user -Y ./conf/sources-user.json 



执行命令 curl -s http://localhost:11115/sources

启动Client:

cd build/databus2-example-client-pkg/distributions
tar -zxvf databus2-example-client-pkg.tar.gz解压
执行启动脚本 
    ./bin/start-example-client.sh person
    ./bin/start-user-client.sh user

执行命令 curl http://localhost:11115/relayStats/outbound/http/clients

测试:

Relay和Client启动成功后,就已经开始对person表进行数据变更捕获了。日志查看:

databus2-example-relay-pkg/distributions/logs下的relay.log

databus2-example-client-pkg/distributions/logs下的client.log

希望大家以后工作中如果有机会用到,可以避免踩一些坑。

参考文档:https://blog.csdn.net/feng12345zi/article/details/80843554 https://www.jianshu.com/p/9df54eb1ec35

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-11-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 呆呆熊的技术路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介:
  • 错误1:
  • 错误2:
  • 错误3
  • 错误4 多个端口监控报9001端口冲突:
  • 错误5 安装mysql依赖拓展包
  • 编写业务逻辑
    • 一步步再监控一张表
      • 数据源端:
      • 客户端:
  • 构建:
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档