package cn.it
import java.util
import cn.it.SparkKuduDemo.TABLE_NAME
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object SparkKuduTest {
def main(args: Array[String]): Unit = {
//构建sparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]")
//构建SparkSession对象
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//获取sparkContext对象
val sc: SparkContext = sparkSession.sparkContext
sc.setLogLevel("warn")
//构建KuduContext对象
val kuduContext = new KuduContext("node2:7051", sc)
//1.创建表操作
createTable(kuduContext)
/**
* 创建表
*
* @param kuduContext
* @return
*/
def createTable(kuduContext: KuduContext) = {
//如果表不存在就去创建
if (!kuduContext.tableExists(TABLE_NAME)) {
//构建创建表的表结构信息,就是定义表的字段和类型
val schema: StructType = StructType(
StructField("userId", StringType, false) ::
StructField("name", StringType, false) ::
StructField("age", IntegerType, false) ::
StructField("sex", StringType, false) :: Nil)
//指定表的主键字段
val keys = List("userId")
//指定创建表所需要的相关属性
val options: CreateTableOptions = new CreateTableOptions
//定义分区的字段
val partitionList = new util.ArrayList[String]
partitionList.add("userId")
//添加分区方式为hash分区
options.addHashPartitions(partitionList, 6)
//创建表
kuduContext.createTable(TABLE_NAME, schema, keys, options)
}
}
}
}
定义表时要注意的是Kudu表选项值。你会注意到在指定组成范围分区列的列名列表时我们调用“asJava”方 法。这是因为在这里,我们调用了Kudu Java客户端本身,它需要Java对象(即java.util.List)而不是Scala的List对 象;(要使“asJava”方法可用,请记住导入JavaConverters库。) 创建表后,通过将浏览器指向http//master主机名:8051/tables
点击Table id 可以观察到表的schema等信息: