前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink写出数据到HBase的Sink实现

Flink写出数据到HBase的Sink实现

作者头像
大数据真好玩
发布2020-08-11 11:20:33
5K0
发布2020-08-11 11:20:33
举报
文章被收录于专栏:暴走大数据暴走大数据

文章目录
  • 一、MyHbaseSink
    • 1、继承RichSinkFunction<输入的数据类型>类
    • 2、实现open方法,创建连接对象
    • 3、实现invoke方法,批次写入数据到Hbase
    • 4、实现close方法,关闭连接
  • 二、HBaseUtil工具类

一、MyHbaseSink

1、继承RichSinkFunction<输入的数据类型>类
代码语言:javascript
复制
public class MyHbaseSink extends RichSinkFunction<Tuple2<String, Double>> {
    private transient Integer maxSize = 1000;
    private transient Long delayTime = 5000L;

    public MyHbaseSink() {
    }

    public MyHbaseSink(Integer maxSize, Long delayTime) {
        this.maxSize = maxSize;
        this.delayTime = delayTime;
    }

    private transient Connection connection;
    private transient Long lastInvokeTime;
    private transient List<Put> puts = new ArrayList<>(maxSize);
2、实现open方法,创建连接对象
代码语言:javascript
复制
 // 创建连接
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 获取全局配置文件,并转为ParameterTool
        ParameterTool params =
                (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        //创建一个Hbase的连接
        connection = HBaseUtil.getConnection(
                params.getRequired("hbase.zookeeper.quorum"),
                params.getInt("hbase.zookeeper.property.clientPort", 2181)
        );
        // 获取系统当前时间
        lastInvokeTime = System.currentTimeMillis();
    }
3、实现invoke方法,批次写入数据到Hbase
代码语言:javascript
复制
 @Override
    public void invoke(Tuple2<String, Double> value, Context context) throws Exception {
        String rk = value.f0;
        //创建put对象,并赋rk值
        Put put = new Put(rk.getBytes());
        // 添加值:f1->列族, order->属性名 如age, 第三个->属性值 如25
        put.addColumn("f1".getBytes(), "order".getBytes(), value.f1.toString().getBytes());
        puts.add(put);// 添加put对象到list集合
        //使用ProcessingTime
        long currentTime = System.currentTimeMillis();
        //开始批次提交数据
        if (puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) {
            //获取一个Hbase表
            Table table = connection.getTable(TableName.valueOf("database:table"));
            table.put(puts);//批次提交
            puts.clear();
            lastInvokeTime = currentTime;
            table.close();
        }
    }
4、实现close方法,关闭连接
代码语言:javascript
复制
    @Override
    public void close() throws Exception {
        connection.close();
    }

二、HBaseUtil工具类

  • Hbase的工具类,用来创建Hbase的Connection
代码语言:javascript
复制
public class HBaseUtil {
    /**
     * @param zkQuorum zookeeper地址,多个要用逗号分隔
     * @param port     zookeeper端口号
     * @return connection
     */
    public static Connection getConnection(String zkQuorum, int port) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkQuorum);
        conf.set("hbase.zookeeper.property.clientPort", port + "");

        Connection connection = ConnectionFactory.createConnection(conf);
        return connection;
    }
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-08-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 一、MyHbaseSink
    • 1、继承RichSinkFunction<输入的数据类型>类
      • 2、实现open方法,创建连接对象
        • 3、实现invoke方法,批次写入数据到Hbase
          • 4、实现close方法,关闭连接
          • 二、HBaseUtil工具类
          相关产品与服务
          TDSQL MySQL 版
          TDSQL MySQL 版(TDSQL for MySQL)是腾讯打造的一款分布式数据库产品,具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档