专栏首页我是攻城师如何使用scala+spark读写hbase?

如何使用scala+spark读写hbase?

最近工作有点忙,所以文章更新频率低了点,希望大家可以谅解,好了,言归正传,下面进入今天的主题:

如何使用scala+spark读写Hbase

软件版本如下:

scala2.11.8

spark2.1.0

hbase1.2.0

公司有一些实时数据处理的项目,存储用的是hbase,提供实时的检索,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为OLAP的数据分析引擎,这块后续有空在研究下。

接着上面说的,hbase存储着一些实时的数据,前两周新需求需要对hbase里面指定表的数据做一次全量的update以满足业务的发展,平时操作hbase都是单条的curd,或者插入一个批量的list,用的都是hbase的java api比较简单,但这次涉及全量update,所以如果再用原来那种单线程的操作api,势必速度回慢上许多。

关于批量操作Hbase,一般我们都会用MapReduce来操作,这样可以大大加快处理效率,原来也写过MR操作Hbase,过程比较繁琐,最近一直在用scala做spark的相关开发,所以就直接使用scala+spark来搞定这件事了,当然底层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一样的,在spark里面把从hbase里面读取的数据集转成rdd了,然后做一些简单的过滤,转化,最终在把结果写入到hbase里面。

整个流程如下:

(1)全量读取hbase表的数据

(2)做一系列的ETL

(3)把全量数据再写回hbase

核心代码如下:

从上面的代码可以看出来,使用spark+scala操作hbase是非常简单的。下面我们看一下,中间用到的几个自定义函数:

第一个函数:checkNotEmptyKs

作用:过滤掉空列簇的数据

第二个函数:forDatas

作用:读取每一条数据,做update后,在转化成写入操作

第三个函数:checkNull

作用:过滤最终结果里面的null数据

上面就是整个处理的逻辑了,需要注意的是对hbase里面的无效数据作过滤,跳过无效数据即可,逻辑是比较简单的,代码量也比较少。

除了上面的方式,还有一些开源的框架,也封装了相关的处理逻辑,使得spark操作hbase变得更简洁,有兴趣的朋友可以了解下,github链接如下:

https://github.com/nerdammer/spark-hbase-connector

https://github.com/hortonworks-spark/shc

本文分享自微信公众号 - 我是攻城师(woshigcs),作者:woshigcs

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2017-06-12

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Hadoop+Hbase集群数据迁移问题

    我是攻城师
  • Spark如何读取Hbase特定查询的数据

    我是攻城师
  • ES-Hadoop插件介绍

    我是攻城师
  • hbase因为数据空洞故障导致读写缓慢

    腾讯云某客户的开发者反馈,大数据集群的hbase读写非常缓慢。我们使用测试程序,也复现该问题。因此,我们需要对hbase集群进行全面检测。

    mikealzhou
  • 安装单机hbase

    安装jdk 下载hbase wget http://archive.apache.org/dist/hbase/hbase-1.0.0/hbase-1.0.0-...

    零月
  • HBase介绍

    Clive
  • Hbase伪分布式集群搭建

    CoderJed
  • 技术干货 | hbase配置详解

    为了能够让namespace支持使用配置属性,如:namespace下表个数(hbase.namespace.quota.maxtables)或者region个...

    加米谷大数据
  • hbase分布式集群搭建

    程序员同行者
  • hadoop2-HBase的安装和测试

    https://www.cnblogs.com/hongten/p/hongten_hadoop_hbase.html

    Hongten

扫码关注云+社区

领取腾讯云代金券