
User Contribution | Debugging Dinky Kafka Issues in IDEA: An Experience Report

文末丶 | Published 2024-02-29 | Column: DataLink 数据中台

Foreword: This article is an experience report from community user 武舞悟, who stepped through a problem with Dinky's use of the Flink Kafka connector in IDEA.

Repository links

https://github.com/DataLinkDC/dinky

https://gitee.com/DataLinkDC/Dinky

Feel free to give Dinky a star~

1. Preparation

Compiling the Code

The IDEA version used in this article is shown in the original screenshot (omitted here).

Download the code from https://gitee.com/DataLinkDC/Dinky.git and switch to the 0.7.5 branch without making any changes. Set up the basics such as the JDK and Maven yourself.

In the Maven panel on the right side of IDEA, check the Maven Profiles dev, flink-1.17, jdk1.8, scala-2.12, and web, then run maven clean and maven install from the same panel. Then comes a long wait (build time depends on your hardware) until the build finishes.
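If you prefer the command line to the IDEA Maven panel, the equivalent build is roughly the following sketch (the profile list matches the checkboxes above; -DskipTests is my own addition):

mvn clean install -DskipTests -P dev,flink-1.17,jdk1.8,scala-2.12,web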

Runtime Environment

Item                      Value
Hadoop version            hadoop-3.1.4
Flink execution mode      Yarn Session
Flink version             flink-1.17.0
Dinky version             dlink-release-0.7.5
Kafka version             kafka_2.12-3.0.0
Kafka coordination mode   ZooKeeper
MySQL version             5.7.28

Setting up and starting the HDFS cluster, the YARN cluster, and the Dlink environment are skipped here and assumed to be done already.

Initializing the Database

Create a dlink_075 user in MySQL, then execute the dlink-doc/sql/dinky.sql script in the dlink_075 database.
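A minimal shell sketch of this initialization, assuming root access to MySQL (the host wildcard, character set, and password placeholder are assumptions):

mysql -uroot -p -e "CREATE DATABASE IF NOT EXISTS dlink_075 DEFAULT CHARACTER SET utf8mb4;"
mysql -uroot -p -e "CREATE USER IF NOT EXISTS 'dlink_075'@'%' IDENTIFIED BY '<password>';"
mysql -uroot -p -e "GRANT ALL PRIVILEGES ON dlink_075.* TO 'dlink_075'@'%'; FLUSH PRIVILEGES;"
mysql -uroot -p dlink_075 < dlink-doc/sql/dinky.sql   # run from the dlink source root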

IDEA Run Configuration

Open the /dlink-admin/src/main/resources/application.yml file under the dlink root directory. At the top of the file you can see:

spring:
  datasource:
    url: jdbc:mysql://${MYSQL_ADDR:127.0.0.1:3306}/${MYSQL_DATABASE:dlink}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
    username: ${MYSQL_USERNAME:dlink}
    password: ${MYSQL_PASSWORD:dlink}
    driver-class-name: com.mysql.cj.jdbc.Driver

Notice the variables inside the ${} placeholders (MYSQL_ADDR, MYSQL_DATABASE, MYSQL_USERNAME, MYSQL_PASSWORD): they can be supplied externally, and when a variable is not set, the default value after the colon is used.

Now set these yml parameters in IDEA. Open the Run/Debug Configurations page (Run ---> Edit Configurations...):

In Environment variables, fill in the values as name=value pairs, separated by semicolons:

MYSQL_ADDR=192.168..;MYSQL_DATABASE=dlink_075;MYSQL_USERNAME=root;MYSQL_PASSWORD=

2. Running Flink SQL Jobs

Creating a Session Cluster

In the Flink root directory, run the following command to request resources from the YARN cluster, open a YARN session, and start the Flink cluster:

./bin/yarn-session.sh -d -nm ww

The newly started YARN session can be seen in the YARN Web UI.

Parameter notes:

  • -d: detached mode. Use it if you do not want the Flink YARN client to keep running in the foreground; the YARN session stays up in the background even after the current terminal window is closed.
  • -nm (--name): the application name shown in the YARN UI.
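The session can also be confirmed from the command line, for example:

yarn application -list -appStates RUNNING   # the session should show up as an Apache Flink application named 'ww'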

Running a Flink SQL CDC Job

Enter the following in the editor:

DROP TABLE IF EXISTS employees;
CREATE TABLE IF NOT EXISTS employees (
    `emp_no` INT NOT NULL,
    `birth_date` DATE,
    `first_name` STRING,
    `last_name` STRING,
    `gender` STRING,
    `hire_date` DATE,
    proctime as PROCTIME(),
    PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.*.*',
    'port' = '3306',
    'username' = 'root',
    'password' = '****',
    'scan.incremental.snapshot.enabled' = 'true',
    'debezium.snapshot.mode' = 'latest-offset',
    'database-name' = 'nfp_ep',
    'table-name' = 'employees_dinky'
);
DROP TABLE IF EXISTS dim_sex;
CREATE TABLE dim_sex (
    sex STRING,
    caption STRING,
    PRIMARY KEY (sex) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.*.*:3306/employees',
    'table-name' = 'dim_sex',
    'username' = 'root',
    'password' = '****'
);

select
    *
from
    employees
    left join dim_sex FOR SYSTEM_TIME AS OF employees.proctime ON employees.gender = dim_sex.sex;

This is a FlinkSQL job of the most basic kind, and it runs correctly. When rows are inserted or updated on the source side, the latest changes show up in the result sheet at the bottom of the editor; click the "Get Latest Data" button to see the data.

Running a Flink SQL Kafka Job

Enter the following in the editor:

DROP TABLE IF EXISTS employees_kafka;
CREATE TABLE IF NOT EXISTS employees_kafka (
    `emp_no` INT NOT NULL,
    `birth_date` DATE,
    `first_name` STRING,
    `last_name` STRING,
    `gender` STRING,
    `hire_date` DATE
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink-cdc-kafka',
    'properties.bootstrap.servers' = 'bd171:9092,bd172:9092,bd173:9092',
    'properties.group.id' = 'flink-cdc-kafka-group',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);
CREATE TABLE IF NOT EXISTS employees_sink (
    `emp_no` INT NOT NULL,
    `birth_date` DATE,
    `first_name` STRING,
    `last_name` STRING,
    `gender` STRING,
    `hire_date` DATE,
    PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.*.*:3306/employees?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',
    'table-name' = 'employees_kafka_sink',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = '****'
);
insert into
    employees_sink
select
    emp_no,
    birth_date,
    first_name,
    last_name,
    gender,
    hire_date
from
    employees_kafka;
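Note that the flink-cdc-kafka topic must already exist on the brokers. A minimal creation sketch (the partition and replication-factor values here are assumptions):

bin/kafka-topics.sh --create --topic flink-cdc-kafka \
    --partitions 3 --replication-factor 2 \
    --bootstrap-server bd171:9092,bd172:9092,bd173:9092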

Running it fails with the following error:

[dlink] 2023-11-30 21:36:27.751  ERROR 16072 --- [nio-8888-exec-9] com.dlink.utils.LogUtil: 2023-11-30T21:36:27.750: Exception in executing FlinkSQL:
insert into
    employees_sink
select
    emp_no,
    birth_date,
    first_name,
    last_name,
    gender,
    hire_date
from
    employees_kafka 
Error message:
 org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.employees_kafka'.

Table options are:

'connector'='kafka'
'format'='json'
'properties.bootstrap.servers'='bd171:9092,bd172:9092,bd173:9092'
'properties.group.id'='flink-cdc-kafka-group'
'scan.startup.mode'='latest-offset'
'topic'='flink-cdc-kafka'
  at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:167)
  at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:192)
  ......
  at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
  at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.

Available factory identifiers are:

raw
  at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)
  at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:1130)
  at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:1046)
  at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.getValueDecodingFormat(KafkaDynamicTableFactory.java:330)
  at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:183)
  at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:164)
  ... 114 more

3. Solving the Problems

Fixing the Missing Dependency

The exception above shows that the JSON format dependency is missing, so add the following dependency to the pom.xml of dlink-admin:

<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-json</artifactId>
     <version>${flink.version}</version>
</dependency>

Next, prepare to recompile the entire Dinky project. You may ask why not compile dlink-admin alone: because that fails, as follows:

[INFO] --- spotless-maven-plugin:2.27.1:check (default) @ dlink-admin ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  36.993 s
[INFO] Finished at: 2023-11-30T22:02:29+08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal com.diffplug.spotless:spotless-maven-plugin:2.27.1:check (default) on project dlink-admin: Execution default of goal com.diffplug.spotless:spotless-maven-plugin:2.27.1:check failed: Unable to locate file with path: style/spotless_dlink_formatter.xml: Could not find resource 'style/spotless_dlink_formatter.xml'. -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException

Before recompiling the whole Dinky project, uncheck "web" under Maven Profiles in IDEA's Maven panel; recompiling the static web project is unnecessary and takes far too long.
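On the command line, the rebuild is then roughly (the same sketch as before, minus the web profile):

mvn clean install -DskipTests -P dev,flink-1.17,jdk1.8,scala-2.12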

Investigating the Kafka Dependency Conflict

After recompiling the whole Dinky project, start dinky and run the FlinkSQL above once more. There is a problem again, with the following error in IDEA:

[dlink] 2023-11-30 22:22:52.386   INFO  5668 --- [ent-IO-thread-1] org.apache.flink.client.program.rest.RestClusterClient: Submitting job 'kafka到mysql单表employees_savepoint' (16258731657846a524dd565dcfbef607).
[dlink] 2023-11-30 22:22:54.522   INFO  5668 --- [ent-IO-thread-4] org.apache.flink.client.program.rest.RestClusterClient: Successfully submitted job 'kafka到mysql单表employees_savepoint' (16258731657846a524dd565dcfbef607) to 'http://bd171:18081'.
[dlink] 2023-11-30 22:22:54.698  ERROR  5668 --- [nio-8888-exec-8] com.dlink.utils.LogUtil: 2023-11-30T22:22:54.698: Exception in executing FlinkSQL:
insert into
    employees_sink
select
    emp_no,
    birth_date,
    first_name,
    last_name,
    gender,
    hire_date
from
    employees_kafka 
Error message:
 org.apache.flink.table.api.TableException: Failed to execute sql
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:938)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883)
  .....
  at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
  at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'kafka到mysql单表employees_savepoint'.
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212)
  at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921)
  ... 94 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
  at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
  at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
  ......
  at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
  at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
  at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
  ......
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: employees_kafka[1] -> ConstraintEnforcer[2] -> Sink: employees_sink[2]
  at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
  at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
  ... 3 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: employees_kafka[1] -> ConstraintEnforcer[2] -> Sink: employees_sink[2]
  at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
  at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
  ... 3 more
Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: employees_kafka[1] -> ConstraintEnforcer[2] -> Sink: employees_sink[2]
  at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:229)
  ......
  at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
  ... 4 more
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
  at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
  ......
  at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
  ... 20 more

The core problem is:

Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer

Running this FlinkSQL on a standalone Dinky deployment does not hit this problem. It looks like the original kafka.clients classes clash with their shaded counterparts (the output of maven-shade-plugin). Digging into the Flink 1.17 source code, the pom.xml of the flink-sql-connector-kafka module contains:

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <id>shade-flink</id>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <includes>
                  <include>org.apache.flink:flink-connector-base</include>
                  <include>org.apache.flink:flink-connector-kafka</include>
                  <include>org.apache.kafka:*</include>
                </includes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>org.apache.kafka:*</artifact>
                  <excludes>
                    <exclude>kafka/kafka-version.properties</exclude>
                    <exclude>LICENSE</exclude>
                    <!-- Does not contain anything relevant.
                      Cites a binary dependency on jersey, but this is neither reflected in the
                      dependency graph, nor are any jersey files bundled. -->
                    <exclude>NOTICE</exclude>
                    <exclude>common/**</exclude>
                  </excludes>
                </filter>
              </filters>
              <relocations>
                <relocation>
                  <pattern>org.apache.kafka</pattern>
                  <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

So the kafka.clients classes are relocated from "org.apache.kafka" to "org.apache.flink.kafka.shaded.org.apache.kafka". Why, then, does the FlinkSQL above, with Kafka as the source, fail? Look back at the key part of the IDEA error, which includes:

Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
......
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
......
  at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
  at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:371)
  at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:348)
......  
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
  at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:534)
  at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:522)
  at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
  at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:471)
  at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
  at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
  ... 20 more

The JobMaster on the remote Flink cluster fails while deserializing the job objects; clearly the error comes from a class mismatch between Dinky and the remote Flink cluster. Now look at the Flink JobMaster logs (screenshot omitted).

Clearly, part of the error on the IDEA side is relayed from the JobMaster error on the remote Flink cluster. Next, check which Kafka-related jars the Dinky project in IDEA uses, via File ---> Project Structure ---> Project Settings ---> Libraries.

When the compiled Dinky source project runs, it mainly uses flink-connector-kafka-1.17.1.jar and kafka-clients-3.0.2.jar. Now check which Kafka-related jar Flink itself uses.

It turns out Flink uses flink-sql-connector-kafka-1.17.0.jar. Let's replace Flink's Kafka jar with the same flink-connector-kafka-1.17.1.jar used on the Dinky side, then start the Flink cluster again and rerun the earlier FlinkSQL. On the IDEA side it runs normally, with no errors:

[dlink] 2023-11-30 23:31:13.209   INFO  5668 --- [adPool-Worker-5] com.dlink.api.FlinkAPI: Unable to connect to Flink JobManager: http://FINISHED
[dlink] 2023-11-30 23:31:13.216   WARN  5668 --- [adPool-Worker-5] com.alibaba.druid.pool.DruidAbstractDataSource: discard long time none received connection. , jdbcUrl : jdbc:mysql://192.168.1.198/dlink_075?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true, version : 1.2.8, lastPacketReceivedIdleMillis : 60278
[dlink] 2023-11-30 23:41:29.010   WARN  5668 --- [io-8888-exec-10] com.alibaba.druid.pool.DruidAbstractDataSource: discard long time none received connection. , jdbcUrl : jdbc:mysql://192.168.1.198/dlink_075?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true, version : 1.2.8, lastPacketReceivedIdleMillis : 615551
[dlink] 2023-11-30 23:42:58.238   WARN  5668 --- [io-8888-exec-10] com.alibaba.druid.pool.DruidAbstractDataSource: discard long time none received connection. , jdbcUrl : jdbc:mysql://192.168.1.198/dlink_075?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true, version : 1.2.8, lastPacketReceivedIdleMillis : 88776
[dlink] 2023-11-30 23:42:58.980   INFO  5668 --- [io-8888-exec-10] com.dlink.executor.Executor: Simple authentication mode
[dlink] 2023-11-30 23:42:58.996   INFO  5668 --- [io-8888-exec-10] com.dlink.executor.Executor: Simple authentication mode
[dlink] 2023-11-30 23:42:59.008   INFO  5668 --- [io-8888-exec-10] com.dlink.executor.Executor: Simple authentication mode
[dlink] 2023-11-30 23:42:59.017   INFO  5668 --- [io-8888-exec-10] com.dlink.executor.Executor: Simple authentication mode
[dlink] 2023-11-30 23:42:59.137   INFO  5668 --- [ent-IO-thread-1] org.apache.flink.client.program.rest.RestClusterClient: Submitting job 'kafka到mysql单表employees_savepoint' (2fa25e0cbab4e2ba11a6818fe2da2677).
[dlink] 2023-11-30 23:42:59.588   INFO  5668 --- [ent-IO-thread-4] org.apache.flink.client.program.rest.RestClusterClient: Successfully submitted job 'kafka到mysql单表employees_savepoint' (2fa25e0cbab4e2ba11a6818fe2da2677) to 'http://bd171:18081'.
[dlink] 2023-11-30 23:44:02.942   WARN  5668 --- [adPool-Worker-5] com.alibaba.druid.pool.DruidAbstractDataSource: discard long time none received connection. , jdbcUrl : jdbc:mysql://192.168.1.198/dlink_075?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true, version : 1.2.8, lastPacketReceivedIdleMillis : 60395

Using the Correct Kafka Dependency

But the remote Flink cluster still reports an error (screenshot omitted).

Let's instead unify both the Dinky side and the Flink side on flink-sql-connector-kafka-1.17.0.jar. The jar replacement on the Flink side is not shown in the original post (a rough sketch follows below). In IDEA, do a global search for XML files containing flink-connector-kafka.
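For completeness, the Flink-side swap amounts to roughly this sketch (the paths are assumptions; restart the YARN session afterwards so the new jar is picked up):

cd $FLINK_HOME/lib
rm -f flink-connector-kafka-*.jar kafka-clients-*.jar   # the jars swapped in earlier
cp /path/to/flink-sql-connector-kafka-1.17.0.jar .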

Only update the one under dlink-flink-1.17, replacing it with:

<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-sql-connector-kafka</artifactId>
     <version>${flink.version}</version>
</dependency>

Then recompile the Dinky project, start the Flink cluster, start dinky, and run the earlier FlinkSQL; this time everything works:

2023-11-30 23:55:34,027 WARN  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig [] - The configuration 'client.id.prefix' was supplied but isn't a known config.
2023-11-30 23:55:34,027 WARN  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig [] - The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.
2023-11-30 23:55:34,033 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 2.7.2
2023-11-30 23:55:34,034 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: 37a1cc36bf4d76f3
2023-11-30 23:55:34,034 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1701335854027
2023-11-30 23:55:34,065 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
2023-11-30 23:55:34,083 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Subscribed to partition(s): flink-cdc-kafka-0
2023-11-30 23:55:34,095 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Seeking to LATEST offset of partition flink-cdc-kafka-0
2023-11-30 23:55:35,618 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Resetting the last seen epoch of partition flink-cdc-kafka-0 to 3 since the associated topicId changed from null to oQYrIKJBRe-oWt7Q0nZi7A
2023-11-30 23:55:35,625 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Cluster ID: _nGd57n0QxGTp130IKGwDQ
2023-11-30 23:55:35,669 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Resetting offset for partition flink-cdc-kafka-0 to position FetchPosition{offset=1222, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[bd171:9092 (id: 171 rack: null)], epoch=3}}.

Write some data into the relevant Kafka topic, and it eventually lands in the database table.
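For example, one JSON record matching the employees_kafka schema can be pushed from the shell (the field values are illustrative, borrowed from the classic employees sample data):

echo '{"emp_no":10001,"birth_date":"1953-09-02","first_name":"Georgi","last_name":"Facello","gender":"M","hire_date":"1986-06-26"}' | \
    bin/kafka-console-producer.sh --topic flink-cdc-kafka --bootstrap-server bd171:9092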

4. Conclusion

The takeaway: using flink-sql-connector-kafka directly in both the Dinky and Flink environments avoids a lot of dependency trouble. Likewise, for dependencies such as mysql-cdc, the flink-sql-* variants of the jars are recommended and sidestep many problems.

The above condenses the whole journey from discovering the problem to resolving its core cause; in reality, the process was nowhere near as easy as this description makes it look.

Keep at it!

Originally published 2024-02-23 via the Dinky 开源 WeChat official account; shared through the Tencent Cloud self-media sync program.
