Spark支持多种数据源,但是Spark对HBase 的读写都没有相对优雅的api,但spark和HBase整合的场景又比较多,故通过spark的DataSource API自己实现了一套比较方便操作HBase的API。
写HBase会根据Dataframe的schema写入对应数据类型的数据到Hbase,先上使用示例:
import spark.implicits._
import org.apache.hack.spark._
val df = spark.createDataset(Seq(("ufo", "play"), ("yy", ""))).toDF("name", "like")
// 方式一
val options = Map(
"hbase.table.rowkey.field" -> "name",
"hbase.table.numReg" -> "12",
"hbase.table.rowkey.prefix" -> "00",
"bulkload.enable" -> "false"
)
df.saveToHbase("hbase_table", Some("XXX:2181"), options)
// 方式二
df1.write.format("org.apache.spark.sql.execution.datasources.hbase")
.options(Map(
"hbase.table.rowkey.field" -> "name",
"hbase.table.name" -> "hbase_table",
"hbase.zookeeper.quorum" -> "XXX:2181",
"hbase.table.rowkey.prefix" -> "00",
"hbase.table.numReg" -> "12",
"bulkload.enable" -> "false"
)).save()
上面两种方式实现的效果是一样的,下面解释一下每个参数的含义:
示例代码如下:
// 方式一
import org.apache.hack.spark._
val options = Map(
"spark.table.schema" -> "appid:String,appstoreid:int,firm:String",
"hbase.table.schema" -> ":rowkey,info:appStoreId,info:firm"
)
spark.hbaseTableAsDataFrame("hbase_table", Some("XXX:2181")).show(false)
// 方式二
spark.read.format("org.apache.spark.sql.execution.datasources.hbase").
options(Map(
"spark.table.schema" -> "appid:String,appstoreid:int,firm:String",
"hbase.table.schema" -> ":rowkey,info:appStoreId,info:firm",
"hbase.zookeeper.quorum" -> "XXX:2181",
"hbase.table.name" -> "hbase_table"
)).load.show(false)
spark和hbase表的schema映射关系指定不是必须的,默认会生成rowkey和content两个字段,content是由所有字段组成的json字符串,可通过field.type.fieldname
对单个字段设置数据类型,默认都是StringType。这样映射出来还得通过spark程序转一下才是你想要的样子,而且所有字段都会去扫描,相对来说不是特别高效。
故我们可自定义schema映射来获取数据:
注意这两个schema是一一对应的,Hbase只会扫描hbase.table.schema
对应的列。
源码在我的 GitHub,欢迎star