前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Intellj IDEA +SBT + Scala + Spark Sql读取HDFS数据

Intellj IDEA +SBT + Scala + Spark Sql读取HDFS数据

作者头像
我是攻城师
发布2018-05-11 18:15:20
1.8K0
发布2018-05-11 18:15:20
举报
文章被收录于专栏:我是攻城师我是攻城师

前提Spark集群已经搭建完毕,如果不知道怎么搭建,请参考这个链接: http://qindongliang.iteye.com/blog/2224797 注意提交作业,需要使用sbt打包成一个jar,然后在主任务里面添加jar包的路径远程提交即可,无须到远程集群上执行测试,本次测试使用的是Spark的Standalone方式 sbt依赖如下:

demo1:使用Scala读取HDFS的数据:

代码语言:javascript
复制
 /** *
    * Spark读取来自HDFS的数据
    */
def readDataFromHDFS(): Unit ={
    //以standalone方式运行,提交到远程的spark集群上面
    val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("load hdfs data")
    conf.setJars(Seq(jarPaths));
    //得到一个Sprak上下文
    val sc = new SparkContext(conf)
    val textFile=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000")
    //获取第一条数据
    //val data=textFile.first()
   // println(data)
    //遍历打印
      /**
       * collect() 方法 游标方式迭代收集每行数据
       * take(5)   取前topN条数据
       * foreach() 迭代打印
       * stop()    关闭链接
       */
   textFile.collect().take(5).foreach( line => println(line) )
    //关闭资源
    sc.stop()
}

demo2:使用Scala 在客户端造数据,测试Spark Sql:

Spark SQL 映射实体类的方式读取HDFS方式和字段,注意在Scala的Objcet最上面有个case 类定义,一定要放在 这里,不然会出问题:

demo3:使用Scala 远程读取HDFS文件,并映射成Spark表,以Spark Sql方式,读取top10:

代码语言:javascript
复制
 val jarPaths="target/scala-2.11/spark-hello_2.11-1.0.jar"
  /**Spark SQL映射的到实体类的方式**/
  def mapSQL2(): Unit ={
    //使用一个类,参数都是可选类型,如果没有值,就默认为NULL
    //SparkConf指定master和任务名
    val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("spark sql query hdfs file")
    //设置上传需要jar包
    conf.setJars(Seq(jarPaths));
    //获取Spark上下文
    val sc = new SparkContext(conf)
    //得到SQL上下文
    val sqlContext=new SQLContext(sc);
    //必须导入此行代码,才能隐式转换成表格
    import sqlContext.implicits._
    //读取一个hdfs上的文件,并根据某个分隔符split成数组
    //然后根据长度映射成对应字段值,并处理数组越界问题
    val model=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000").map(_.split("\1"))
      .map( p =>  ( if (p.length==4) Model(Some(p(0)), Some(p(1)), Some(p(2)), Some(p(3).toLong))
    else if (p.length==3) Model(Some(p(0)), Some(p(1)), Some(p(2)),None)
    else if (p.length==2) Model(Some(p(0)), Some(p(1)),None,None)
    else   Model( Some(p(0)),None,None,None )
      )).toDF()//转换成DF
    //注册临时表
    model.registerTempTable("monitor")
    //执行sql查询
    val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor  limit 10 ")
//    val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor WHERE title IS  NULL AND dtime IS NOT NULL      ")
      println("开始")
      it.collect().take(8).foreach(line => println(line))
      println("结束")
    sc.stop();
  }

在IDEA的控制台,可以输出如下结果:

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2015-08-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 我是攻城师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档