专栏首页earthchen的专栏使用 flink table sql接收 json 数据写入 mysql 中
原创

使用 flink table sql接收 json 数据写入 mysql 中

使用 flink(table sql)+kafka+mysql 实现一个简单的 demo

在 gradle.build 中引入相关依赖

plugins {
    id 'java'
    id "com.github.johnrengelman.shadow" version "6.1.0"
}

group 'io.github.earthchen'
version '1.0-SNAPSHOT'

repositories {
    mavenLocal()
    maven { url "http://maven.aliyun.com/nexus/content/groups/public/" }
    maven { url "http://maven.aliyun.com/nexus/content/repositories/jcenter" }
    maven { url 'https://jitpack.io' }
    mavenCentral()
}

def conditionDependencies = [
        "org.apache.flink:flink-table:${flinkVersion}",
        "org.apache.flink:flink-table-api-java:${flinkVersion}",
        "org.apache.flink:flink-table-api-java-bridge_${scalaBinaryVersion}:${flinkVersion}",
//        "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-table-planner-blink_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-table-planner_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}",
//        "ch.qos.logback:logback-core:1.2.3",
//        "ch.qos.logback:logback-classic:1.2.3"
]

def prod = System.getProperty("prod") ?: false;

dependencies {
    // https://mvnrepository.com/artifact/mysql/mysql-connector-java
    implementation group: 'mysql', name: 'mysql-connector-java', version: '8.0.19'
// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc
    implementation group: 'org.apache.flink', name: 'flink-connector-jdbc_2.11', version: '1.12.0'
// https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
    implementation group: 'org.apache.flink', name: 'flink-sql-connector-kafka_2.11', version: '1.12.0'
    // https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
    implementation group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.12.0'

// https://mvnrepository.com/artifact/org.apache.flink/flink-csv
    implementation group: 'org.apache.flink', name: 'flink-json', version: '1.12.0'
    print("----当前prod=${prod}-------")
    if (prod) {
        compileOnly(conditionDependencies)
    } else {
        implementation(conditionDependencies)
    }
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
    testImplementation("org.hamcrest:hamcrest-all:1.3")
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
}

test {
    useJUnitPlatform()
}

jar {
    manifest {
        attributes "Main-Class": "io.github.earthchen.json.KafkaJsonMain"
    }
}

configurations.compile.dependencies.remove dependencies.gradleApi()

shadowJar {
    mergeServiceFiles()
}

准备 source 和 sink 的 sql

kafka 的 source sql

CREATE TABLE KafkaTable
(
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING
)
WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = '127.0.0.1:31090',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)

mysql 的 sink sql

CREATE TABLE kafka_sink_table
(
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING
)
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://127.0.0.1:3306/test',
    'username'= 'root'
    'password'= '123456'
    'table-name' = 'kafka_sink_table'
);

mysql 创表语句

create table kafka_sink_table
(
    id       int auto_increment
        primary key,
    user_id  bigint       null,
    item_id  bigint       null,
    behavior varchar(256) null
);

逻辑编写

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author earthchen
 * @date 2021/5/26
 **/
public class KafkaJsonMain {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        // ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.executeSql("CREATE TABLE kafka_source (\n" +
                "                            `user_id` BIGINT,\n" +
                "                            `item_id` BIGINT,\n" +
                "                            `behavior` STRING\n" +
                // "                            `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'user_behavior1',\n" +
                "    'properties.bootstrap.servers' = '127.0.0.1:9000'," +
                "\n" +
                "    'properties.group.id' = 'testGroup',\n" +
                "    'scan.startup.mode' = 'latest-offset',\n" +
                "    'json.fail-on-missing-field' = 'false'," +
                "    'json.ignore-parse-errors' = 'true'," +
                "    'format' = 'json'\n" +
                ")");
        Table kafkaJsonSource = tEnv.from("kafka_source");
        tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print')\n" +
                "LIKE kafka_source (EXCLUDING ALL)");

        tEnv.executeSql("CREATE TABLE kafka_sink_table\n" +
                "(\n" +
                "    `user_id` BIGINT,\n" +
                "    `item_id` BIGINT,\n" +
                "    `behavior` STRING\n" +
                ")\n" +
                "WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:mysql://127.0.0.1:3306/test',\n" +
                "    'username'= 'root',\n" +
                "    'password'= 'Root$%^7',\n" +
                "    'table-name' = 'kafka_sink_table'\n" +
                ")");
        // tEnv.executeSql("select * from kafka_source").print();

        kafkaJsonSource.select($("user_id"),
                $("item_id"),
                $("behavior")).executeInsert("kafka_sink_table");

    }
}

