
FlinkX data synchronization

Author: yiduwangkai
Published: 2021-11-22

This post covers the following data synchronization scenarios:

  1. mysql2mysql
  2. mysql2hive

FlinkX version: 1.12-SNAPSHOT

1. Pull the code

git clone https://github.com/DTStack/flinkx.git

2. Build

mvn clean package -DskipTests=true

Note: you need to run the sh install_jars.sh script beforehand.
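The script ships in the repository root; run it from there before building:

sh install_jars.sh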

Additionally, run the following command to install the Dameng (DM) JDBC driver into your local Maven repository (with Dm7JdbcDriver18.jar present in the current directory):

mvn install:install-file -DgroupId=com.dm -DartifactId=Dm7JdbcDriver18 -Dversion=7.6.0.197 -Dpackaging=jar -Dfile=Dm7JdbcDriver18.jar

3. Run

Note: first delete all the jars under the lib directory, otherwise you will hit the following error:

Error: Could not find or load main class .Users.wangkai.apps.src.github.flinkx.lib.flinkx-launcher-1.6.jar
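One way to clear them out (a minimal sketch; run it from the FlinkX root and double-check the path before deleting anything):

rm -f lib/*.jar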

JSON mode

Local mode test

1. Command

bin/flinkx -mode local \
-jobType sync \
-job /Users/wangkai/apps/install/flinkx/mysql2mysql.json \
-flinkxDistDir flinkx-dist
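A few notes on these flags: -mode local runs the job inside the launcher JVM without a cluster, -jobType sync selects a data-sync job (as opposed to sql), -job points at the job description JSON, and -flinkxDistDir points at the plugin directory produced by the build (flinkx-dist under the project root after mvn package).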

2.mysql2mysql.json

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "birthday",
                "type": "TIMESTAMP"
              },
              {
                "name": "ts",
                "type": "TIMESTAMP"
              }
            ],
            "customSql": "",
            "where": "id < 1000",
            "splitPk": "id",
            "increColumn": "id",
            "startLocation": "2",
            "polling": true,
            "pollingInterval": 3000,
            "queryTimeOut": 1000,
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://localhost:3306/test?useSSL=false"
                ],
                "table": [
                  "users"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://localhost:3306/test?useSSL=false",
                "table": [
                  "test_users2"
                ]
              }
            ],
            "writeMode": "insert",
            "flushIntervalMills":"3000",
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "birthday",
                "type": "TIMESTAMP"
              },
              {
                "name": "ts",
                "type": "TIMESTAMP"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "restoreColumnName": "id"
      },
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}
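In this job, increColumn, startLocation and polling configure incremental reads: the reader starts from id = 2 and then re-polls every pollingInterval (3000 ms) for new rows. For reference, the users source table and test_users2 target table implied by the column lists could be created like this (a sketch; the MySQL column types are assumptions inferred from the job JSON, so adjust them to your actual schema):

mysql -uroot -proot test <<'SQL'
-- Columns mirror the reader/writer "column" lists; types are assumptions
CREATE TABLE IF NOT EXISTS users (
  id       INT PRIMARY KEY,
  name     VARCHAR(255),
  birthday TIMESTAMP NULL DEFAULT NULL,
  ts       TIMESTAMP NULL DEFAULT NULL
);
CREATE TABLE IF NOT EXISTS test_users2 LIKE users;
SQL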

per-job mode

1. Command

bin/flinkx \
    -mode yarn-per-job \
    -jobType sync \
    -job /Users/wangkai/apps/install/flinkx/mysql2mysql.json \
    -flinkxDistDir flinkx-dist \
    -flinkConfDir /Users/wangkai/apps/install/flink-1.12.1/conf \
    -flinkLibDir /Users/wangkai/apps/install/flink-1.12.1/lib
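Compared with local mode, yarn-per-job additionally needs a local Flink installation: -flinkConfDir and -flinkLibDir point at its conf and lib directories, which the launcher uses when deploying the job to YARN.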

2. mysql2mysql.json (the same job as in local mode; only the writer table changes, to test_users3)

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "birthday",
                "type": "TIMESTAMP"
              },
              {
                "name": "ts",
                "type": "TIMESTAMP"
              }
            ],
            "customSql": "",
            "where": "id < 1000",
            "splitPk": "id",
            "increColumn": "id",
            "startLocation": "2",
            "polling": true,
            "pollingInterval": 3000,
            "queryTimeOut": 1000,
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://localhost:3306/test?useSSL=false"
                ],
                "table": [
                  "users"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://localhost:3306/test?useSSL=false",
                "table": [
                  "test_users3"
                ]
              }
            ],
            "writeMode": "insert",
            "flushIntervalMills":"3000",
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "birthday",
                "type": "TIMESTAMP"
              },
              {
                "name": "ts",
                "type": "TIMESTAMP"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "restoreColumnName": "id"
      },
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}

You may run into a few errors here:

1. Hadoop-related errors

2021-11-09 14:57:37,102 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - Starting the SlotManager.
2021-11-09 14:57:37,109 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 2 ms
2021-11-09 14:57:37,111 ERROR org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - Exception on heartbeat
java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse.getAMRMToken()Lorg/apache/hadoop/yarn/api/records/Token;
    at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:307) ~[hadoop-yarn-client-2.7.5.jar:?]
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:237) [hadoop-yarn-client-2.7.5.jar:?]
2021-11-09 14:57:37,113 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - Interrupted while waiting for queue
java.lang.InterruptedException: null
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[?:1.8.0_291]
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) ~[?:1.8.0_291]
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) ~[?:1.8.0_291]
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287) [hadoop-yarn-client-2.7.5.jar:?]
2021-11-09 14:57:37,115 ERROR org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl [] - Stopping callback due to:
java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse.getAMRMToken()Lorg/apache/hadoop/yarn/api/records/Token;
org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newBuilder()Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest$AllocateRequestBuilder;

The fix is to align the Hadoop version in FlinkX's dependencies with the Hadoop version running on the cluster, then rebuild.
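For example, a rebuild pinned to the cluster's Hadoop version might look like this (the -Dhadoop.version property is an assumption: verify that the FlinkX root pom defines such a property, and if it does not, edit the Hadoop dependency versions in the pom directly):

mvn clean package -DskipTests=true -Dhadoop.version=2.7.5

Replace 2.7.5 with the Hadoop version your cluster actually runs.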

2. Flink-related errors

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not deserialize inputs
    at org.apache.flink.streaming.api.graph.StreamConfig.getInputs(StreamConfig.java:275) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:281) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.wrapOperatorIntoOutput(OperatorChain.java:671) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:617) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:549) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:170) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:509) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_291]
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.api.graph.StreamConfig$NetworkInputConfig; local class incompatible: stream classdesc serialVersionUID = 3698633776553163849, local class serialVersionUID = -3137689219135046939

The Flink version must also be consistent: the serialVersionUID mismatch above means the job was serialized by a different Flink build than the one deserializing it, so the client and the cluster must run the same Flink version.
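A quick way to confirm which build each side runs is to inspect the flink-dist jar in each installation's lib directory, e.g.:

ls /Users/wangkai/apps/install/flink-1.12.1/lib/flink-dist*.jar

The jar name encodes both the Scala and Flink versions (flink-dist_2.11-1.12.1.jar in the trace above); the Flink you launch the job with must match it.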

mysql2hive

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "birthday",
                "type": "TIMESTAMP"
              },
              {
                "name": "ts",
                "type": "TIMESTAMP"
              }
            ],
            "customSql": "",
            "where": "id < 1000",
            "splitPk": "",
            "queryTimeOut": 1000,
            "username": "root",
            "password": "root",
            "requestAccumulatorInterval": 2,
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://localhost:3306/test?useSSL=false"
                ],
                "table": [
                  "users"
                ]
              }
            ]
          }
        },
        "writer": {
          "name" : "hivewriter",
          "parameter" : {
            "jdbcUrl" : "jdbc:hive2://localhost:10000/kudu",
            "username" : "wangkai",
            "password" : "wangkai",
            "fileType" : "text",
            "writeMode" : "overwrite",
            "compress" : "",
            "schema" : "kudu",
            "charsetName" : "UTF-8",
            "maxFileSize" : 1073741824,
            "tablesColumn" : "{\"demonstrate_users\": [{\"key\": \"id\",\"type\": \"BIGINT\"}, {\"key\": \"name\",\"type\": \"string\"}, {\"key\": \"birthday\",\"type\": \"TIMESTAMP\"},{\"key\": \"ts\",\"type\": \"TIMESTAMP\"}]}",
            "defaultFS" : "hdfs://localhost:9000"
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0
      },
      "speed": {
        "bytes": 1048576,
        "channel": 1
      }
    }
  }
}
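If you create the target Hive table yourself, DDL matching the tablesColumn mapping above might look like this (a sketch via beeline; depending on configuration the hivewriter may also manage table creation on its own):

beeline -u jdbc:hive2://localhost:10000/kudu -n wangkai -p wangkai -e "
CREATE TABLE IF NOT EXISTS demonstrate_users (
  id       BIGINT,
  name     STRING,
  birthday TIMESTAMP,
  ts       TIMESTAMP
)
STORED AS TEXTFILE;"

STORED AS TEXTFILE matches the writer's "fileType": "text" setting.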

Command

bin/flinkx \
    -mode yarn-per-job \
    -jobType sync \
    -job /Users/wangkai/apps/install/flinkx/mysql2hive.json \
    -flinkxDistDir flinkx-dist \
    -flinkConfDir /Users/wangkai/apps/install/flink-1.12.1/conf \
    -flinkLibDir /Users/wangkai/apps/install/flink-1.12.1/lib