前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >0871-6.3.2-如何基于CDH6环境编译Hudi-0.9.0并使用

0871-6.3.2-如何基于CDH6环境编译Hudi-0.9.0并使用

作者头像
Fayson
发布2022-03-21 13:11:40
2.7K0
发布2022-03-21 13:11:40
举报
文章被收录于专栏:Hadoop实操Hadoop实操

1.文档编写目的

Apache Hudi是一个Data Lakes的开源方案,是Hadoop Updates and Incrementals的简写,它是由Uber开发并开源的Data Lakes解决方案。Hudi 是一个丰富的平台,用于构建具有增量数据管道的流式数据湖,具有如下基本特性/能力:

  • Hudi能够摄入(Ingest)和管理(Manage)基于HDFS之上的大型分析数据集,主要目的是高效的减少入库延时。
  • Hudi基于Spark来对HDFS上的数据进行更新、插入、删除等。
  • Hudi在HDFS数据集上提供如下流原语:插入更新(如何改变数据集);增量拉取(如何获取变更的数据)。
  • Hudi可以对HDFS上的parquet格式数据进行插入/更新操作。
  • Hudi通过自定义InputFormat与Hadoop生态系统(Spark、Hive、Parquet)集成。
  • Hudi通过Savepoint来实现数据恢复。
  • Hudi支持Spark 2.x版本,建议使用2.4.4+版本的Spark。

本篇文章Fayson主要介绍如何基于CDH6.3.2版本编译Hudi

2.编译环境准备

1.本次的编译环境主要是基于Intellij Idea工具进行编译,打开Idea开发工具,从git上将hudi的源码checkout下来。

点击“Get from VCS”,选择GitHub方式,填写Hudi的git地址:https://github.com/apache/hudi.git

点击Clone将Hudi的master代码拉至本地

选中hudi工程,右键切换分支版本至0.9.0

点击“Branches”,选择0.9.0版本并checkout

到此完成了Hudi源码的Checkout,接下来调整依赖包版本及简单的调整代码进行编译。

注意:Hudi是Java开发,在自己的开发环境中还需要调整后自己的Java环境变量。

3.源码编译及修改

本次编译主要是为了能够更好的适配CDH6.3.2集群,因此在编译的过程中需要将Maven依赖调整为CDH6.3.2版本。

1.修改pom.xml配置文件,将里面的依赖修改为如下

  • 确认<repositories></repositories>部分是否有Cloudera的Maven源

代码语言:javascript
复制
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/public/</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  • 改<properties></properties>部分的hadoop.version、hive.version以及spark2.version的版本
代码语言:javascript
复制
<hadoop.version>3.0.0-cdh6.3.2</hadoop.version>
<hive.version>2.1.1-cdh6.3.2</hive.version>
<spark2.version>2.4.0-cdh6.3.2</spark2.version>
  • 修改hive-jdbc和hive-service两个依赖的配置,添加排除
代码语言:javascript
复制
    <exclusion>
      <groupId>org.glassfish</groupId>
      <artifactId>javax.el</artifactId>
    </exclusion>

2.修改hudi-spark模块的org.apache.hudi.DefaultSource类中的部分代码段

使用CDH6.3.2版本的Spark依赖包找不到org.apache.spark.sql.execution.datasources.orc.OrcFileFormat

3.hudi-utilities模块代码修改

  • org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector类中的getFileAttributesFromRecord(JSONObject record)方法

该方法未抛出JSONExcepiton,导致编译失败

  • org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector类中processAndDeleteInvalidMessages方法添加JSONException异常抛出

  • org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelector类中testFileAttributesFromRecordShouldReturnsExpectOutput方法添加JSONException异常抛出
  • org.apache.hudi.utilities.sources.helpers.TestS3EventsMetaSelector类中testNextEventsFromQueueShouldReturnsEventsFromQueue方法添加JSONException异常抛出

4.hudi-integ-test模块代码修改,注释pom.xml文件中jackson-annotations依赖的scope)

5.hudi-spark-datasource/hudi-spark-common模块的

org.apache.hudi.DataSourceReadOptions,将如下截图部分代码注释(204-228行的if判断)

6.完成上修改后,通过idea执行编译操作

等待命令执行成功

至此完成了Hudi0.9.0版本的编译。

4.Hudi与Spark集成测试

1.在前面完成了Hudi源码的编译,在packaging目录下的hudi-spark-bundle模块可以找到编译好的hudi-spark-bundle_2.11-0.9.0的jar包

2.将编译好的jar包上传至CDH集群任意有Spark Gateway节点的服务器上

3.使用spark-shell命令集成hudi并测试基本功能

代码语言:javascript
复制
spark-shell \
  --jars hudi-spark-bundle_2.11-0.9.0.jar \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

4.在命令行执行如下代码,创建一个hudi的表并插入数据

代码语言:javascript
复制
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow1"
val basePath = "hdfs:///tmp/hudi_trips_cow1"
val dataGen = new DataGenerator
//写入数据
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.embed.timeline.server","false").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

5.执行如下代码读取数据

代码语言:javascript
复制
val tripsSnapshotDF = spark.read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

查看该表的总数据量

6.执行如下代码删除数据

代码语言:javascript
复制
// 取出两条要删除的数据
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

// 删除
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY,"delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.embed.timeline.server","false").
  mode(Append).
  save(basePath)

// 再次查询
val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath)

roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// 应该返回两条数据
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").show()

7.查看HDFS上的hudi数据目录

代码语言:javascript
复制
hadoop fs -ls -R /tmp/hudi_trips_cow1

到完成了简单的Spark与Hudi的集成测试

5.总结

1.Hudi0.9.0版本与Spark的集成,至少需要Spark2.4.4及以上版本,在更高版本中引入的ORC的支持,因此使用CDH6.3.2版本依赖进行编译是去掉了ORC相关的代码段

2.在编译的过程中,hudi依赖的hive依赖中存在低版本的jetty依赖包,导致在执行写入时报如下异常:对于该异常的处理方式,需要在执行写入hudi数据的代码段中增加option("hoodie.embed.timeline.server","false").

代码语言:javascript
复制
java.lang.NoSuchMethodError: org.apache.hudi.org.apache.jetty.server.session.SessionHandler.setHttpOnly(Z)V
  at io.javalin.core.util.JettyServerUtil.defaultSessionHandler(JettyServerUtil.kt:50)
  at io.javalin.Javalin.<init>(Javalin.java:94)

3.在后续的文章中会使用Hudi与支持的Hive、Spark、MR等进行详细的测试。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-03-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Hadoop实操 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
专用宿主机
专用宿主机(CVM Dedicated Host,CDH)提供用户独享的物理服务器资源,满足您资源独享、资源物理隔离、安全、合规需求。专用宿主机搭载了腾讯云虚拟化系统,购买之后,您可在其上灵活创建、管理多个自定义规格的云服务器实例,自主规划物理资源的使用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档