向 kakfa 的 topic 写入几条消息

{"user_id":111,"item_id":111,"behavior":{"aaa":"aaaaa","bbb":"aaaa222"}}
Untitled.png

在 mysql 中查看结果

Untitled 1.png

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink新增特性 | CDC(Change Data Capture) 原理和实践应用

    CDC,Change Data Capture,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以...

    王知无-import_bigdata
  • Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

    北京理工大学硕士毕业,2015 年加入阿里巴巴,参与阿里巴巴实时计算引擎 JStorm 的开发与设计。2016 年开始从事阿里新一代实时计算引擎 Blink S...

    用户6259908
  • Flink CDC 原理、实践和优化

    CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)...

    腾讯云大数据
  • Flink CDC 原理、实践和优化

    CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)...

    KyleMeow
  • 实时数仓|基于Flink1.11的SQL构建实时数仓探索实践

    实时数仓主要是为了解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的OLAP分析、实时的数据看板、业务指标实时监控等场景。虽然关于实时数仓的架构及技术选型...

    Spark学习技巧
  • 深入解读flink sql cdc的使用以及源码分析

    CDC,Change Data Capture,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以...

    大数据技术与应用实战
  • Flink SQL DDL 和 窗口函数实战

    2019 年 8 月 22 日,Flink 发布了 1.9 版本,社区版本的 Flink 新增 了一个 SQL DDL 的新特性,但是暂时还不支持流式的一些概念...

    kk大数据
  • 基于 Flink SQL CDC 的实时数据同步方案

    Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink PMC,阿里巴巴技术专家...

    Spark学习技巧
  • Flink SQL Client综合实战

    在《Flink SQL Client初探》一文中,我们体验了Flink SQL Client的基本功能,今天来通过实战更深入学习和体验Flink SQL;

    程序员欣宸
  • 触宝科技基于Apache Hudi的流批一体架构实践

    当前公司的大数据实时链路如下图,数据源是MySQL数据库,然后通过Binlog Query的方式消费或者直接客户端采集到Kafka,最终通过基于Spark/Fl...

    ApacheHudi
  • 袋鼠云:基于Flink构建实时计算平台的总体架构和关键技术点

    数栈是云原生—站式数据中台PaaS,我们在github和gitee上有一个有趣的开源项目:FlinkX,FlinkX是一个基于Flink的批流统一的数据同步工具...

    数栈DTinsight
  • Debezium-Flink-Hudi:实时流式CDC

    Debezium是一个开源的分布式平台,用于捕捉变化数据(change data capture)的场景。它可以捕捉数据库中的事件变化(例如表的增、删、改等),...

    zhisheng
  • Flink SQL CDC 上线!我们总结了 13 条生产实践经验

    摘要:7月,Flink 1.11 新版发布,在生态及易用性上有大幅提升,其中 Table & SQL 开始支持 Change Data Capture(CDC)...

    zhisheng
  • Flink SQL 流计算可视化 UI 平台

    flink-streaming-platform-web系统是基于flink封装的一个可视化的web系统,用户只需在web界面进行sql配置就能完成流计算任务,...

    zhisheng
  • Flink or Spark?实时计算框架在K12场景的应用实践

    如今,越来越多的业务场景要求 OLTP 系统能及时得到业务数据计算、分析后的结果,这就需要实时的流式计算如Flink等来保障。例如,在 TB 级别数据量的数据库...

    芋道源码
  • 基于流计算 Oceanus 和 Elasticsearch 构建日志分析系统

    实时即未来,最近在腾讯云流计算 Oceanus(Flink)进行实时计算服务,以下为MySQL 到 Flink 进行处理分析,再存储到ES的实践。分享给大家~

    吴云涛
  • Flink SQL on Zeppelin - 打造自己的可视化Flink SQL开发平台

    目前开发Flink的方式有很多,一般来说都是开发同学写JAVA/SCALA/PYTHON项目,然后提交到集群上运行。这种做法较为灵活,因为你在代码里面可以写任务...

    王知无-import_bigdata
  • Oceanus实践-从0到1开发MySQL-cdc到ES SQL作业

    实时即未来,最近在腾讯云Oceanus进行实时计算服务,以下为mysql到flink到ES实践。分享给大家~

    吴云涛
  • flink sql 知其所以然(一)| source\sink 原理

    本文从以下五个小节介绍 flink sql source\sink\format 的概念、原理。

    公众号:大数据羊说

扫码关注云+社区

领取腾讯云代金券