Spark工程开发常用函数与方法(Scala语言)

import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.sql.{SaveMode, DataFrame} import scala.collection.mutable.ArrayBuffer import main.asiainfo.coc.tools.Configure import org.apache.spark.sql.hive.HiveContext import java.sql.DriverManager import java.sql.Connection

 1 连接前台数据源 查询前台MYSQL中的数据

val DIM_COC_INDEX_INFO_DDL = s"""
CREATE TEMPORARY TABLE DIM_COC_INDEX_INFO
USING org.apache.spark.sql.jdbc
OPTIONS (
url '${mySQLUrl}',
dbtable 'DIM_COC_INDEX_INFO'
)""".stripMargin

sqlContext.sql(DIM_COC_INDEX_INFO_DDL)
val DIM_COC_INDEX_INFO = sql("SELECT * FROM DIM_COC_INDEX_INFO").cache()

2   在A表中筛选出 B表中获取的TARGET_TABLE_CODE 然后再按照DATA_SRC_CODE排序,查询出源表的集合

val sources = DIM_COC_INDEX_INFO.filter("TARGET_TABLE_CODE ='"+TARGET_TABLE_CODE+"'")
        .select("DATA_SRC_CODE").groupBy("DATA_SRC_CODE").agg(DIM_COC_INDEX_INFO("DATA_SRC_CODE")).collect

3 将表进行关联

resultIndexTableDF = resultIndexTableDF.join(SOURCE_TABLE,ALL_USERS.col(ALL_USER_JOIN_COLUMN_NAME) === SOURCE_TABLE.col(SOURCE_TABLE_JOIN_COLUMN_NAME),"left_outer")
resultIndexTableDF.dtypes.foreach(println)

4 根据条件筛选

val labels = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN,CI_MDA_SYS_TABLE("TABLE_ID") === CI_MDA_SYS_TABLE_COLUMN("TABLE_ID"),"inner")
      .join(CI_LABEL_EXT_INFO,CI_MDA_SYS_TABLE_COLUMN("COLUMN_ID") === CI_LABEL_EXT_INFO("COLUMN_ID"),"inner")
      .join(CI_LABEL_INFO,CI_LABEL_EXT_INFO("LABEL_ID") === CI_LABEL_INFO("LABEL_ID"),"inner")
      .join(CI_APPROVE_STATUS,CI_LABEL_INFO("LABEL_ID") === CI_APPROVE_STATUS("RESOURCE_ID"),"inner")
      .filter(CI_APPROVE_STATUS("CURR_APPROVE_STATUS_ID") === CI_APPROVE_STATUS_SUCCESS_CODE
      and (CI_LABEL_INFO("DATA_STATUS_ID") === 1 || CI_LABEL_INFO("DATA_STATUS_ID") === 2)
      and (CI_LABEL_EXT_INFO("COUNT_RULES_CODE") isNotNull  //TODO   trim.length>0
      )
      and CI_MDA_SYS_TABLE("UPDATE_CYCLE") === TABLE_DATA_CYCLE
      ).cache()

5 根据某字段对表进行排序

    val labelTargetTables = labels.groupBy("CI_MDA_SYS_TABLE.TABLE_ID","CI_MDA_SYS_TABLE.TABLE_NAME").agg(labels("CI_MDA_SYS_TABLE.TABLE_ID"),labels("CI_MDA_SYS_TABLE.TABLE_NAME")).collect

6 创建parquet格式的表 可使用schema.生成到指定的schema.

        sqlContext.sql("create table "+labelTargetTableName+" stored as parquet as select * from default."+labelTargetTableNameJson)

7 保存数据格式,可以指定生成的格式

 resultLabelTable.saveAsTable(tableName = labelTargetTableName, source="parquet", mode=SaveMode.Overwrite)

8 根据筛选查询出相应数据,由于cache方法并不属于action操作,接下来的操作需要这一步所执行的数据信息,所以这里使用collect方法,再执行遍历方法

      val r0000Labels = labelInThisTargetTable.filter("COUNT_RULES_CODE = 'R_00000'").select("CI_LABEL_INFO.LABEL_ID","COLUMN_NAME").collect
for(r0000Label <- r0000Labels){
   ........
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏数据和云

案例分析:倾斜值传入导致 SQL 资源消耗升高

作者 | 邓秋爽:云和恩墨技术工程师,有超过七年超大型数据库专业服务经验,擅长 Oracle 数据库优化、SQL 优化和 Troubleshooting。

13540
来自专栏java达人

mysql left( right ) join使用on 与where 筛选的差异

有这样的一个问题mysql查询使用mysql中left(right)join筛选条件在on与where查询出的数据是否有差异。 可能只看着两个关键字看不出任...

24270
来自专栏乐沙弥的世界

PL/SQL --> 存储过程

存储过程子程序的一种类型,能够完成一些任务,作为schema对象存储于数据库。是一个有名字的PL/SQL代码块,支持接收或不接受参数

9030
来自专栏杨建荣的学习笔记

通过pl/sql来格式化sql(r4笔记第63天)

在之前的一篇博文中分享了通过java来格式化sql,http://blog.itpub.net/23718752/viewspace-1444910/ 今天突然...

35040
来自专栏乐沙弥的世界

Oracle字符集与字符类型存储空间占用

10220
来自专栏c#开发者

oracle 常用command

Lunatic 整理 1. 删除表的注意事项 在删除一个表中的全部数据时,须使用TRUNCATE TABLE 表名;因为用DROP TABLE,DE...

37930
来自专栏逸鹏说道

快速对表的某字段赋递增的数值

假如有这张一张表,当时创建时没有用来存放递增的数值的int型字段。在使用的过程中,有这样的需求。 USE AdventureWorks2008R2;GOIF O...

22160
来自专栏技术之路

使用Linq to Sql 创建数据库和表

1.建一个类Article 1 using System.Data.Linq.Mapping; 2 3 4 5 [Table(Name =...

20770
来自专栏数据之美

Hive 基础(2):库、表、字段、交互式查询的基本操作

1、命令行操作 (1)打印查询头,需要显示设置: set hive.cli.print.header=true; (2)加"--",其后的都被认为是注释...

749100
来自专栏沃趣科技

语句效率统计视图 | 全方位认识 sys 系统库

在上一篇《统计信息查询视图|全方位认识 sys 系统库》中,我们介绍了利用sys 系统库的查询统计信息的快捷视图,本期将为大家介绍语句查询效率语句统计信息相关的...

26650

扫码关注云+社区

领取腾讯云代金券