前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink开发-Mysql数据导入Hive中

Flink开发-Mysql数据导入Hive中

作者头像
码客说
发布2023-03-06 15:50:06
1.7K0
发布2023-03-06 15:50:06
举报
文章被收录于专栏:码客码客

前言

Mysql中ResultSet默认会将一次查询的结果存入内存中。如果数据量比较大,就会占用大量的内存。如果内存不够,就会报错。

方式1

流式处理结果,让驱动每次返回1行数据

代码语言:javascript
复制
conn = DriverManager.getConnection("jdbc:mysql://192.168.7.102/", "user", "123456");
stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY,java.sql.ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(Integer.MIN_VALUE);
rs = stmt.executeQuery("SELECT * FROM tablename");

statement进行java.sql.ResultSet.TYPE_FORWARD_ONLY,java.sql.ResultSet.CONCUR_READ_ONLYstmt.setFetchSize(Integer.MIN_VALUE)的组合设置,会告诉mysql服务器流式处理返回结果,一行一行的返回数据。

这是mysql规定的设置,一开始还搞不懂为啥setFetchSize会是Integer.MIN_VALUE,设置上就完了,这是规定的设置!

注意:当你使用此方式处理数据时,你必须处理完resultset中的所有数据,或者将resultset关闭后才能使用此连接进行下一次的查询等操作,否则会抛出异常。

方式2

使用基于游标的处理,setFetchSize

代码语言:javascript
复制
conn = DriverManager.getConnection("jdbc:mysql://192.168.7.102/?useCursorFetch=true", "user", "123456");
stmt = conn.createStatement();
stmt.setFetchSize(100);
rs = stmt.executeQuery("SELECT * FROM tablename");

Mysql=>Hive

依赖

代码语言:javascript
复制
<!--JSON工具-->
<dependency>
  <groupId>com.alibaba.fastjson2</groupId>
  <artifactId>fastjson2</artifactId>
  <version>2.0.22</version>
</dependency>

<!--操作Mysql-->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>

<!--Hive JDBC-->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>2.1.0</version>
</dependency>

主类

代码语言:javascript
复制
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Mysql2Hive {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<JSONObject> mysqlData = env.addSource(new MysqlReader());
        mysqlData.addSink(new HiveWriter());
        mysqlData.print();
        env.execute("Mysql2Hive");
    }
}

读取Mysql

代码语言:javascript
复制
package com.xhkjedu.mysql2hive;

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

class MysqlReader extends RichSourceFunction<JSONObject> {
    private transient Statement st = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver");
        Connection con = DriverManager.getConnection("jdbc:mysql://192.168.7.102/zdb?useCursorFetch=true", "root", "123456");
        st = con.createStatement();
        st.setFetchSize(3);
    }

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {

        ResultSet rs = st.executeQuery("select * from t_user01");

        while (rs.next()) {
            Integer id = rs.getInt("id");
            String name = rs.getString("name");

            JSONObject json = new JSONObject();
            json.put("rowKey", id);
            json.put("name", name);
            ctx.collect(json);
        }

        //rs.close();
        //st.close();
        //con.close();
    }

    @Override
    public void cancel() {

    }
}

ResultSet.next其实是取一条就跟数据库通讯拿一条数据,并不是全部取出放在内存,因为ResultSet.next之前,是获取了数据库连接的,数据库连接断开,你就获取不到数据了,说明是有通讯的。

写入Hive

代码语言:javascript
复制
package com.xhkjedu.mysql2hive;

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

class HiveWriter extends RichSinkFunction<JSONObject> {

    private transient Statement st = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver");
        Connection con = DriverManager.getConnection("jdbc:hive2://192.168.7.101:10000/default", "root", "123456");
        st = con.createStatement();
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void invoke(JSONObject json, Context context) throws Exception {
        Integer id = json.getInteger("id");
        String name = json.getString("name");
        String sql = "insert into t_user02(id,name) VALUES (" + id + ",'" + name + "')";
        System.out.println("Running: " + sql);
        st.execute(sql);
    }
}

方式2

主类

代码语言:javascript
复制
package com.xhkjedu.mysql2hive;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

public class Mysql2Hive {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Row> mysqlData = env.addSource(new MysqlReader());
        
