首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
客快物流大数据项目(一):物流项目介绍和内容大纲
2
客快物流大数据项目(二):物流项目详细介绍
3
客快物流大数据项目(三):项目解决方案
4
客快物流大数据项目(四):大数据项目为什么使用Docker
5
客快物流大数据项目(五):Docker介绍
6
客快物流大数据项目(六):Docker与虚拟机的形象比喻及组件介绍
7
客快物流大数据项目(七):Docker总结
8
客快物流大数据项目(八):Docker的安装和启动
9
客快物流大数据项目(九):Docker常用命令
10
客快物流大数据项目(十):Docker容器命令
11
客快物流大数据项目(十一):Docker应用部署
12
客快物流大数据项目(十二):Docker的迁移与备份
13
客快物流大数据项目(十三):Docker镜像
14
客快物流大数据项目(十四):DockerFile介绍与构建过程解析
15
客快物流大数据项目(十五):DockeFile常用命令
16
客快物流大数据项目(十六):使用脚本创建镜像
17
客快物流大数据项目(十七):自定义镜像mycentos
18
客快物流大数据项目(十九):项目环境准备
19
客快物流大数据项目(二十):物流管理系统服务器的数据路径配置和软件下载存放位置
20
客快物流大数据项目(二十一):Docker环境初始化
21
客快物流大数据项目(二十二):Docker环境中安装软件
22
客快物流大数据项目(二十三):OGG介绍
23
客快物流大数据项目(二十四):OGG安装部署
24
客快物流大数据项目(二十五):初始化业务数据
25
客快物流大数据项目(二十六):客户关系管理服务器
26
客快物流大数据项目(二十七):Cloudera Manager简单介绍
27
客快物流大数据项目(二十八):大数据服务器环境准备
28
客快物流大数据项目(二十九):下载CDH的安装包
29
客快物流大数据项目(三十):软件下载后存放位置
30
客快物流大数据项目(三十一):常用工具安装
31
客快物流大数据项目(三十二):安装CDH-6.2.1和初始化CDH服务所需的MySQL库
32
客快物流大数据项目(三十三):安装Server和Agent
33
客快物流大数据项目(三十四):CDH开始安装
34
客快物流大数据项目(三十五):CDH使用注意
35
客快物流大数据项目(三十六):安装ElasticSearch-7.6.1
36
客快物流大数据项目(三十七):安装Kinaba-7.6.1
37
客快物流大数据项目(三十八):安装Azkaban-3.71.0
38
客快物流大数据项目(三十九):Hue安装
39
客快物流大数据项目(四十):ETL实现方案
40
客快物流大数据项目(四十一):Kudu入门介绍
41
客快物流大数据项目(四十二):Java代码操作Kudu
42
客快物流大数据项目(四十三):kudu的分区方式
43
客快物流大数据项目(四十四):Spark操作Kudu创建表
44
客快物流大数据项目(四十五):Spark操作Kudu DML操作
45
客快物流大数据项目(四十六):Spark操作Kudu dataFrame操作kudu
46
客快物流大数据项目(四十七):Spark操作Kudu Native RDD
47
客快物流大数据项目(四十八):Spark操作Kudu 修改表
48
客快物流大数据项目(四十九):开发环境初始化
49
客快物流大数据项目(五十):项目框架初始化
50
客快物流大数据项目(五十一):数据库表分析

客快物流大数据项目(四十五):Spark操作Kudu DML操作

Spark操作Kudu DML操作

Kudu支持许多DML类型的操作,其中一些操作包含在Spark on Kudu集成. 包括:

  • INSERT - 将DataFrame的行插入Kudu表。请注意,虽然API完全支持INSERT,但不鼓励在Spark中使用它。 使用INSERT是有风险的,因为Spark任务可能需要重新执行,这意味着可能要求再次插入已插入的行。这样做会导致失败,因为如果行已经存在,INSERT将不允许插入行(导致失败)。相反,我们鼓励使用下面描述 的INSERT_IGNORE。
  • INSERT-IGNORE - 将DataFrame的行插入Kudu表。如果表存在,则忽略插入动作。
  • DELETE - 从Kudu表中删除DataFrame中的行
  • UPSERT - 如果存在,则在Kudu表中更新DataFrame中的行,否则执行插入操作。
  • UPDATE - 更新dataframe中的行

一、插入数据insert操作

先创建一张表,然后把数据插入到表中

代码语言:javascript
复制
package cn.it

import java.util

import cn.it.SparkKuduDemo.{TABLE_NAME, it}
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}


object SparkKuduTest {
  //定义样例类
  case class person(id:Int, name:String, age:Int, sex:Int)
  
  def main(args: Array[String]): Unit = {
    //构建sparkConf对象
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]")

    //构建SparkSession对象
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    //获取sparkContext对象
    val sc: SparkContext = sparkSession.sparkContext
    sc.setLogLevel("warn")

    //构建KuduContext对象
    val kuduContext = new KuduContext("node2:7051", sc)

    //1.创建表操作
    createTable(kuduContext)

    /**
     * 创建表
     *
     * @param kuduContext
     * @return
     */
    def createTable(kuduContext: KuduContext) = {
      //如果表不存在就去创建
      if (!kuduContext.tableExists(TABLE_NAME)) {

        //构建创建表的表结构信息,就是定义表的字段和类型
        val schema: StructType = StructType(
          StructField("userId", StringType, false) ::
            StructField("name", StringType, false) ::
            StructField("age", IntegerType, false) ::
            StructField("sex", StringType, false) :: Nil)

        //指定表的主键字段
        val keys = List("userId")

        //指定创建表所需要的相关属性
        val options: CreateTableOptions = new CreateTableOptions
        //定义分区的字段
        val partitionList = new util.ArrayList[String]
        partitionList.add("userId")
        //添加分区方式为hash分区
        options.addHashPartitions(partitionList, 6)

        //创建表
        kuduContext.createTable(TABLE_NAME, schema, keys, options)
      }
    }

    /**
     * 2)加载数据
     * @param session
     * @param sc
     * @param kuduContext
     */
    def inserData(session: SparkSession, sc: SparkContext, kuduContext: KuduContext): Unit = {
      //定义数据
      val data = List(person(1, "tom", 30, 1), person(2, "mark", 26, 0))
      val personRDD = sc.makeRDD(data)
      import session.implicits._
      val dataFrame: DataFrame = personRDD.toDF

      kuduContext.insertRows(dataFrame, TABLE_NAME)
    }
  }
}

二、删除数据delete操作

代码语言:javascript
复制
/**
 * 4)删除数据
 * @param session
 * @param kuduContext
 */
def deleteData(session: SparkSession, kuduContext: KuduContext): Unit = {
  //定义数据
  val data = List(person(1, "tom", 50, 1), person(2, "mark", 30, 0))

  import session.implicits._
  val dataFrame: DataFrame = data.toDF().select("id")

  kuduContext.deleteRows(dataFrame, TABLE_NAME)
}

三、更新数据upsert操作

代码语言:javascript
复制
/**
 * 3)修改数据
 * @param session
 * @param kuduContext
 */
def upDATEData(session: SparkSession, kuduContext: KuduContext): Unit = {
  //定义数据
  val data = List(person(1, "tom", 50, 1), person(2, "mark", 30, 0))

  import session.implicits._
  val dataFrame: DataFrame = data.toDF()

  kuduContext.upDATERows(dataFrame, TABLE_NAME)
}
下一篇
举报
领券