前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink开发-Hive数据导入Phoenix中

Flink开发-Hive数据导入Phoenix中

作者头像
码客说
发布2023-03-06 15:50:54
5900
发布2023-03-06 15:50:54
举报
文章被收录于专栏:码客码客

前言

Hbase中的数据读取起来不太方便,所以这里使用Phoenix来保存数据。

准备Hive

启动Hive服务

代码语言:javascript
复制
nohup $HIVE_HOME/bin/hiveserver2&

连接Hive服务

代码语言:javascript
复制
beeline -n hive -u jdbc:hive2://hadoop01:10000/default

插入数据

代码语言:javascript
复制
INSERT INTO t_user01(id,name) VALUES (1,'李四');

查询数据

代码语言:javascript
复制
select * from t_user01;
select * from t_user01 limit 10;

准备Phoenix

注意

在Phoenix中无论表还是字段只要没有双引号引起来的字段都会变成大写。 这里不建议用双引号,在后期拼接SQL的时候比较麻烦。

启动query server

代码语言:javascript
复制
queryserver.py start

lsof -i:8765

连接

代码语言:javascript
复制
sqlline-thin.py http://hadoop01:8765

创建schema

代码语言:javascript
复制
create schema mdb;

使用这个新建的 schema:

代码语言:javascript
复制
use mdb;

创建表

代码语言:javascript
复制
CREATE TABLE IF NOT EXISTS tuser(
  id VARCHAR primary key,
  name VARCHAR
);

插入数据

代码语言:javascript
复制
upsert into tuser values('1001','zhangsan');
upsert into tuser values('1002','lisi');
upsert into tuser(id,name) values('1003','liwu');

查询记录

代码语言:javascript
复制
select * from tuser;
select * from tuser where id='1001';
select * from tuser where name like 'li%';

分页查询

代码语言:javascript
复制
select * from tuser order by id desc limit 1 offset 0;
select * from tuser order by id desc limit 1 offset 1;

其中

  • limit 取多少条
  • offset 从多少条开始

Hive=>Phoenix

依赖

代码语言:javascript
复制
<!--JSON工具-->
<dependency>
  <groupId>com.alibaba.fastjson2</groupId>
  <artifactId>fastjson2</artifactId>
  <version>2.0.22</version>
</dependency>

<!--Hive JDBC-->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>2.1.0</version>
</dependency>

<!--操作Phoenix-->
<dependency>
  <groupId>org.apache.phoenix</groupId>
  <artifactId>phoenix-queryserver-client</artifactId>
  <version>6.0.0</version>
</dependency>

主类

代码语言:javascript
复制
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Hive2Phoenix {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<JSONObject> sourceData = env.addSource(new HiveReader());
        sourceData.addSink(new PhoenixWriter());
        sourceData.print();
        env.execute("Hive2Phoenix");
    }
}

读取Hive

代码语言:javascript
复制
import com.alibaba.fastjson2.JSONObject;
import com.xhkjedu.pojo.DBModel;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveReader extends RichSourceFunction<JSONObject> {
    private transient Statement st = null;
  
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection con = DriverManager.getConnection("jdbc:hive2://192.168.7.101:10000/default", "hive", "hive");
        st = con.createStatement();
    }

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {

        ResultSet rs = st.executeQuery("select * from t_user");

        while (rs.next()) {
            Integer id = rs.getInt("id");
            String name = rs.getString("name");

            JSONObject json = new JSONObject();
            json.put("id", id);
            json.put("name", name);
            ctx.collect(json);
        }

        //rs.close();
        //st.close();
        //con.close();
    }

    @Override
    public void cancel() {

    }
}

ResultSet.next其实是取一条就跟数据库通讯拿一条数据,并不是全部取出放在内存,因为ResultSet.next之前,是获取了数据库连接的,数据库连接断开,你就获取不到数据了,说明是有通讯的。

写入Phoenix

代码语言:javascript
复制
import com.alibaba.fastjson2.JSONObject;
import com.xhkjedu.pojo.DBModel;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class PhoenixWriter extends RichSinkFunction<JSONObject> {

    private transient Statement st = null;
    static Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        System.out.println("open:" + Thread.currentThread().getId());
        Class.forName("org.apache.phoenix.queryserver.client.Driver");
        if (conn == null) {
            conn = DriverManager.getConnection("jdbc:phoenix:thin:url=http://192.168.7.101:8765;serialization=PROTOBUF");
        }
        st = conn.createStatement();
    }

    @Override
    public void close() throws Exception {
        System.out.println("close");
        conn.commit();
        st.close();
        conn.close();
        super.close();
    }

    @Override
    public void invoke(JSONObject json, Context context) throws Exception {
        String id = json.getString("id");
        String name = json.getString("name");
        String sql = String.format("upsert into mdb.tuser(id,name) VALUES ('%s','%s')", id, name);
        System.out.println("sql: " + sql);
        st.execute(sql);
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-01-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 准备Hive
  • 准备Phoenix
  • Hive=>Phoenix
    • 依赖
      • 主类
        • 读取Hive
          • 写入Phoenix
          相关产品与服务
          数据库
          云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档