首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
客快物流大数据项目(一):物流项目介绍和内容大纲
2
客快物流大数据项目(二):物流项目详细介绍
3
客快物流大数据项目(三):项目解决方案
4
客快物流大数据项目(四):大数据项目为什么使用Docker
5
客快物流大数据项目(五):Docker介绍
6
客快物流大数据项目(六):Docker与虚拟机的形象比喻及组件介绍
7
客快物流大数据项目(七):Docker总结
8
客快物流大数据项目(八):Docker的安装和启动
9
客快物流大数据项目(九):Docker常用命令
10
客快物流大数据项目(十):Docker容器命令
11
客快物流大数据项目(十一):Docker应用部署
12
客快物流大数据项目(十二):Docker的迁移与备份
13
客快物流大数据项目(十三):Docker镜像
14
客快物流大数据项目(十四):DockerFile介绍与构建过程解析
15
客快物流大数据项目(十五):DockeFile常用命令
16
客快物流大数据项目(十六):使用脚本创建镜像
17
客快物流大数据项目(十七):自定义镜像mycentos
18
客快物流大数据项目(十九):项目环境准备
19
客快物流大数据项目(二十):物流管理系统服务器的数据路径配置和软件下载存放位置
20
客快物流大数据项目(二十一):Docker环境初始化
21
客快物流大数据项目(二十二):Docker环境中安装软件
22
客快物流大数据项目(二十三):OGG介绍
23
客快物流大数据项目(二十四):OGG安装部署
24
客快物流大数据项目(二十五):初始化业务数据
25
客快物流大数据项目(二十六):客户关系管理服务器
26
客快物流大数据项目(二十七):Cloudera Manager简单介绍
27
客快物流大数据项目(二十八):大数据服务器环境准备
28
客快物流大数据项目(二十九):下载CDH的安装包
29
客快物流大数据项目(三十):软件下载后存放位置
30
客快物流大数据项目(三十一):常用工具安装
31
客快物流大数据项目(三十二):安装CDH-6.2.1和初始化CDH服务所需的MySQL库
32
客快物流大数据项目(三十三):安装Server和Agent
33
客快物流大数据项目(三十四):CDH开始安装
34
客快物流大数据项目(三十五):CDH使用注意
35
客快物流大数据项目(三十六):安装ElasticSearch-7.6.1
36
客快物流大数据项目(三十七):安装Kinaba-7.6.1
37
客快物流大数据项目(三十八):安装Azkaban-3.71.0
38
客快物流大数据项目(三十九):Hue安装
39
客快物流大数据项目(四十):ETL实现方案
40
客快物流大数据项目(四十一):Kudu入门介绍
41
客快物流大数据项目(四十二):Java代码操作Kudu
42
客快物流大数据项目(四十三):kudu的分区方式
43
客快物流大数据项目(四十四):Spark操作Kudu创建表
44
客快物流大数据项目(四十五):Spark操作Kudu DML操作
45
客快物流大数据项目(四十六):Spark操作Kudu dataFrame操作kudu
46
客快物流大数据项目(四十七):Spark操作Kudu Native RDD
47
客快物流大数据项目(四十八):Spark操作Kudu 修改表
48
客快物流大数据项目(四十九):开发环境初始化
49
客快物流大数据项目(五十):项目框架初始化
50
客快物流大数据项目(五十一):数据库表分析

客快物流大数据项目(四十六):Spark操作Kudu dataFrame操作kudu

Spark操作Kudu dataFrame操作kudu

一、DataFrameApi读取kudu表中的数据

虽然我们可以通过上面显示的KuduContext执行大量操作,但我们还可以直接从默认数据源本身调用读/写API。要设置读取,我们需要为Kudu表指定选项,命名我们要读取的表以及为表提供服务的Kudu集群的Kudu主服务器列表。

  • 代码示例
代码语言:javascript
复制
/**
 * 使用DataFrameApi读取kudu表中的数据
 * @param sparkSession
 * @param kuduMaster
 * @param tableName
 */
def getTableData(sparkSession: SparkSession, kuduMaster: String, tableName: String): Unit = {
  //定义map集合,封装kudu的master地址和要读取的表名
  val options = Map(
    "kudu.master" -> kuduMaster,
    "kudu.table" -> tableName
  )
  sparkSession.read.options(options).kudu.show()
}

二、 DataFrameApi写数据到kudu表中

在通过DataFrame API编写时,目前只支持一种模式“append”。尚未实现的“覆盖”模式。

  • 代码示例
代码语言:javascript
复制
/**
 * 6)DataFrameApi写数据到kudu表中
 */
def dataFrame2Kudu(session: SparkSession, kuduContext: KuduContext): Unit ={
  val data = List(person(3, "canglaoshi", 14, 0), person(4, "xiaowang", 18, 1))
  import  session.implicits._
  val dataFrame = data.toDF

  //目前,在kudu中,数据的写入只支持append追加
  dataFrame.write.mode("append").options(kuduOptions).kudu

  //查看结果
  //导包
  import org.apache.kudu.spark.kudu._
  //加载表的数据,导包调用kudu方法,转换为dataFrame,最后在使用show方法显示结果
  sparkSession.read.options(kuduOptions).kudu.show()
}

三、​​​​​​​使用sparksql操作kudu表

可以选择使用Spark SQL直接使用INSERT语句写入Kudu表;与'append'类似,INSERT语句实际上将默认使用 UPSERT语义处理;

  • 代码示例
代码语言:javascript
复制
/**
 * 使用sparksql操作kudu表
 * @param sparkSession
 * @param sc
 * @param kuduMaster
 * @param tableName
 */
def SparkSql2Kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = {
  //定义map集合,封装kudu的master地址和表名
  val options = Map(
    "kudu.master" -> kuduMaster,
    "kudu.table" -> tableName
  )
  val data = List(persont(10, "小张", 30, 0), person(11, "小王", 40, 0))
  import sparkSession.implicits._
  val dataFrame: DataFrame = sc.parallelize(data).toDF
  //把dataFrame注册成一张表
  dataFrame.createTempView("temp1")

  //获取kudu表中的数据,然后注册成一张表
  sparkSession.read.options(options).kudu.createTempView("temp2")
  //使用sparkSQL的insert操作插入数据
  sparkSession.sql("insert into table temp2 select * from temp1")
  sparkSession.sql("select * from temp2 where age >30").show()
}
下一篇
举报
领券