Kafka error when connecting Flink to HBase: java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils

Author: 火之高兴
Published: 2024-07-25 15:38:59
Collected in the column: 大数据应用技术

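This continues from the previous post, "[Flink real-time data warehouse] Requirement 1: user-attribute dimension table processing, Flink CDC from MySQL to HBase, experiment and error analysis" (http://t.csdn.cn/bk96r). A day later I reran the job that loads the data into HBase and got a Kafka error, even though this code does not use Kafka at all. The cause was a Kafka dependency I had added that day for another project, which introduced a dependency conflict.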

Full error output

+--------+
| result |
+--------+
|     OK |
+--------+
1 row in set
[WARN ] 2023-07-23 12:48:34,083(0) --> [main] org.apache.flink.runtime.webmonitor.WebMonitorUtils$LogFileLocation.find(WebMonitorUtils.java:82): Log file environment variable 'log.file' is not set.  
[WARN ] 2023-07-23 12:48:34,088(5) --> [main] org.apache.flink.runtime.webmonitor.WebMonitorUtils$LogFileLocation.find(WebMonitorUtils.java:88): JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.  
[WARN ] 2023-07-23 12:48:35,781(1698) --> [Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) -> NotNullEnforcer(fields=[id]) -> Sink: Collect table sink (1/1)#0] org.apache.flink.runtime.metrics.groups.TaskMetricGroup.getOrAddOperator(TaskMetricGroup.java:154): The operator name Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) exceeded the 80 characters length limit and was truncated.  
[WARN ] 2023-07-23 12:48:36,481(2398) --> [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) -> NotNullEnforcer(fields=[id]) -> Sink: Collect table sink (1/1)#0] org.apache.kafka.connect.runtime.WorkerConfig.logPluginPathConfigProviderWarning(WorkerConfig.java:420): Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.  
[ERROR] 2023-07-23 12:48:36,487(2404) --> [debezium-engine] com.ververica.cdc.debezium.internal.Handover.reportError(Handover.java:147): Reporting error:  
java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils
	at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.start(FlinkOffsetBackingStore.java:152)
	at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.configure(FlinkOffsetBackingStore.java:71)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:690)
	at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.ThreadUtils
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 7 more
[WARN ] 2023-07-23 12:48:36,499(2416) --> [Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) -> NotNullEnforcer(fields=[id]) -> Sink: Collect table sink (1/1)#0] org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1074): Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) -> NotNullEnforcer(fields=[id]) -> Sink: Collect table sink (1/1)#0 (472d9a4f02e261cfd2f115da78d97e03) switched from RUNNING to FAILED with failure cause: java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils
	at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.start(FlinkOffsetBackingStore.java:152)
	at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.configure(FlinkOffsetBackingStore.java:71)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:690)
	at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.ThreadUtils
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 7 more
  
[WARN ] 2023-07-23 12:48:36,581(2498) --> [main] org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:215): Failed to get job status so we assume that the job has terminated. Some data might be lost.  
java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
	at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:852)
	at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:752)
	at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:705)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:90)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:203)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:117)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
	at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
	at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
	at demo.UserInfo2Hbase.main(UserInfo2Hbase.java:93)
[WARN ] 2023-07-23 12:48:36,582(2499) --> [main] org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:215): Failed to get job status so we assume that the job has terminated. Some data might be lost.  
java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
	at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:852)
	at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:752)
	at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:705)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:90)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:203)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.cancelJob(CollectResultFetcher.java:225)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.close(CollectResultFetcher.java:150)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:108)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
	at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
	at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
	at demo.UserInfo2Hbase.main(UserInfo2Hbase.java:93)
Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
	at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
	at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
	at demo.UserInfo2Hbase.main(UserInfo2Hbase.java:93)
Caused by: java.io.IOException: Failed to fetch job execution result
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
	... 5 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
	... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
	... 7 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils
	at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.start(FlinkOffsetBackingStore.java:152)
	at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.configure(FlinkOffsetBackingStore.java:71)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:690)
	at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.ThreadUtils
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 7 more

Process finished with exit code 1
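
The log above reports the same failure several times at different layers: the Debezium engine thread, the task state transition, and finally the client's `print()` call. The underlying problem is the last `Caused by` entry in the chain. As a minimal sketch, not part of the original job, a hypothetical helper such as `RootCause.of` could walk a `Throwable`'s cause chain so that only the innermost exception is reported:

```java
final class RootCause {
    private RootCause() {}

    /** Follows getCause() until the innermost exception is reached. */
    static Throwable of(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Hand-built chain that mimics the nesting seen in the log (illustrative only).
        Throwable failure = new RuntimeException("Failed to fetch next result",
                new java.io.IOException("Failed to fetch job execution result",
                        new NoClassDefFoundError("org/apache/kafka/common/utils/ThreadUtils")));
        System.out.println(of(failure));
    }
}
```

In a job like the one below, the `print()` call could be wrapped in a `try`/`catch` that logs `RootCause.of(e)`, which makes the missing class visible without scrolling through the wrapper exceptions.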

Flink test code

package demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class UserInfo2Hbase {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Register the MySQL CDC source table; print() shows the "OK" result seen in the log.
        tenv.executeSql("CREATE TABLE ums_member (\n" +
                "    id          BIGINT,\n" +
                "    username    STRING,\n" +
                "    phone       STRING,\n" +
                "    status      INT,\n" +
                "    create_time TIMESTAMP(3),\n" +
                "    gender      INT,\n" +
                "    birthday    DATE,\n" +
                "    city        STRING,\n" +
                "    job         STRING,\n" +
                "    source_type INT,\n" +
                "    PRIMARY KEY(id) NOT ENFORCED\n" +
                " ) WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'hadoop10',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = '0000',\n" +
                " 'database-name' = 'db1',\n" +
                //" 'scan.startup.mode' = 'latest-offset',\n" +
                " 'scan.incremental.snapshot.enabled' = 'false',\n" +
                " 'table-name' = 'ums_member')").print();

        // Read the CDC table and print each row; this is the call that fails in the log above.
        tenv.executeSql("select * from ums_member").print();
    }
}

Solution

I commented out the Kafka dependency and reran the job, but it still reported an error.

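After searching all over the web I found a wide variety of suggested fixes. I chose to restart IDEA 2020, and that resolved the problem, presumably because the restart made the IDE reload the project's dependencies so the changed classpath took effect.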
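
The stack trace shows the missing class being requested by `com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore`, so the CDC connector itself needs Kafka classes at runtime even though the job never uses Kafka directly. One way to check whether a dependency change has actually reached the running classpath is to ask the JVM where it loads the class from. The following is a minimal sketch, not part of the original project; `ClasspathProbe` and `locate` are hypothetical names introduced for illustration:

```java
import java.security.CodeSource;
import java.util.Optional;

final class ClasspathProbe {
    private ClasspathProbe() {}

    /** Returns where the named class is loaded from, or empty if it cannot be loaded. */
    static Optional<String> locate(String className) {
        try {
            // initialize=false: look the class up without running its static initializers.
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // Classes loaded by the JDK's bootstrap loader have no code source.
            if (source == null || source.getLocation() == null) {
                return Optional.of("(JDK runtime)");
            }
            return Optional.of(source.getLocation().toString());
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String name = "org.apache.kafka.common.utils.ThreadUtils";
        System.out.println(name + " -> " + locate(name).orElse("NOT FOUND on classpath"));
    }
}
```

Run from the same IDE module as the job, a "NOT FOUND" result would suggest the classpath still lacks a Kafka client jar that contains `ThreadUtils`, while a jar path shows which Kafka client version was actually resolved. If the project is built with Maven, `mvn dependency:tree -Dincludes=org.apache.kafka` lists which dependencies pull in Kafka artifacts.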


The data once again arrived in HBase successfully.

This article is part of the 腾讯云自媒体同步曝光计划 (Tencent Cloud self-media syndication program) and is shared from the author's personal site/blog.
Originally published: 2024-07-25.
