前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 1.9 — SQL 创建 Kafka 数据源

Flink 1.9 — SQL 创建 Kafka 数据源

作者头像
LakeShen
发布2022-06-23 14:46:53
5940
发布2022-06-23 14:46:53
举报
文章被收录于专栏:数据库和大数据技术原理解析

前言

目前 Flink 1.9 SQL 支持用户直接使用 SQL 语句创建 Kafka 数据源,这极大的方便了用户开发 Flink 实时任务,你可以像 Hive 一样,使用 Create Table 语句来创建 Kafka Source,同时在也可以使用 Select 语句,从这个表中读取数据,进行窗口、ETL等操作。本文主要讲解 Flink 1.9 SQL 创建 Kafka 的 SQL 语法使用,当然,使用这个功能的前提,是你选择使用 Blink Planner。

引入Maven依赖

代码语言:javascript
复制
 <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.9.0</version>
  </dependency>

Flink SQL Kafka Source DDL 语句

首先,一般你的 Kafka 数据源里面的消息格式为 Json ,这样在 Flink SQL 创建 Kafka 数据源的时候,指定消息格式为 Json,表中的定义的确保字段的名称和 Json 中的字段保持一致,下面是 Flink SQL 代码实例:

代码语言:javascript
复制
create table kafka_topic_src
(
id varchar,
name varchar,
age varchar,
) with (
'connector.type' = 'kafka',
'connector.version' = '0.10',
'connector.topic' = 'your_topic',
'connector.properties.0.key' = 'group.id',
'connector.properties.0.value' = 'your_consumer_id',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'your_bootstrap_servers',
'connector.property-version' = '1',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json',
'format.property-version' = '1',
'format.derive-schema' = 'true',
'update-mode' = 'append');

上面的 Flink SQL 语句中,定义了三个字段,id、name、age。所以你的 Json 数据格式要包含这三个字段,如果没有包含某个字段,Flink 默认会使用 null 进行填充。

当然,你也可以使用 Json 中部分字段进行使用,比如你只需要 Json 中的 id、name,你也可以这样定义:

代码语言:javascript
复制
create table kafka_topic_src
(
id varchar,
name varchar
) with (
'connector.type' = 'kafka',
'connector.version' = '0.10',
'connector.topic' = 'your_topic',
'connector.properties.0.key' = 'group.id',
'connector.properties.0.value' = 'your_consumer_id',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'your_bootstrap_servers',
'connector.property-version' = '1',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json',
'format.property-version' = '1',
'format.derive-schema' = 'true',
'update-mode' = 'append');

注意,如果你的 kafka 消息不是 Json的话,Flink 任务会一直报错,目前 Kafka 的 upadte-mode 只支持 append 模式。

Flink SQL Kafka Source DDL 属性值

  1. connector.topic , kafka Topic
  2. connector.startup-mode , Flink kafka 消费者启动模式
  3. format.type , kafka 消息内容格式

Flink SQL Kafka Source DDL 注意点

Flink SQL 设置 kafka 消费者 group id

代码语言:javascript
复制
'connector.properties.0.key' = 'group.id',
'connector.properties.0.value' = 'track.log.teamtype.join'

这两个参数一起来进行设置,在 with 后面的语句中。

设置 kafka bootstrap.servers

代码语言:javascript
复制
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'your_kafka_boots_servers'

这两个参数要一起设置,具体的 bootstrap.servers 就是你所使用 Topic 所在集群的链接信息。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-01-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 LakeShen 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档