前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark-shell操作hudi并使用hbase作为索引

spark-shell操作hudi并使用hbase作为索引

作者头像
从大数据到人工智能
发布2022-01-19 08:17:12
4220
发布2022-01-19 08:17:12
举报
文章被收录于专栏:大数据-BigData

前言

接上一篇文章,上篇文章说到hudi适配hbase 2.2.6,这篇文章在spark-shell中操作hudi,并使用hbase作为索引。要完成以下实验,请先确保你已经按照文章步骤对hudi进行适配。并且得到了hudi-spark3-bundle_2.12-0.9.0.jar

当然,如果你想先做一个实验,那么可以从这里以下链接下载我已经编译好的jar包。

组件版本以及前提要求:

组件版本:

hudi 0.9.0

hbase 2.2.6

spark 3.0.1

hadoop 3.2.0

hive 3.1.2

zookeeper:3.5.9

前提要求:

要完成以下实验,当然首先你需要有一个可以用的hadoop 3.2.0集群、hbase 2.2.6集群、主机环境中已经下载spark 3.0.1二进制包。

环境说明:

本实验环境使用的相关配置如下:

  • hdfs:hdfs://host117:8020
  • zookeeper:host117:2181
  • hbase对应zk_node_path:/hbase-secure
  • 在hbase上建一个名为hudi_hbase_index_test、列族为_s的表用于存放索引信息。命令为
代码语言:javascript
复制
create 'hudi_hbase_index_test', '_s'Copy

拷贝hbase相关包到spark的jars目录下

我们在spark中使用hbase作为hudi的索引时,需要hbase相关jar包,所以我们需要将hbase目录下的以下jar包拷贝到spark的jars目录下:

  • hbase-protocol-shaded-2.2.6.jar
  • hbase-shaded-netty-2.2.1.jar
  • hbase-shaded-miscellaneous-2.2.1.jar

拷贝hudi-spark3-bundle_2.12-0.9.0.jar到spark的jars目录下

cp hudi-spark3-bundle_2.12-0.9.0.jar spark/jars

启动spark-shell执行hudi相关操作

启动spark-shell

代码语言:javascript
复制
./bin/spark-shell --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'Copy

使用DataGenerator类生成随机数据并写入hudi

代码语言:javascript
复制
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieHBaseIndexConfig._
import org.apache.hudi.config.HoodieIndexConfig._

val tableName = "spark_hudi_hbase_index_test"
val basePath =  "hdfs://host117:8020/tmp/spark_hudi_hbase_index_test"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator").
  option(INDEX_TYPE.key(), "HBASE").
  option(ZKPORT.key(), "2181").
  option(QPS_FRACTION.key(), 0.5).
  option(TABLENAME.key(), "hudi_hbase_index_test").
  option(ZK_NODE_PATH.key(), "/hbase-secure").
  option(ZKQUORUM.key(), "host117").
  option(MAX_QPS_FRACTION.key(), 10000).
  option(MIN_QPS_FRACTION.key(), 1000).
  option(SLEEP_MS_FOR_PUT_BATCH.key(), 100).
  option(SLEEP_MS_FOR_GET_BATCH.key(), 100).
  option(GET_BATCH_SIZE.key(), 100).
  option(QPS_ALLOCATOR_CLASS_NAME.key(), "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator").
  option(UPDATE_PARTITION_PATH_ENABLE.key(), "false").
  option(PUT_BATCH_SIZE_AUTO_COMPUTE.key(), "false").
  option(MAX_QPS_PER_REGION_SERVER.key(), 1000).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
Copy

注意事项:在使用hbase作为索引时,官网上关于hbase index 的配置说,某些配置项是可选的,但是实际在操作过程中发现其实那些配置项是必选的,比如QPS_ALLOCATOR_CLASS_NAME.key(),所以如果你在实际操作过程中,如果发现存在空指针错误的报错,那么可以按照报错信息查看是不是某些配置没有配导致的。

查看hbase上hudi表的索引信息

在完成上述数据写入之后,我们查看hbase中关于该表的索引信息:

查看hudi表中的数据

执行如下命令

代码语言:javascript
复制
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*")
//load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()Copy

查询结果

更新hudi表中数据

代码语言:javascript
复制
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator").
  option(INDEX_TYPE.key(), "HBASE").
  option(ZKPORT.key(), "2181").
  option(TABLENAME.key(), "hudi_hbase_index_test").
  option(ZK_NODE_PATH.key(), "/hbase-secure").
  option(ZKQUORUM.key(), "host117").
  option(MAX_QPS_FRACTION.key(), 10000).
  option(MIN_QPS_FRACTION.key(), 1000).
  option(QPS_FRACTION.key(), 0.5).
  option(SLEEP_MS_FOR_PUT_BATCH.key(), 100).
  option(SLEEP_MS_FOR_GET_BATCH.key(), 100).
  option(GET_BATCH_SIZE.key(), 100).
  option(QPS_ALLOCATOR_CLASS_NAME.key(), "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator").
  option(UPDATE_PARTITION_PATH_ENABLE.key(), "true").
  option(PUT_BATCH_SIZE_AUTO_COMPUTE.key(), "true").
  option(MAX_QPS_PER_REGION_SERVER.key(), 1000).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)Copy

增量查询hudi表中数据

代码语言:javascript
复制
spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*").
  createOrReplaceTempView("hudi_trips_snapshot")

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in

// incrementally query data
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()Copy

相关结果如下所示:

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/1936507

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-11-,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 组件版本以及前提要求:
    • 组件版本:
      • 前提要求:
        • 环境说明:
        • 拷贝hbase相关包到spark的jars目录下
        • 拷贝hudi-spark3-bundle_2.12-0.9.0.jar到spark的jars目录下
        • 启动spark-shell执行hudi相关操作
          • 启动spark-shell
            • 使用DataGenerator类生成随机数据并写入hudi
              • 查看hbase上hudi表的索引信息
                • 查看hudi表中的数据
                  • 更新hudi表中数据
                    • 增量查询hudi表中数据
                    相关产品与服务
                    TDSQL MySQL 版
                    TDSQL MySQL 版(TDSQL for MySQL)是腾讯打造的一款分布式数据库产品,具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档