前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >java使用spark/spark-sql处理schema数据

java使用spark/spark-sql处理schema数据

作者头像
用户1225216
发布2018-03-05 14:18:02
9960
发布2018-03-05 14:18:02
举报
文章被收录于专栏:扎心了老铁扎心了老铁

1、spark是什么?

Spark是基于内存计算的大数据并行计算框架。

1.1 Spark基于内存计算

相比于MapReduce基于IO计算,提高了在大数据环境下数据处理的实时性。

1.2 高容错性和高可伸缩性

与mapreduce框架相同,允许用户将Spark部署在大量廉价硬件之上,形成集群。

2、spark编程

每一个spark应用程序都包含一个驱动程序(driver program ),他会运行用户的main函数,并在集群上执行各种并行操作(parallel operations)

spark提供的最主要的抽象概念有两种:  弹性分布式数据集(resilient distributed dataset)简称RDD ,他是一个元素集合,被分区地分布到集群的不同节点上,可以被并行操作,RDDS可以从hdfs(或者任意其他的支持Hadoop的文件系统)上的一个文件开始创建,或者通过转换驱动程序中已经存在的Scala集合得到,用户也可以让spark将一个RDD持久化到内存中,使其能再并行操作中被有效地重复使用,最后RDD能自动从节点故障中恢复

spark的第二个抽象概念是共享变量(shared variables),它可以在并行操作中使用,在默认情况下,当spark将一个函数以任务集的形式在不同的节点上并行运行时,会将该函数所使用的每个变量拷贝传递给每一个任务中,有时候,一个变量需要在任务之间,或者驱动程序之间进行共享,spark支持两种共享变量:  广播变量(broadcast variables),它可以在所有节点的内存中缓存一个值。  累加器(accumulators):只能用于做加法的变量,例如计算器或求和器

3、spark-sql

spark-sql是将hive sql跑在spark引擎上的一种方式,提供了基于schema处理数据的方式。

4、代码详解

java spark和spark-sql依赖。

pom.xml

代码语言:javascript
复制
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>

基于spark1.6创建HiveContext客户端。在spark2.1已经开始使用sparksession了。请注意。

代码语言:javascript
复制
package com.xiaoju.dqa.fireman.driver;
import com.xiaoju.dqa.fireman.exception.SparkInitException;
import com.xiaoju.dqa.fireman.utils.PropertiesUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

import java.io.IOException;
import java.util.Properties;

public class SparkClient {
    private SparkConf sparkConf;
    private JavaSparkContext javaSparkContext;

    public SparkClient() {
        initSparkConf();
        javaSparkContext = new JavaSparkContext(sparkConf);
    }

    public SQLContext getSQLContext() throws SparkInitException {
        return new SQLContext(javaSparkContext);
    }

    public HiveContext getHiveContext() throws SparkInitException {
        return new HiveContext(javaSparkContext);
    }

    private void initSparkConf() {
        try {
            PropertiesUtil propUtil = new PropertiesUtil("fireman.properties");
            Properties prop = propUtil.getProperties();
            String warehouseLocation = System.getProperty("user.dir");
            sparkConf = new SparkConf()
                    .setAppName(prop.getProperty("spark.appname"))
                    .set("spark.sql.warehouse.dir", warehouseLocation)
                    .setMaster(prop.getProperty("spark.master"));
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

}

驱动程序driver

1、这里要实现可序列化接口,否则spark并不会识别这个类。

2、这里在通过spark-sql读取到row数据之后,将schema解析出来,并且映射为hashmap。

代码语言:javascript
复制
public class FiremanDriver implements Serializable {
    private String db;
    private String table;
private HiveContext hiveContext;public FiremanDriver(String db, String table) {
        try {
            this.db = db;
            this.table = table;
            SparkClient sparkClient = new SparkClient();
            hiveContext = sparkClient.getHiveContext();
        } catch (SparkInitException ex) {
            ex.printStackTrace();
        }
    }

  public void check() {
        HashMap<String, Object> result = null;
        try {
            String query = String.format("select * from %s.%s", db ,table);
            System.out.println(query);
            DataFrame rows = hiveContext.sql(query);
            JavaRDD<Row> rdd = rows.toJavaRDD();
            result = rdd.map(new Function<Row, HashMap<String, Object>>() {
                @Override
                public HashMap<String, Object> call(Row row) throws Exception {
                    HashMap<String, Object> fuseResult = new HashMap<String, Object>();
                    HashMap<String, Object> rowMap = formatRowMap(row);
                    // 实际map过程
                    return mapResult;
                }
            }).reduce(new Function2<HashMap<String, Object>, HashMap<String, Object>, HashMap<String, Object>>() {
                @Override
                public HashMap<String, Object> call(HashMap<String, Object> map1, HashMap<String, Object> map2) throws Exception {
                    // reduce merge过程                    
            return mergeResult;
                }
            });

        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

  // 读取shema,这里在通过spark-sql读取到row数据之后,将schema解析出来,并且映射为hashmap
    private HashMap<String, Object> formatRowMap(Row row){
        HashMap<String, Object> rowMap = new HashMap<String, Object>();
        try {
        for (int i=0; i<row.schema().fields().length; i++) {
                String colName = row.schema().fields()[i].name();
                Object colValue = row.get(i);
                rowMap.put(colName, colValue);
        }catch (Exception ex) {
            ex.printStackTrace();
        }
        return rowMap;
    }

    public static void main(String[] args) {
        String db = args[0];
        String table = args[1];
        FiremanDriver firemanDriver = new FiremanDriver(db, table);
        firemanDriver.check();
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-05-22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档