前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink开发-Hive数据导入HBase中

Flink开发-Hive数据导入HBase中

作者头像
码客说
发布2023-02-02 11:21:55
8130
发布2023-02-02 11:21:55
举报
文章被收录于专栏:码客

正文

依赖

<!--JSON工具-->
<dependency>
  <groupId>com.alibaba.fastjson2</groupId>
  <artifactId>fastjson2</artifactId>
  <version>2.0.22</version>
</dependency>
<!--操作Hbase-->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>2.1.10</version>
</dependency>

<!--Hive JDBC-->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>2.1.0</version>
</dependency>

主类

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Hive2Hbase {
    /**
     * Entry point: builds and runs a Flink streaming job that reads rows
     * from Hive (via {@link HiveReader}) and writes them to HBase
     * (via {@link HbaseWriter}).
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: every Hive row arrives as one JSONObject.
        DataStreamSource<JSONObject> rows = env.addSource(new HiveReader());

        // Sink into HBase, and echo each record to stdout for debugging.
        rows.addSink(new HbaseWriter());
        rows.print();

        env.execute("Flink write data to hbase");
    }
}

读取Hive

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * Flink source that reads every row of Hive table {@code t_user} over JDBC
 * and emits each row as a {@link JSONObject} with keys "rowKey" (the id)
 * and "name". The source finishes once the result set is exhausted.
 */
class HiveReader extends RichSourceFunction<JSONObject> {
    // Held as fields so close() can release them. transient because the
    // function instance is serialized by Flink before open() runs on the task.
    private transient Connection con = null;
    private transient Statement st = null;
    // Cooperative cancellation flag checked by run(); flipped by cancel().
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // NOTE(review): credentials and host are hard-coded; consider moving
        // them to job configuration.
        con = DriverManager.getConnection("jdbc:hive2://192.168.7.101:10000/default", "hive", "hive");
        st = con.createStatement();
    }

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
        // try-with-resources guarantees the ResultSet is closed even if
        // collect() throws; the original leaked it.
        try (ResultSet rs = st.executeQuery("select * from t_user")) {
            while (running && rs.next()) {
                Integer id = rs.getInt("id");
                String name = rs.getString("name");

                JSONObject json = new JSONObject();
                json.put("rowKey", id);
                json.put("name", name);
                ctx.collect(json);
            }
        }
    }

    @Override
    public void cancel() {
        // Stop the emit loop in run(); actual resource cleanup is in close().
        running = false;
    }

    @Override
    public void close() throws Exception {
        // Release JDBC resources. The original kept the Connection as a local
        // in open() and never closed it (the commented-out con.close() could
        // not have compiled there), leaking a Hive session per task.
        if (st != null) {
            st.close();
        }
        if (con != null) {
            con.close();
        }
        super.close();
    }
}

写入HBase

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Random;

/**
 * Flink sink that writes each incoming {@link JSONObject} as one row into
 * HBase table {@code zdb:tuser}: row key from "rowKey", value from "name"
 * stored under column family "name" with an empty qualifier (matching the
 * shell command {@code create 'zdb:tuser','name'}).
 */
class HbaseWriter extends RichSinkFunction<JSONObject> {

    // Held as fields so close() can release them. The original created the
    // Connection as a local in open() and leaked it, and never closed the
    // Table either.
    private transient org.apache.hadoop.hbase.client.Connection connection = null;
    private transient Table queryListTable = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // An HBase Connection is heavyweight; create it once per task in
        // open() and reuse it for every invoke().
        connection = ConnectionFactory.createConnection(conf);
        queryListTable = connection.getTable(TableName.valueOf("zdb", "tuser"));
    }

    @Override
    public void close() throws Exception {
        // Release HBase resources; the original close() released nothing.
        if (queryListTable != null) {
            queryListTable.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }

    @Override
    public void invoke(JSONObject json, Context context) throws Exception {
        try {
            System.out.println("json = " + json);
            // "rowKey" was stored as an Integer by HiveReader; getString()
            // converts it to its decimal string form.
            String rowKey = json.getString("rowKey");
            String name = json.getString("name");

            System.out.println("rowKey:" + rowKey);
            System.out.println("name:" + name);
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes("name"), Bytes.toBytes(""), Bytes.toBytes(name));
            queryListTable.put(put);
        } catch (Exception ex) {
            // Best-effort: log the failure and keep the stream running
            // (preserves the original's behavior of not failing the job).
            ex.printStackTrace();
        }
    }
}

HBase

删除命名空间下的表

disable 'zdb:tuser'
drop 'zdb:tuser'

创建表

create 'zdb:tuser','name'

查看表

describe 'zdb:tuser'

插入数据

put 'zdb:tuser','100','name','LiYing'

查询数据

get 'zdb:tuser','100'
get 'zdb:tuser','100','name'

scan 'zdb:tuser'

scan 'zdb:tuser', {FORMATTER => 'toString'}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-01-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 正文
    • 依赖
      • 主类
        • 读取Hive
          • 写入HBase
          • Hbase
          相关产品与服务
          TDSQL MySQL 版
          TDSQL MySQL 版(TDSQL for MySQL)是腾讯打造的一款分布式数据库产品,具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档