        //构造hive catalog
        String name = "hive";      // Catalog名称,定义一个唯一的名称表示
        String defaultDatabase = "default";  // 默认数据库名称
        String hiveConfDir = "/data/tools/bigdata/apache-hive-2.1.0-bin/conf";  // hive-site.xml路径

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.useDatabase("default");
        Table table = tEnv.fromDataStream(mysqlData);
        table.execute().print();
//        tEnv.createTemporaryView("flink_user", table);
//        String insertSql = "insert into t_user02(id,name) SELECT id, name FROM flink_user";
//        tEnv.executeSql(insertSql);
    }

    private static RowTypeInfo getRowTypeInfo(Row row) {
        TypeInformation[] types = new TypeInformation[row.getArity()];
        String[] fieldNames = new String[row.getArity()];
        for (int i = 0; i < row.getArity(); i++) {
            Object field = row.getField(i);
            if (field instanceof Integer) {
                types[i] = BasicTypeInfo.INT_TYPE_INFO;
            } else {
                types[i] = BasicTypeInfo.STRING_TYPE_INFO;
            }
            fieldNames[i] = "f" + i;
        }
        return new RowTypeInfo(types, fieldNames);
    }
}

代码语言:javascript
复制
package com.xhkjedu.mysql2hive;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

class MysqlReader extends RichSourceFunction<Row> {
    private transient Statement st = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver");
        Connection con = DriverManager.getConnection("jdbc:mysql://192.168.7.102/ztest?useCursorFetch=true", "root", "123456");
        st = con.createStatement();
        st.setFetchSize(3);
    }

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {

        ResultSet rs = st.executeQuery("select * from t_user");

        while (rs.next()) {
            Integer id = rs.getInt("id");
            String name = rs.getString("name");

            ctx.collect(Row.ofKind(RowKind.INSERT, id, name));
        }

        //rs.close();
        //st.close();
        //con.close();
    }

    @Override
    public void cancel() {

    }
}

支持的数据类型

前面示例中的 DataStream,流中的数据类型都是定义好的 POJO 类。如果 DataStream 中的类型是简单的基本类型,还可以直接转换成表吗?这就涉及了Table 中支持的数据类型。

整体来看,DataStream 中支持的数据类型,Table 中也是都支持的,只不过在进行转换时需要注意一些细节。

1. 原子类型

在 Flink 中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称作”原子类型”。原子类型的DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在 fromDataStream()方法里增加参数,用来重新命名列字段。

代码语言:javascript
复制
// 将数据流转换成动态表,动态表只有一个字段,重命名为 myLong
Table table = tableEnv.fromDataStream(stream, $(“myLong”));

2. Tuple 类型

当原子类型不做重命名时,默认的字段名就是”f0”,容易想到,这其实就是将原子类型看作了一元组Tuple1 的处理结果。 Table 支持 Flink 中定义的元组类型Tuple,对应在表中字段名默认就是元组中元素的属性名 f0、f1、f2…。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通过调用表达式的 as()方法来进行重命名。

代码语言:javascript
复制
// 将数据流转换成只包含 f1 字段的表
Table table = tableEnv.fromDataStream(stream, $("f1"));
// 将数据流转换成包含 f0 和 f1 字段的表,在表中 f0 和 f1 位置交换
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));
// 将 f1 字段命名为 myInt,f0 命名为 myLong
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));

3. POJO 类型

Flink 也支持多种数据类型组合成的”复合类型”,最典型的就是简单 Java 对象(POJO 类型)。由于 POJO 中已经定义好了可读性强的字段名,这种类型的数据流转换成 Table 就显得无比顺畅了。

将 POJO 类型的DataStream 转换成 Table,如果不指定字段名称,就会直接使用原始 POJO类型中的字段名称。POJO 中的字段同样可以被重新排序、提却和重命名,这在之前的例子中已经有过体现。

代码语言:javascript
复制
Table table = tableEnv.fromDataStream(stream);
Table table = tableEnv.fromDataStream(stream, $("user"));
Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"), $("url").as("myUrl"));

4. Row 类型

Flink 中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是 Table 中数据的基本组织形式。Row 类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息;我们在创建 Table 时调用的 CREATE 语句就会将所有的字段名称和类型指定,这在 Flink 中被称为表的”模式结构”(Schema)。除此之外,Row 类型还附加了一个属性 RowKind,用来表示当前行在更新操作中的类型。这样, Row 就可以用来表示更新日志流(changelog stream)中的数据,从而架起了 Flink 中流和表的转换桥梁。

所以在更新日志流中,元素的类型必须是 Row,而且需要调用 ofKind()方法来指定更新类型。下面是一个具体的例子:

代码语言:javascript
复制
DataStream<Row> dataStream = env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
// 将更新日志流转换为表
Table table = tableEnv.fromChangelogStream(dataStream);
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-01-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
    • 方式1
      • 方式2
      • Mysql=>Hive
        • 依赖
          • 主类
            • 读取Mysql
              • 写入Hive
              • 方式2
                • 主类
                  • 支持的数据类型
                    • 1. 原子类型
                      • 2. Tuple 类型
                        • 3. POJO 类型
                          • 4. Row 类型
                          相关产品与服务
                          云数据库 SQL Server
                          腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档