Intellj IDEA +SBT + Scala + Spark Sql读取HDFS数据

前提Spark集群已经搭建完毕,如果不知道怎么搭建,请参考这个链接: http://qindongliang.iteye.com/blog/2224797 注意提交作业,需要使用sbt打包成一个jar,然后在主任务里面添加jar包的路径远程提交即可,无须到远程集群上执行测试,本次测试使用的是Spark的Standalone方式 sbt依赖如下:

demo1:使用Scala读取HDFS的数据:

 /** *
    * Spark读取来自HDFS的数据
    */
def readDataFromHDFS(): Unit ={
    //以standalone方式运行,提交到远程的spark集群上面
    val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("load hdfs data")
    conf.setJars(Seq(jarPaths));
    //得到一个Sprak上下文
    val sc = new SparkContext(conf)
    val textFile=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000")
    //获取第一条数据
    //val data=textFile.first()
   // println(data)
    //遍历打印
      /**
       * collect() 方法 游标方式迭代收集每行数据
       * take(5)   取前topN条数据
       * foreach() 迭代打印
       * stop()    关闭链接
       */
   textFile.collect().take(5).foreach( line => println(line) )
    //关闭资源
    sc.stop()
}

demo2:使用Scala 在客户端造数据,测试Spark Sql:

Spark SQL 映射实体类的方式读取HDFS方式和字段,注意在Scala的Objcet最上面有个case 类定义,一定要放在 这里,不然会出问题:

demo3:使用Scala 远程读取HDFS文件,并映射成Spark表,以Spark Sql方式,读取top10:

 val jarPaths="target/scala-2.11/spark-hello_2.11-1.0.jar"
  /**Spark SQL映射的到实体类的方式**/
  def mapSQL2(): Unit ={
    //使用一个类,参数都是可选类型,如果没有值,就默认为NULL
    //SparkConf指定master和任务名
    val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("spark sql query hdfs file")
    //设置上传需要jar包
    conf.setJars(Seq(jarPaths));
    //获取Spark上下文
    val sc = new SparkContext(conf)
    //得到SQL上下文
    val sqlContext=new SQLContext(sc);
    //必须导入此行代码,才能隐式转换成表格
    import sqlContext.implicits._
    //读取一个hdfs上的文件,并根据某个分隔符split成数组
    //然后根据长度映射成对应字段值,并处理数组越界问题
    val model=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000").map(_.split("\1"))
      .map( p =>  ( if (p.length==4) Model(Some(p(0)), Some(p(1)), Some(p(2)), Some(p(3).toLong))
    else if (p.length==3) Model(Some(p(0)), Some(p(1)), Some(p(2)),None)
    else if (p.length==2) Model(Some(p(0)), Some(p(1)),None,None)
    else   Model( Some(p(0)),None,None,None )
      )).toDF()//转换成DF
    //注册临时表
    model.registerTempTable("monitor")
    //执行sql查询
    val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor  limit 10 ")
//    val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor WHERE title IS  NULL AND dtime IS NOT NULL      ")
      println("开始")
      it.collect().take(8).foreach(line => println(line))
      println("结束")
    sc.stop();
  }

在IDEA的控制台,可以输出如下结果:

原文发布于微信公众号 - 我是攻城师(woshigcs)

原文发表时间:2015-08-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏伦少的博客

SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

1836
来自专栏祝威廉

利用 Spark DataSource API 实现Rest数据源

先说下这个需求的来源。通常在一个流式计算的主流程里,会用到很多映射数据,譬如某某对照关系,而这些映射数据通常是通过HTTP接口暴露出来的,尤其是外部系统,你基本...

1562
来自专栏奇点大数据

用SparkStreaming做奇怪的事

作者:尹会生 无需授权即可转载,甚至无需保留以上版权声明 Spark Steaming 是非常著名的流式计算工具,这次用它来搞一个奇葩的需求:开发给定一个日志...

29410
来自专栏伦少的博客

spark基本概念(便于自己随时查阅--摘自Spark快速大数据分析)

转载请务必注明原创地址为:http://dongkelun.com/2018/01/23/sparkBasicConcept/

3808
来自专栏王小雷

Spark学习之Spark调优与调试(7)

Spark学习之Spark调优与调试(7) 1. 对Spark进行调优与调试通常需要修改Spark应用运行时配置的选项。 当创建一个SparkContext时就...

2407
来自专栏数据科学与人工智能

【Spark研究】Spark编程指南(Python版)

Spark编程指南 译者说在前面:最近在学习Spark相关的知识,在网上没有找到比较详细的中文教程,只找到了官网的教程。出于自己学习同时也造福其他初学者的目的,...

1.5K5
来自专栏null的专栏

Hive——巧用transform处理复杂的字符串问题

相比于Map-Reduce,Hive对数据的处理相对简单,但是Hive本身提供的函数,对于处理复杂的字符串问题,就显得不是很方便,此时,可以借助transfor...

4125
来自专栏Small Code

【Python】自动生成命令行工具 - fire 简介

Python 中用于生成命令行接口(Command Line Interfaces, CLIs)的工具已经有一些了,例如已经成为 Python 标准库的 arg...

4549
来自专栏文渊之博

pyspark 内容介绍(一)

pyspark 包介绍 子包 pyspark.sql module pyspark.streaming module pyspark.ml package py...

6136
来自专栏AILearning

Apache Spark 2.2.0 中文文档 - Spark 编程指南 | ApacheCN

Spark 编程指南 概述 Spark 依赖 初始化 Spark 使用 Shell 弹性分布式数据集 (RDDs) 并行集合 外部 Data...

3006

扫码关注云+社区

领取腾讯云代金券