RDD支持两种类型的操作: 转换操作(transformations): 从现有数据集创建一个新数据集 动作操作(actions): 在数据集上进行计算后将值返回给驱动程序 例如,map 是一个转换操作...4.1 map(func) 映射 将函数应用于 RDD 中的每个元素,将返回值构成新的 RDD。...> rdd = aRDD.intersection(bRDD); // 3 4.7 subtract(otherDataset) 差集 subtract 接受另一个 RDD 作为参数,返回一个由只存在第一个...,返回(K,V)键值对的数据集,使用给定的reduce函数 func 聚合每个键的值,该函数类型必须是(V,V)=> V。...动作操作 (Action) 下面列出了Spark支持的一些常见操作。 5.1 reduce 接收一个函数作为参数,这个函数要操作两个相同元素类型的RDD并返回一个同样类型的新元素.
针对各个元素的转化操作 map() 接收一个函数,把这个函数用于 RDD 中的每个元素,将函数的返回结果作为结果RDD 中对应元素的值 filter() 则接收一个函数,并将 RDD 中满足该函数的...元素放入新的 RDD 中返回 map() 的返回值类型不需要和输入类型一样 对每个输入元素生成多个输出元素。...但是intersection() 的性能却要差很多,它需要网络混洗数据发现共有数据 subtract(other) 函数接收另一个 RDD 作为参数,返回 一个由只存在于第一个 RDD 中而不存在于第二个...}); 折叠方法fold() 和 reduce() 类似,接收一个与 reduce() 接收的函数签名相同的函数,再加上一个 “初始值”来作为每个分区第一次调用时的结果。...使用你的函数对这个初始值进行多次计算不会改变结果,通过原地修改并返回两个参数中的前一个的值来节约在 fold() 中创建对象的开销fold() 和 reduce() 都要求函数的返回值类型需要和我们所操作的
,有时候,一个变量需要在任务之间,或者驱动程序之间进行共享,spark支持两种共享变量: 广播变量(broadcast variables),它可以在所有节点的内存中缓存一个值。...作为Map转换的结果 由于惰性,不会立即计算lineLengths //第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型 JavaRDD...才将计算拆分成不同的task, //并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序 int totalLength...作为Map转换的结果 由于惰性,不会立即计算lineLengths JavaRDD lineLengths = lines.map(new GetLength());...lineLengths.persist(StorageLevel.MEMORY_ONLY()); } //定义map函数 //第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型
返回行数 package com.spark.spark.actions; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD...2、take(n) first=take(1) 返回数据集中的第一个元素。 返回一个包含数据集前n个元素的集合。...org.apache.spark.api.java.function.Function; /** * collect * 将计算的结果作为集合拉回到driver端,一般在使用过滤算子或者一些能返回少量数据集的算子后...返回相同内容的元素对应的条数。...返回相同内容的元素对应的条数。
转化操作map()J接收一个函数,把这个函数用于RDD中的每一个元素,将函数的返回结果作为结果RDD中对应元素。而转化操作filter()则接收一个函数,将RDD满足该函数的元素放入新的RDD中返回。...RDD.subtract(otherRDD)返回只存在第一个RDD中而不存在第二个RDD中的所有的元素组成的RDD。也需要网络混洗。...,再加上一个“初始值”来作为分区第一次调用时的结果。...两者都要求函数的返回值类型需要和我们所操作的RDD中的元素类型相同。 aggregate()函数则把我们从返回值类型必须与所操作的RDD类型相同的限制中解放出来。可以计算两个RDD的平均值。...top()按照RDD元素的顺序,返回RDD的前几个元素。 first()就是一个行动操作,他会返回RDD的第一个元素。
spark支持两种共享变量: 广播变量(broadcast variables),它可以在所有节点的内存中缓存一个值。...当我们,在创建了初始的RDD之后,才可以通过Spark Core提供的transformation算子,对该RDD进行transformation(转换)操作,来获取其他的RDD。...//第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型 JavaRDD lineLengths = lines.map(new Function作为Map转换的结果 由于惰性,不会立即计算lineLengths JavaRDD lineLengths = lines.map(new GetLength());...lineLengths.persist(StorageLevel.MEMORY_ONLY()); } //定义map函数 //第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型
repartition coalesce groupByKey zip zipWithIndex 二、具体细节 mapPartitionWithIndex 类似于mapPartitions,除此之外还会携带分区的索引值...,也是RDD的分区数,并行度理论上来说设置大小为core的2~3倍 */ JavaRDD parallelize = sc.parallelize(names...RDD的分区数且不产生shuffle,不起作用 代码结果: JavaRDD coalesceRDD = rdd2.coalesce(4,true);//设置分区数大于原RDD的分区数且产生...作用在(K,V),返回(K,Iterable )。...import org.apache.spark.SparkConf import org.apache.spark.SparkContext /** * 该函数将RDD中的元素和这个元素在RDD中的索引号
这一篇是一些简单的Spark操作,如去重、合并、取交集等,不管用不用的上,做个档案记录。...distinct去重 import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext...java.util.Arrays; import java.util.List; /** * 返回两个RDD的交集 * @author wuweifeng wrote on 2018/4/16....RDD1中出现,但是不在RDD2中出现的元素,不去重 import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext...Integer>> results = oneRDD.cartesian(twoRDD).collect(); System.out.println(results); } } 注意,返回的是键值对
Spark SQL现在是不支持将包含了嵌套JavaBean或者List等复杂数据的JavaBean,作为元数据的。只支持一个包含简单数据类型的field的JavaBean。...Spark SQL会通过反射读取传递给case class的参数的名称,然后将其作为列名。...与Java不同的是,Spark SQL是支持将包含了嵌套数据结构的case class作为元数据的,比如包含了Array等。...,比java中的row的使用,更加丰富 // 在scala中,可以用row的getAs()方法,获取指定列名的列 teenagerRDD.map { row => Student(row.getAs[Int...()方法,获取指定几列的值,返回的是个map val studentRDD = teenagerRDD.map { row => { val map = row.getValuesMap[Any](Array
spark提供了对结果集RDD进行随机采样,即获取一小部分数据的功能。其中有sample、takeSample、takeOrdered等方法。...import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession... javaRDD = javaSparkContext.textFile("/users/wuwf/age"); //取10%的数据,随机数种子自己设定,也可以不设定...the sampling: " + sampleDataSize); //取指定数量的随机数据 List list = javaRDD.takeSample...(false, 10); System.out.println(list); //取排序好的指定数量的数据 List orderList
支持简单的SQL语法检查,能够在Scala中写Hive语句访问Hive数据,并将结果取回作为RDD使用。 ...2、Spark on Hive和Hive on Spark Spark on Hive: Hive只作为储存角色,Spark负责sql解析优化,执行。...Hive on Spark:Hive即作为存储又负责sql的解析优化,Spark负责执行。 二、基础概念 1、DataFrame ? DataFrame也是一个分布式数据容器。...* 注意: * 1.可以使用row.getInt(0),row.getString(1)...通过下标获取返回Row类型的数据,但是要注意列顺序问题---不常用 * 2.可以使用row.getAs("...列名")来获取对应的列值。
直接开始上代码了,注意,如果只是本地测试spark的各种api的使用,是不需要下载安装任何spark、Hadoop的。直接引入maven依赖就可以了。...、spark-hive等的依赖,目前是用不上的。...package map; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD;... originRDD = javaSparkContext.parallelize(data); //flatmap()是将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的...、最大值、最小值、平均值等。
一、pom.xml 添加spark-core依赖包 org.apache.spark spark-core_2.11 2.1.1...; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import...Arrays.asList(splits); return list.iterator(); } }); // 每个单词作为...return s._1; } }); // Lmbda 表达式写法 和 mapRdd 、 groupByRdd 值一样...= mapRdd1.groupBy(s -> s._1); // 相同key,value值累加 JavaPairRDD mapValuesRdd = groupByRdd.mapValues
近日,Databricks官方网站发表了一篇博文,用示例说明了lambda表达式如何让Spark编程更容易。文章开头即指出,Spark的主要目标之一是使编写大数据应用程序更容易。...Spark的Scala和Python接口一直很简洁,但由于缺少函数表达式,Java API有些冗长。因此,随着Java 8增加了lambda表达式,他们更新了Spark的API。...Spark 1.0将提供Java 8 lambda表达式支持,而且与Java的旧版本保持兼容。该版本将在5月初发布。 文中举了两个例子,用于说明Java 8如何使代码更简洁。...第一个例子是使用Spark的filter和count算子在一个日志文件中查找包含“error”的行。...Spark只需下载解压即可运行,而无须安装。感谢辛湜对本文的审校。(作者:马德奎,摘自:InfoQ)
SQL是我的短板,通过Spark SQL又练习了group by、join 、case when 等语法。... userRDD = spark.read().textFile("data/ml-1m/users.dat") //返回Dataset对象 .javaRDD() //返回JavaRDD...JavaRDD对象 Dataset userDF= spark.createDataFrame(userRDD,User.class); //返回Dataset对象 spark.read...().textFile 生成Dataset对象 javaRDD() 生成JavaRDD对象 map(String->{}) 生成JavaRDD对象 spark.createDataFrame...spark.sql执行sqll操作,可以选择创建的临时表。
: 心脏病的诊断 (冠状动脉疾病状态) 值为 0: < 50% 直径缩小 (意味着’没有疾病’) 值为 1: > 50% 直径缩小 (意味着’出现了疾病’) 使用的技术 Apache Spark:...从Spark官网能获取到的Spark的文档都非常出色,你可以在这里找到它们。...这大大加快了聚合查询的速度。一个列式存储格式在只获取需要的列的数据时大有帮助,也因此大大减少磁盘I / O消耗。 Spark MLLib: Spark的机器学习库。...如上图所示,原始文件要么被HDFS获取,要么被程序导入到HDFS。该文件或数据也可以通过Kafka的topics接收和使用spark streaming读取。...//你可以替代下面的代码,来尝试使用决策树模型,并比较返回数据的精度 NaiveBayesModel _model = NaiveBayes.train(_modelTrainData.rdd
本篇我们介绍在Spark下使用ES-Hadoop的例子 *注:资源准备、数据准备以及ES-Hadoop关键配置项说明请参考上一篇中的内容 Spark 读取 ES 数据 import org.apache.spark.SparkConf...q=clientip:247.37.0.0")方法从ES集群的索引logs-201998/type中,查询query为?q=clientip:247.37.0.0,返回JavaPairRDD。...通过 Spark RDD 写入 ES import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import..." esspark-1.0-SNAPSHOT.jar 通过--jars参数,载入elasticsearch-spark 总结 相比于Hadoop,Spark与ES的交互有更多的方式,包括RDD,Spark...Streaming,还有文中未涉及到的DataSet与Spark SQL的模式等等。
Java版: package com.spark.spark.transformations; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD...; import org.apache.spark.api.java.function.VoidFunction; /** * filter * 过滤符合符合条件的记录数,true的保留,false...2、map 将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。 特点:输入一条,输出一条数据。 /** * map * 通过传入的函数处理每个元素,返回新的数据集。...; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import...; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import
本文主要讲解用elasticsearch-spark的入门。...二、Spark Streaming spark的实时处理,es5.0的时候开始支持,Spark Streaming中的DStream编程接口是RDD,我们需要对RDD进行处理,处理起来较为费劲且不美观。...在spark streaming中,如果我们需要修改流程序的代码,在修改代码重新提交任务时,是不能从checkpoint中恢复数据的(程序就跑不起来),是因为spark不认识修改后的程序了。...也提供了spark sql的插件,换言之,elasticsearch变成了Spark SQL的原生数据源,可以通过Spark SQL显示调用,下面的例子将kibana_sample_data_ecommerce...下面这个例子是从控制台中读取数据,然后根据","切割,把第一个赋值给name,然后写入到es的spark-structured-streaming索引中去,启动程序前需要在控制台执行下命令:nc -lk
Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。它是一个实时的分布式搜索和分析引擎。它可以帮助你用几秒钟内搜索百万级别的数据。...total:返回记录数,本例是2条。 max_score:最高的匹配程度,本例是1.0。 hits:返回的记录组成的数组。...; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf...org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf...,返回的Tuple2值(或第二个元素)将文档作为java.util集合返回。
领取专属 10元无门槛券
手把手带您无忧上云