前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >(5)FlinkSQL将socket数据写入到mysql方式二

(5)FlinkSQL将socket数据写入到mysql方式二

原创
作者头像
NBI大数据
发布2022-08-08 11:16:23
9140
发布2022-08-08 11:16:23
举报
文章被收录于专栏:用户3645619的专栏
代码语言:javascript
复制
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");
        SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] split = s.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        });

        // 将流转化为表
        Table table = tableEnv.fromDataStream(waterDS,
                $("id"),
                $("ts"),
                $("vc"),
                $("pt").proctime());

        tableEnv.createTemporaryView("EventTable", table);


        tableEnv.executeSql("CREATE TABLE flinksink (" +
                "componentname STRING," +
                "componentcount BIGINT NOT NULL," +
                "componentsum BIGINT" +
                ") WITH (" +
                "'connector.type' = 'jdbc'," +
                "'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
                "'connector.table' = 'flinksink'," +
                "'connector.driver' =  'com.mysql.cj.jdbc.Driver'," +
                "'connector.username' = 'root'," +
                "'connector.password' = 'root'," +
                "'connector.write.flush.max-rows'='3'\r\n" +
                ")"
        );
        Table mysql_user = tableEnv.from("flinksink");
        mysql_user.printSchema();

        Table result = tableEnv.sqlQuery(
                "SELECT " +
                        "id as componentname, " +                //window_start, window_end,
                        "COUNT(ts) as componentcount ,SUM(ts) as componentsum " +
                        "FROM TABLE( " +
                        "TUMBLE( TABLE EventTable , " +
                        "DESCRIPTOR(pt), " +
                        "INTERVAL '10' SECOND)) " +
                        "GROUP BY id , window_start, window_end"
        );

        //方式一:写入数据库
//        result.executeInsert("flinksink").print(); //;.insertInto("flinksink");

        //方式二:写入数据库
        tableEnv.createTemporaryView("ResultTable", result);
        tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();

//        tableEnv.toAppendStream(result, Row.class).print("toAppendStream");           //追加模式
        env.execute();
    }

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据处理套件 TBDS
腾讯大数据处理套件(Tencent Big Data Suite,TBDS)依托腾讯多年海量数据处理经验,基于云原生技术和泛 Hadoop 生态开源技术提供的可靠、安全、易用的大数据处理平台。 TBDS可在公有云、私有云、非云化环境,根据不同数据处理需求组合合适的存算分析组件,包括 Hive、Spark、HBase、Flink、Presto、Iceberg、Elasticsearch、StarRocks 等,以快速构建企业级数据湖仓。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档