
Using Flink Table SQL to receive JSON data and write it into MySQL

Original article by earthchen, last modified 2021-05-28 10:00:15, published in the column "earthchen的专栏".

A simple demo wiring together Flink (Table SQL), Kafka, and MySQL.

Add the dependencies in build.gradle

plugins {
    id 'java'
    id "com.github.johnrengelman.shadow" version "6.1.0"
}

group 'io.github.earthchen'
version '1.0-SNAPSHOT'

repositories {
    mavenLocal()
    maven { url "https://maven.aliyun.com/nexus/content/groups/public/" }
    maven { url "https://maven.aliyun.com/nexus/content/repositories/jcenter" }
    maven { url 'https://jitpack.io' }
    mavenCentral()
}

// version properties used by the dependency declarations below; they were not
// defined in the original snippet, values inferred from the 1.12.0 / _2.11 artifacts
def flinkVersion = '1.12.0'
def scalaBinaryVersion = '2.11'

def conditionDependencies = [
        "org.apache.flink:flink-table:${flinkVersion}",
        "org.apache.flink:flink-table-api-java:${flinkVersion}",
        "org.apache.flink:flink-table-api-java-bridge_${scalaBinaryVersion}:${flinkVersion}",
//        "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-table-planner-blink_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-table-planner_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}",
//        "ch.qos.logback:logback-core:1.2.3",
//        "ch.qos.logback:logback-classic:1.2.3"
]

// pass -Dprod=true on the Gradle command line to switch the Flink deps to compileOnly
def prod = System.getProperty("prod") ?: false

dependencies {
    // https://mvnrepository.com/artifact/mysql/mysql-connector-java
    implementation group: 'mysql', name: 'mysql-connector-java', version: '8.0.19'
    // https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc
    implementation group: 'org.apache.flink', name: 'flink-connector-jdbc_2.11', version: '1.12.0'
    // https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
    implementation group: 'org.apache.flink', name: 'flink-sql-connector-kafka_2.11', version: '1.12.0'
    // https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
    implementation group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.12.0'

    // https://mvnrepository.com/artifact/org.apache.flink/flink-json
    implementation group: 'org.apache.flink', name: 'flink-json', version: '1.12.0'
    print("---- current prod=${prod} ----")
    if (prod) {
        compileOnly(conditionDependencies)
    } else {
        implementation(conditionDependencies)
    }
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
    testImplementation("org.hamcrest:hamcrest-all:1.3")
}

test {
    useJUnitPlatform()
}

jar {
    manifest {
        attributes "Main-Class": "io.github.earthchen.json.KafkaJsonMain"
    }
}

configurations.compile.dependencies.remove dependencies.gradleApi()

shadowJar {
    mergeServiceFiles()
}

Prepare the source and sink SQL

Kafka source SQL

CREATE TABLE KafkaTable
(
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING
)
WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = '127.0.0.1:31090',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

MySQL sink SQL

CREATE TABLE kafka_sink_table
(
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING
)
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://127.0.0.1:3306/test',
    'username' = 'root',
    'password' = '123456',
    'table-name' = 'kafka_sink_table'
);

MySQL table-creation statement

create table kafka_sink_table
(
    id       int auto_increment
        primary key,
    user_id  bigint       null,
    item_id  bigint       null,
    behavior varchar(256) null
);

Writing the job logic

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author earthchen
 * @date 2021/5/26
 **/
public class KafkaJsonMain {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        // ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.executeSql("CREATE TABLE kafka_source (\n" +
                "                            `user_id` BIGINT,\n" +
                "                            `item_id` BIGINT,\n" +
                "                            `behavior` STRING\n" +
                // "                            `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'user_behavior1',\n" +
                "    'properties.bootstrap.servers' = '127.0.0.1:9000'," +
                "\n" +
                "    'properties.group.id' = 'testGroup',\n" +
                "    'scan.startup.mode' = 'latest-offset',\n" +
                "    'json.fail-on-missing-field' = 'false'," +
                "    'json.ignore-parse-errors' = 'true'," +
                "    'format' = 'json'\n" +
                ")");
        Table kafkaJsonSource = tEnv.from("kafka_source");
        // optional debugging sink that prints rows to stdout; its schema is copied from kafka_source
        tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print')\n" +
                "LIKE kafka_source (EXCLUDING ALL)");

        tEnv.executeSql("CREATE TABLE kafka_sink_table\n" +
                "(\n" +
                "    `user_id` BIGINT,\n" +
                "    `item_id` BIGINT,\n" +
                "    `behavior` STRING\n" +
                ")\n" +
                "WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:mysql://127.0.0.1:3306/test',\n" +
                "    'username'= 'root',\n" +
                "    'password'= 'Root$%^7',\n" +
                "    'table-name' = 'kafka_sink_table'\n" +
                ")");
        // tEnv.executeSql("select * from kafka_source").print();

        // executeInsert() submits the streaming insert job asynchronously; call await()
        // on the returned TableResult if the client should block until the job ends
        kafkaJsonSource.select($("user_id"),
                $("item_id"),
                $("behavior")).executeInsert("kafka_sink_table");
    }
}

Write a few messages to the Kafka topic

{"user_id":111,"item_id":111,"behavior":{"aaa":"aaaaa","bbb":"aaaa222"}}
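Note that this sample message carries `behavior` as a nested object, while the DDL declares it as STRING; depending on the Flink version, the JSON format will either store the object's JSON text or treat the row as a parse error (which `'json.ignore-parse-errors' = 'true'` then drops silently). A flat string field matches the declared schema unambiguously. A minimal sketch of a helper that builds such a record (the class and method names are illustrative, not from the original):

```java
// Builds a JSON payload that matches the kafka_source DDL:
// user_id BIGINT, item_id BIGINT, behavior STRING
public class JsonPayloadExample {

    // field names are taken from the table definition above
    public static String payload(long userId, long itemId, String behavior) {
        return String.format("{\"user_id\":%d,\"item_id\":%d,\"behavior\":\"%s\"}",
                userId, itemId, behavior);
    }

    public static void main(String[] args) {
        // prints {"user_id":111,"item_id":111,"behavior":"buy"}
        System.out.println(payload(111L, 111L, "buy"));
    }
}
```

A record like this can be pasted into a console producer pointed at the topic from the DDL.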

Check the result in MySQL
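The original screenshots are not reproduced here; a quick check against the sink table (database `test` and table name from the sink DDL above):

```sql
-- inspect the most recent rows written by the Flink job
SELECT id, user_id, item_id, behavior
FROM kafka_sink_table
ORDER BY id DESC
LIMIT 10;
```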


Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, contact cloudcommunity@tencent.com for removal.
