前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >FlinkCDC的探索与实践【SQL部分】2

FlinkCDC的探索与实践【SQL部分】2

原创
作者头像
LarkMidTable
修改2022-09-23 22:33:59
1.3K0
修改2022-09-23 22:33:59
举报
文章被收录于专栏:FlinkCDC

1.示例代码

代码语言:javascript
复制
public class FlinkCDCSQL {

	public static void main(String[] args) throws Exception {
		// 1.设置流的环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//		// 2.设置并行度
		env.setParallelism(1);
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

		//2.创建 Flink-MySQL-CDC 的 Source
		tableEnv.executeSql("CREATE TABLE student2 (" +
				" id STRING," +
				" name STRING," +
				" address STRING" +
				") WITH (" +
				" 'connector' = 'mysql-cdc'," +
				" 'scan.startup.mode' = 'latest-offset'," +
				" 'scan.incremental.snapshot.enabled' = 'false'," +
				" 'hostname' = 'localhost'," +
				" 'port' = '3306'," +
				" 'username' = 'root'," +
				" 'password' = 'root'," +
				" 'database-name' = 'test'," +
				" 'table-name' = 'student'" +
				")");
		tableEnv.executeSql("select * from student2").print();
		env.execute();
	}
}

2.遇到的问题

问题1

代码语言:javascript
复制
Caused by: java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;at com.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:80)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
... 19 more

解决办法:

代码语言:javascript
复制
Maven 将Flink版本换成
<flink-version>1.13.0</flink-version>

问题2

代码语言:javascript
复制
Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
	at com.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:182)
	at com.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:128)
	... 19 more

解决方案

代码语言:javascript
复制
再SQL语句中添加这一行,问题解决,1.13版本需要表有主键
" 'scan.incremental.snapshot.enabled' = 'false',"

本人开通付费的知识群,如果需要可以添加QQ:975863632,需要99.9元即可加入,添加需要备注【云雀课堂知识群】,这里可以获取到上面的源码,如果遇到问题可以一起解决,同时可以一起学习和进步。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.示例代码
  • 2.遇到的问题
    • 问题1
      • 解决办法:
        • 问题2
          • 解决方案
          相关产品与服务
          云数据库 MySQL
          腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档