前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark On HBase

Spark On HBase

作者头像
大数据和云计算技术
发布2019-09-24 16:22:09
1.1K0
发布2019-09-24 16:22:09
举报

一.前言

MapReduce早已经对接了HBase,以HBase作为数据源,完成批量数据的读写。如今继MapReduce之后的Spark在大数据领域有着举足轻重的地位,无论跑批,流处理,甚至图计算等都有它的用武之地。Spark对接HBase成为不少用户的需求。

二.Spark On HBase

1.可以解决的问题

Spark和HBase无缝对接意味着我们不再需要关心安全和RDD与HBase交互的细节。更方便应用Spark带来的批处理,流处理等能力。比如以下常见的应用场景:

  1. 以HBase作为存储,通过Spark对流式数据处理。
  2. 以HBase作为存储,完成大规模的图或者DAG的计算。
  3. 通过Spark对HBase做BulkLoad操作
  4. 同Spark SQL对HBase数据做交互式分析

2.社区相关的工作

目前已经有多种Spark对接HBase的实现,这里我们选取三个有代表的工作进行分析:

2.1 华为: Spark-SQL-on-HBase

特点: 扩展了Spark SQL的parse功能来对接HBase。通过coprocessor和自定义filter来提升读写性能。

优点:

  • 扩展了对应的cli功能,支持Scala shell和Python shell
  • 多种性能优化方式,甚至支持sub plan到coprocessor实现partial aggregation.
  • 支持Java和Python API
  • 支持row key组合
  • 支持常用DDL和DML(包括bulkload,但不支持update)

缺点:

  • 不支持支持基于时间戳和版本的查询
  • 不支持安全
  • row key支持原始类型或者String,不支持复杂数据类型

使用示例:

在HBase中创建表,并写入数据

代码语言:javascript
复制
$HBase_Home/bin/hbase shell
create 'hbase_numbers', 'f'for i in '1'..'100' do for j in '1'..'2' do put 'hbase_numbers', "row#{i}", "f:c#{j}", "#{i}#{j}" end end

使用Spark SQL创建表并与HBase表建立映射

代码语言:javascript
复制
$SPARK_HBASE_Home/bin/hbase-sqlCREATE TABLE numbers
rowkey STRING, a STRING, b STRING, PRIMARY KEY (rowkey)
MAPPED BY hbase_numbers COLS=[a=f.c1, b=f.c2];

查询

代码语言:javascript
复制
select a, b from numbers where b > "980"
2.2 Hortonworks: Apache HBase Connector

特点: 以简单的方式实现了标准的Spark Datasource API,使用Spark Catalyst引擎做查询优化。同时通过scratch来构建RDD,也实现了许多常见的查询优化。

优点:

  • native avro支持
  • 谓词下推和分区裁剪
  • 支持row key组合
  • 支持安全

缺点:

  • SQL语法不够丰富,只支持spark sql原有的语法
  • 只支持java原始类型
  • 不支持多语言API

使用示例:

定义 HBase Catalog

代码语言:javascript
复制
def catalog = s"""{
       |"table":{"namespace":"default", "name":"table1"},
       |"rowkey":"key",
       |"columns":{
         |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
         |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
         |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
         |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
         |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
         |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
         |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
         |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
         |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
       |}
     |}""".stripMargin

使用SQL查询

代码语言:javascript
复制
// Load the dataframeval df = withCatalog(catalog)//SQL exampledf.createOrReplaceTempView("table")
sqlContext.sql("select count(col1) from table").show
2.3 Cloudrea: SparkOnHBase

特点: 通过简单的接口实现链接Spark与HBase, 支持常用的bulk读写。架构图如下:

优点

  • 支持安全
  • 通过get或者scan直接生成rdd, 并可以使用API完成更高级的功能
  • 支持组合rowkey
  • 支持多种bulk操作
  • 为spark和 spark streaming提供相似的API
  • 支持谓词下推优化

缺点

  • 不支持复杂数据类型
  • SQL只支持spark sql原有的语法

使用示例

直接使用scan创建一个RDD

代码语言:javascript
复制
SparkConf sparkConf = new SparkConf().setAppName(  
               "Scan_RDD").set("spark.executor.memory", "2000m").setMaster(  
               "spark://xx.xx.xx.xx:7077")
                .setJars(new String[]{"/path/to/hbase.jar"});  val sc = new SparkContext(sparkConf)val conf = HBaseConfiguration.create()val hbaseContext = new HBaseContext(sc, conf)var scan = new Scan()
scan.setCaching(100)var getRdd = hbaseContext.hbaseRDD(tableName, scan)

创建一个RDD并把RDD的内容写入HBase

代码语言:javascript
复制
val sc = new SparkContext(sparkConf)//This is making a RDD of//(RowKey, columnFamily, columnQualifier, value)val rdd = sc.parallelize(Array(
     (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
     (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
     (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
     (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
     (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
    )
   )//Create the HBase config like you normally would  then//Pass the HBase configs and SparkContext to the HBaseContextval conf = HBaseConfiguration.create();
val hbaseContext = new HBaseContext(sc, conf);//Now give the rdd, table name, and a function that will convert a RDD record to a put, and finally// A flag if you want the puts to be batchedhbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
   tableName,    //This function is really important because it allows our source RDD to have data of any type
   // Also because puts are not serializable
   (putRecord) > {
     val put = new Put(putRecord._1)
     putRecord._2.foreach((putValue) > put.add(putValue._1, putValue._2, putValue._3))
      put
   },    true);
2.4 综合对比

产品

SQL支持优化

支持安全

接口丰富易用度

易集成到HBase

社区活跃度

华为

近两年无更新

Hortonworks

较多

近一个月内有更新

Cloudrea

较高

已集成到HBASE trunk且持续更新

3. 最后

社区中有不少Spark on HBase的工作,出发点都是为了提供更易用,更高效的接口。其中Cloudrea的SparkOnHbase更加灵活简单,在2015年8月被提交到HBase的主干(trunk)上,模块名为HBase-Spark Module,目前准备在HBASE 2.0 正式Release, 相信这个特性一定是HBase新版本的一个亮点。 于此同时云HBase也会与社区同步发展,使用包括但不限于Spark On HBase的新特性,届时欢迎大家尝鲜。如若文章中有不准确的描述,请多多指正,谢谢!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-07-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据和云计算技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一.前言
  • 二.Spark On HBase
    • 1.可以解决的问题
      • 2.社区相关的工作
        • 2.1 华为: Spark-SQL-on-HBase
        • 2.2 Hortonworks: Apache HBase Connector
        • 2.3 Cloudrea: SparkOnHBase
        • 2.4 综合对比
      • 3. 最后
      相关产品与服务
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档