前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink sql使用kafka作为source和sink

flink sql使用kafka作为source和sink

作者头像
yiduwangkai
发布2019-09-17 16:02:37
1.7K0
发布2019-09-17 16:02:37
举报
文章被收录于专栏:大数据进阶

大家都知道sql有着简单,直接,容易上手等优势,所以现在大有用sql去掉api的趋势。那么我们少说废话,下面先上个sql的列子

代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10000)
    env.setParallelism(1)
    //注入数据源
    var tableEnv: StreamTableEnvironment  = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerExternalCatalog("kafka", new UDMExternalCatalog())
    tableEnv.sqlUpdate(
      s"""INSERT INTO `kafka.kafka-k8s.pb_sink_test`
         |select
         |fstDeptSet,
         |filedName1,
         |filedName2,
         |userId,
         |brandNames
         |from kafka.`kafka-k8s`.`pb_internal_test`
         | """.stripMargin)
    env.execute("Flink SQL Skeleton")

上面是一个查询,插入语句,在flink中会被转为一个任务进行提交

下面我们大概讲一下flink内部kafka的实例化过程

有图可知,主要分为4大步骤,先通过calcite分析sql,转为相应的relnode,在根据用户配置的schema和Java spi,过滤出需要的kafka produce和kafka consumer版本。

kafka consumer对应于select部分

kafka produce对应于insert部分

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档