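Spark Programming Guide

At a high level, every Spark application consists of a driver program that runs the user's main function and executes various parallel operations on a cluster. The main abstraction Spark provides is the resilient distributed dataset (RDD), a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in HDFS (or any other Hadoop-supported file system), or with an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory so that it can be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

The second abstraction in Spark is shared variables, which can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which cache a value in memory on all nodes, and accumulators, which are variables that are only "added" to, such as counters and sums.

This guide shows each of these features and gives some examples. It helps to be familiar with Scala, especially its closure syntax. Note that you can also run Spark interactively through the spark-shell script, and we recommend doing so as you follow along.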

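2 Linking with Spark

Spark 1.6.1 uses Scala 2.10. To write applications in Scala, you need a compatible Scala version (2.10.x). To write a Spark application, you need to add a dependency on Spark. If you use SBT or Maven, Spark is available through Maven Central at: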

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.1

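In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS. Some common HDFS version tags are listed on the third party distributions page.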

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

Finally, you need to import some Spark classes into your program. Add the following lines:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

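Spark 1.6.1 works with Java 7 and higher. If you are using Java 8, Spark supports lambda expressions for writing functions concisely; otherwise you can use the classes in the org.apache.spark.api.java.function package. To write a Spark application in Java, you need to add a dependency on Spark, available through Maven Central at: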

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.1

3 Initializing Spark

The first thing a Spark program must do is create a SparkContext object, which tells Spark how to access a cluster. To create one, first build a SparkConf object that contains information about your application and pass it to the context constructor. The snippet below uses the Java API, where the context class is JavaSparkContext:

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

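The appName parameter is the name your application shows on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or the special string "local" to run in local mode. In practice, when running on a cluster you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. For local testing and unit tests, however, you can pass "local" to run Spark in-process.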

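3.1 Using the Shell

In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work. You can set which master the context connects to using the --master argument, and you can add JARs to the classpath by passing a comma-separated list to the --jars argument. For example, to run bin/spark-shell on exactly four cores, use: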

$ ./bin/spark-shell --master local[4]

Or, to also add code.jar to its classpath, use:

$ ./bin/spark-shell --master local[4] --jars code.jar

To include a dependency using maven coordinates:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

Run spark-shell --help for a complete list of options. Behind the scenes, spark-shell invokes the more general spark-submit script.

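4 Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing Scala collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any other storage system supported by Hadoop.

4.1 Parallelized Collections

Parallelized collections are created by calling SparkContext's parallelize method on an existing Scala collection. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, the following code creates a parallelized collection holding the numbers 1 to 5: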

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

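Once created, the distributed dataset (distData) can be operated on in parallel. For example, we can call distData.reduce((a, b) => a + b) to add up the elements of the array. Operations on distributed datasets are described later.

One important parameter for parallel collections is the number of slices to cut the dataset into. Spark runs one task for each slice. Typically you want 2-4 slices for each CPU in your cluster. Normally, Spark tries to set the number of slices automatically based on your cluster. However, you can also set it manually by passing it as the second parameter to parallelize (e.g. sc.parallelize(data, 10)).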
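
To make the role of slices concrete, here is a minimal plain-Python sketch (no Spark required) of cutting a collection into slices and running one "task" per slice. The helper name slice_collection and the even index-range split are assumptions made for illustration, not a description of Spark's internals.

```python
def slice_collection(data, num_slices):
    """Split a list into num_slices contiguous chunks of near-equal size."""
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    n = len(data)
    # Slice i covers the index range [i*n // num_slices, (i+1)*n // num_slices).
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


data = [1, 2, 3, 4, 5]
slices = slice_collection(data, 2)

# One "task" per slice computes a partial sum; the driver adds the partials.
partial_sums = [sum(s) for s in slices]
total = sum(partial_sums)
print(slices, partial_sums, total)
```

Asking for more slices than elements simply leaves some slices empty, which is why very high slice counts add task overhead without adding parallelism.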

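4.2 External Datasets

Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase and Amazon S3. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3n://, kfs:// or other URI) and reads it as a collection of lines. Here is an example invocation: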

scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08

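Once created, distFile can be acted on by dataset operations. For example, we can add up the lengths of all the lines using the map and reduce operations as follows: distFile.map(s => s.length).reduce((a, b) => a + b).

Some notes on reading files with Spark:

  1. If using a path on the local filesystem, the file must also be accessible at the same path on every worker node. Either copy the file to all workers or use a network-mounted shared file system.
  2. All of Spark's file-based input methods, including textFile, support directories, compressed files, and wildcards. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
  3. The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.

Apart from text files, Spark's Scala API also supports several other data formats:

  1. SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as a (filename, content) pair.
  2. For SequenceFiles, use SparkContext's sequenceFile[K, V] method, where K and V are the types of the keys and values in the file. These should be subclasses of Hadoop's Writable interface, like IntWritable and Text. In addition, Spark lets you specify native types for a few common Writables; for example, sequenceFile[Int, String] will automatically read IntWritables and Texts.
  3. For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use SparkContext.newAPIHadoopRDD for InputFormats based on the "new" MapReduce API (org.apache.hadoop.mapreduce).
  4. RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.

4.3 RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new distributed dataset representing the results. On the other hand, reduce is an action that aggregates all the elements using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design lets Spark run more efficiently. For example, a dataset created through map can be used in a reduce, and only the result of the reduce is returned to the driver, rather than the larger mapped dataset.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicating them across multiple nodes.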
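
The interplay of lazy transformations, actions and persist can be modelled in a few lines of plain Python. This is only a sketch of the semantics described above: the LazyDataset class is hypothetical and runs on one machine, so it says nothing about how Spark schedules or stores data.

```python
from functools import reduce


class LazyDataset:
    """Toy stand-in for an RDD: remembers how to compute itself, runs on demand."""

    def __init__(self, compute):
        self._compute = compute   # zero-argument callable producing the elements
        self._persist = False
        self._cached = None

    def map(self, func):
        # Transformation: describe a new dataset, compute nothing yet.
        return LazyDataset(lambda: [func(x) for x in self._elements()])

    def persist(self):
        self._persist = True
        return self

    def _elements(self):
        if self._cached is not None:
            return self._cached
        elements = self._compute()
        if self._persist:
            self._cached = elements
        return elements

    def reduce(self, func):
        # Action: forces the recorded chain to run and returns a plain value.
        return reduce(func, self._elements())


calls = []

def length(s):
    calls.append(s)               # record every invocation of the map function
    return len(s)

lines = LazyDataset(lambda: ["a", "bb", "ccc"])
line_lengths = lines.map(length)
calls_after_map = len(calls)                  # the map alone ran nothing

total = line_lengths.reduce(lambda a, b: a + b)
line_lengths.reduce(lambda a, b: a + b)
calls_without_persist = len(calls)            # recomputed for each action

line_lengths.persist()
line_lengths.reduce(lambda a, b: a + b)       # computes once more, then caches
line_lengths.reduce(lambda a, b: a + b)       # served from the cache
calls_with_persist = len(calls)
print(total, calls_after_map, calls_without_persist, calls_with_persist)
```

The call counter makes the trade-off visible: without persist every action re-runs the map function over all elements, while after persist only the first action pays that cost.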

4.3.1 Basics

The following code illustrates the basics of RDD operations:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

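The first line defines a base RDD from an external file. This dataset is not loaded into memory: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation. Because of laziness, lineLengths is not computed immediately. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate nodes. Each node runs its part of the map and a local reduction, and returns only its answer to the driver program.

If we also wanted to use lineLengths again later, we could add the following before the reduce: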

lineLengths.persist()

which would cause lineLengths to be saved in memory after the first time it is computed.

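4.3.2 Passing Functions to Spark

Spark's API relies heavily on passing functions in the driver program to run on the cluster. In Scala there are two recommended ways to do this:

  1. Anonymous function syntax, which can be used for short pieces of code.
  2. Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:
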
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

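4.3.3 Working with Key-Value Pairs

While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed "shuffle" operations, such as grouping or aggregating the elements by a key. In Scala, these operations are available on RDDs containing Tuple2 objects (the built-in tuples in the language, created by simply writing (a, b)). The key-value pair operations are defined in the PairRDDFunctions class, which wraps an RDD of tuples through an implicit conversion. Versions before Spark 1.3 required import org.apache.spark.SparkContext._ to bring that conversion into scope; in 1.6.1 the import is no longer needed.

For example, the following code uses the reduceByKey operation to count how many times each line of text occurs in a file: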

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

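We could also use counts.sortByKey(), for example, to sort the pairs alphabetically, and finally counts.collect() to bring them back to the driver program as an array of objects.

Note: when using custom objects as the key in key-value pair operations, you must make sure that a custom equals() method is accompanied by a matching hashCode() method.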
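
The line-count example and the note about keys can be mirrored on a single machine in plain Python. In this sketch reduce_by_key is a hypothetical helper that imitates the semantics of reduceByKey with a dictionary, and HostPort is an invented key type; Python's __eq__ and __hash__ play the role of equals() and hashCode().

```python
from dataclasses import dataclass


def reduce_by_key(pairs, func):
    """Merge the values of pairs that share a key, using func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


lines = ["spark", "hadoop", "spark", "hdfs", "spark"]
counts = reduce_by_key(((line, 1) for line in lines), lambda a, b: a + b)
sorted_counts = sorted(counts.items())    # analogue of sortByKey() then collect()


@dataclass(frozen=True)
class HostPort:
    # frozen=True (with the default eq=True) generates __eq__ and __hash__
    # from the same fields, so equal keys always land in the same bucket.
    host: str
    port: int


hits = reduce_by_key(
    [(HostPort("a", 80), 1), (HostPort("a", 80), 1), (HostPort("b", 80), 1)],
    lambda a, b: a + b,
)
print(sorted_counts, hits[HostPort("a", 80)])
```

If equality and hashing disagreed, the two HostPort("a", 80) records could be treated as different keys and their counts would never be merged.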

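4.3.4 Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python) and pair RDD functions doc (Scala, Java) for details.

Transformation

Meaning

map(func)

Return a new distributed dataset formed by passing each element of the source through a function func.

filter(func)

Return a new dataset formed by selecting those elements of the source on which func returns true.

flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a sequence rather than a single item).

mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T.

mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.

sample(withReplacement, fraction, seed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.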

union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.

intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

distinct([numTasks])

Return a new dataset that contains the distinct elements of the source dataset.

groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

pipe(command, [envVars])

Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.

coalesce(numPartitions)

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

repartitionAndSortWithinPartitions(partitioner)

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
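
The difference between cogroup and join in the table above is easiest to see on a tiny input. The following plain-Python sketch reproduces their result shapes on ordinary lists; the function names mirror the table entries, but the implementation is an illustrative single-machine model and the sample data is invented.

```python
from collections import defaultdict


def cogroup(left, right):
    """For each key, collect the values from both inputs side by side."""
    grouped = defaultdict(lambda: ([], []))
    for key, v in left:
        grouped[key][0].append(v)
    for key, w in right:
        grouped[key][1].append(w)
    return dict(grouped)


def join(left, right):
    """Inner join: one output pair per matching (v, w) combination."""
    return [(key, (v, w))
            for key, (vs, ws) in cogroup(left, right).items()
            for v in vs
            for w in ws]


ages = [("ann", 31), ("bob", 25)]
cities = [("ann", "Oslo"), ("ann", "Rome"), ("cat", "Lima")]

grouped = cogroup(ages, cities)
joined = join(ages, cities)
print(grouped)
print(joined)
```

Keys present on only one side ("bob" and "cat") still appear in the cogroup result with an empty list, but drop out of the inner join; the outer join variants would keep them.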

Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Action

Meaning

reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

count()

Return the number of elements in the dataset.

first()

Return the first element of the dataset (similar to take(1)).

take(n)

Return an array with the first n elements of the dataset.

takeSample(withReplacement, num, [seed])

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

takeOrdered(n, [ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

saveAsSequenceFile(path) (Java and Scala)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

saveAsObjectFile(path) (Java and Scala)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().

countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
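
The requirement that the function passed to reduce be commutative and associative can be checked with a small experiment. This plain-Python sketch reduces each partition locally and then combines the partial results; parallel_reduce is a hypothetical helper and the partition layouts are chosen by hand.

```python
from functools import reduce


def parallel_reduce(partitions, func):
    """Reduce each non-empty partition locally, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


def add(a, b):
    return a + b


def subtract(a, b):
    return a - b


one_partition = [[1, 2, 3, 4]]
two_partitions = [[1, 2], [3, 4]]

add_results = (parallel_reduce(one_partition, add),
               parallel_reduce(two_partitions, add))
sub_results = (parallel_reduce(one_partition, subtract),
               parallel_reduce(two_partitions, subtract))
print(add_results, sub_results)
```

Addition gives the same answer for both layouts, while subtraction, which is neither commutative nor associative, gives a result that depends on how the data happened to be partitioned.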

Shuffle operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

Background

To understand what happens during the shuffle we can consider the example of the reduceByKey operation. The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple - the key and the result of executing a reduce function against all values associated with that key. The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to compute the result.

In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.
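
The all-to-all step can be sketched in plain Python: every map-side partition may send records to every reduce-side bucket, and a record's bucket is determined by its key alone. The names below are hypothetical, and hashing with zlib.crc32 is a choice made for this sketch so that the output is reproducible (Python's built-in hash() for strings is salted per process).

```python
import zlib


def stable_hash(key):
    """Deterministic hash for string keys."""
    return zlib.crc32(key.encode("utf-8"))


def shuffle(map_partitions, num_reducers):
    """Route every (key, value) record to the bucket chosen by its key."""
    reduce_inputs = [[] for _ in range(num_reducers)]
    for partition in map_partitions:            # each map-side partition...
        for key, value in partition:            # ...may feed every reducer
            reduce_inputs[stable_hash(key) % num_reducers].append((key, value))
    return reduce_inputs


def shuffled_reduce_by_key(map_partitions, func, num_reducers=2):
    """Shuffle first, then let each reducer merge the values of its own keys."""
    result = {}
    for bucket in shuffle(map_partitions, num_reducers):
        for key, value in bucket:
            result[key] = func(result[key], value) if key in result else value
    return result


map_partitions = [
    [("a", 1), ("b", 1)],
    [("a", 1), ("c", 1)],
    [("b", 1), ("a", 1)],
]
buckets = shuffle(map_partitions, 2)
totals = shuffled_reduce_by_key(map_partitions, lambda x, y: x + y)
print(totals)
```

After the shuffle, all records for a given key sit in exactly one bucket, which is what allows each reduce task to finish its keys without consulting any other task.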

Although the set of elements in each partition of newly shuffled data will be deterministic, and so is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably ordered data following shuffle then it’s possible to use:

  • mapPartitions to sort each partition using, for example, .sorted
  • repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
  • sortBy to make a globally ordered RDD

Operations which can cause a shuffle include repartition operations like repartition and coalesce, ‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

Performance Impact

The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.

Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.

Certain shuffle operations can consume significant amounts of heap memory since they employ in-memory data structures to organize records before or after transferring them. Specifically, reduceByKey and aggregateByKey create these structures on the map side, and 'ByKey operations generate these on the reduce side. When data does not fit in memory Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.
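
The map-side structures mentioned above are also what make reduceByKey cheaper than grouping and then aggregating. The plain-Python sketch below counts how many records would cross the shuffle boundary in each style; both helper functions and the sample partitions are hypothetical, and real record counts depend on the data and partitioning.

```python
from collections import defaultdict


def group_then_sum(partitions):
    """groupByKey style: every record is shuffled, then summed per key."""
    shuffled = [record for part in partitions for record in part]
    groups = defaultdict(list)
    for key, value in shuffled:
        groups[key].append(value)
    return {key: sum(values) for key, values in groups.items()}, len(shuffled)


def combine_then_sum(partitions):
    """reduceByKey style: pre-aggregate inside each partition before shuffling."""
    shuffled = []
    for part in partitions:
        local = defaultdict(int)
        for key, value in part:
            local[key] += value            # map-side combine
        shuffled.extend(local.items())     # one record per key per partition
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


partitions = [
    [("a", 1), ("a", 1), ("b", 1)],
    [("a", 1), ("b", 1), ("b", 1)],
]
grouped_totals, grouped_records = group_then_sum(partitions)
combined_totals, combined_records = combine_then_sum(partitions)
print(grouped_records, combined_records)
```

Both styles produce the same totals, but the combining version shuffles at most one record per key per partition, at the price of holding a per-partition table in memory.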

Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are preserved until the corresponding RDDs are no longer used and are garbage collected. This is done so the shuffle files don’t need to be re-created if the lineage is re-computed. Garbage collection may happen only after a long period of time, if the application retains references to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may consume a large amount of disk space. The temporary storage directory is specified by the spark.local.dir configuration parameter when configuring the Spark context.

Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the ‘Shuffle Behavior’ section within the Spark Configuration Guide.

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes, or store it off-heap in Tachyon. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:

Storage Level

Meaning

MEMORY_ONLY

Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

MEMORY_AND_DISK

Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.

MEMORY_ONLY_SER

Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

MEMORY_AND_DISK_SER

Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

DISK_ONLY

Store the RDD partitions only on disk.

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

Same as the levels above, but replicate each partition on two cluster nodes.

OFF_HEAP (experimental)

Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory. If you plan to use Tachyon as the off heap store, Spark is compatible with Tachyon out-of-the-box. Please refer to this page for the suggested version pairings.

Note: In Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level.

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.

Which Storage Level to Choose?

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

  • If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
  • If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access.
  • Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.
  • Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.
  • In environments with high amounts of memory or multiple applications, the experimental OFF_HEAP mode has several advantages:
    • It allows multiple executors to share the same pool of memory in Tachyon.
    • It significantly reduces garbage collection costs.
    • Cached data is not lost if individual executors crash.
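
The selection process above can be written down as a small decision function. This is one possible reading of the bullet points, not an official rule: the function name and its boolean parameters are hypothetical, the experimental OFF_HEAP option is left out, and only the level names come from the table of storage levels.

```python
def suggest_storage_level(fits_deserialized, fits_serialized,
                          recompute_is_expensive, needs_fast_recovery):
    """Return the name of a storage level following the guidance above."""
    if fits_deserialized:
        level = "MEMORY_ONLY"              # fastest option when everything fits
    elif fits_serialized or not recompute_is_expensive:
        level = "MEMORY_ONLY_SER"          # save space; recompute what is missing
    else:
        level = "MEMORY_AND_DISK_SER"      # spill only when recomputing is costly
    # Replicated variants trade extra memory for recovery without recomputation.
    return level + "_2" if needs_fast_recovery else level


print(suggest_storage_level(True, True, False, False))
print(suggest_storage_level(False, True, False, False))
print(suggest_storage_level(False, False, True, True))
```

In practice the inputs are judgment calls informed by measurements (cached size in the UI, cost of the upstream computation), so the function is best read as a checklist rather than something to run in production.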

Removing Data

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.
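
Least-recently-used eviction and manual removal can be illustrated with a small plain-Python cache. The class below is a hypothetical model: it limits the number of cached partitions, whereas a real cache accounts for memory size, and the (rdd_id, partition_index) key layout is an assumption made for the example.

```python
from collections import OrderedDict


class LruPartitionCache:
    """Keeps at most `capacity` partitions, evicting the least recently used."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._store = OrderedDict()        # oldest entry first

    def get(self, key):
        if key not in self._store:
            return None
        self._store.move_to_end(key)       # mark as most recently used
        return self._store[key]

    def put(self, key, partition):
        self._store[key] = partition
        self._store.move_to_end(key)
        while len(self._store) > self.capacity:
            self._store.popitem(last=False)    # drop the least recently used

    def unpersist(self, rdd_id):
        # Manual removal: drop every cached partition of one dataset.
        for key in [k for k in self._store if k[0] == rdd_id]:
            del self._store[key]


cache = LruPartitionCache(capacity=2)
cache.put((1, 0), ["a", "b"])      # key = (rdd_id, partition_index)
cache.put((1, 1), ["c"])
cache.get((1, 0))                  # touch partition 0 of dataset 1
cache.put((2, 0), ["x"])           # over capacity: evicts (1, 1)

evicted = cache.get((1, 1))
kept = cache.get((1, 0))
cache.unpersist(1)
after_unpersist = (cache.get((1, 0)), cache.get((2, 0)))
print(evicted, kept, after_unpersist)
```

Reading a partition refreshes its position, so recently used data survives while untouched partitions are the first to go.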

Shared Variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below shows this:

In Scala:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

In Java:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

In Python:

>>> broadcastVar = sc.broadcast([1, 2, 3])

>>> broadcastVar.value
[1, 2, 3]

After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
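
The benefit of caching a value once per machine, rather than shipping it with every task, can be modelled in plain Python. Driver and Node below are hypothetical classes that only count transfers; they are not Spark's broadcast implementation and ignore serialization and the distribution protocol.

```python
class Driver:
    """Holds the original values and counts how often they are sent out."""

    def __init__(self):
        self._values = {}
        self.transfers = 0

    def broadcast(self, value):
        broadcast_id = len(self._values)
        self._values[broadcast_id] = value
        return broadcast_id

    def fetch(self, broadcast_id):
        self.transfers += 1
        return self._values[broadcast_id]


class Node:
    """A worker machine that caches each broadcast value after the first fetch."""

    def __init__(self, driver):
        self._driver = driver
        self._cache = {}

    def value(self, broadcast_id):
        if broadcast_id not in self._cache:
            self._cache[broadcast_id] = self._driver.fetch(broadcast_id)
        return self._cache[broadcast_id]


driver = Driver()
lookup_id = driver.broadcast({"a": 1, "b": 2})
nodes = [Node(driver), Node(driver)]

# Six tasks spread over two nodes all read the same lookup table.
keys = ["a", "b", "a", "a", "b", "b"]
results = [nodes[i % 2].value(lookup_id)[key] for i, key in enumerate(keys)]
print(results, driver.transfers)
```

Six tasks ran, but the lookup table left the driver only twice, once per node. The model also shows why the broadcast object should not be modified afterwards: nodes that fetched it earlier would keep the old copy.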

Accumulators

Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types. If accumulators are created with a name, they will be displayed in Spark’s UI. This can be useful for understanding the progress of running stages (NOTE: this is not yet supported in Python).

An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python). However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.

The code below shows an accumulator being used to add up the elements of an array:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

While this code used the built-in support for accumulators of type Int, programmers can also create their own types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero for providing a “zero value” for your data type, and addInPlace for adding two values together. For example, supposing we had a Vector class representing mathematical vectors, we could write:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

In Scala, Spark also supports the more general Accumulable interface to accumulate data where the resulting type is not the same as the elements added (e.g. build a list by collecting together elements), and the SparkContext.accumulableCollection method for accumulating common Scala collection types.

In Java:

Accumulator<Integer> accum = sc.accumulator(0);

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

As in Scala, Java programmers can define accumulators for their own types by implementing AccumulatorParam with its zero and addInPlace methods. For the same hypothetical Vector class:

class VectorAccumulatorParam implements AccumulatorParam<Vector> {
  public Vector zero(Vector initialValue) {
    return Vector.zeros(initialValue.size());
  }
  public Vector addInPlace(Vector v1, Vector v2) {
    v1.addInPlace(v2); return v1;
  }
}

// Then, create an Accumulator of this type:
Accumulator<Vector> vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());

In Java, Spark also supports the more general Accumulable interface to accumulate data where the resulting type is not the same as the elements added (e.g. build a list by collecting together elements).

In Python:

>>> accum = sc.accumulator(0)
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

>>> accum.value
10

Python programmers can likewise subclass AccumulatorParam and provide zero and addInPlace. For the same hypothetical Vector class:

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
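
To show how zero and addInPlace cooperate, the following plain-Python sketch runs the same idea without Spark. VectorParam and run_tasks are hypothetical names, vectors are ordinary lists of floats, and the "tasks" run one after another in a loop.

```python
class VectorParam:
    """Describes how to start from zero and how to add two vectors."""

    def zero(self, initial_value):
        return [0.0] * len(initial_value)

    def add_in_place(self, v1, v2):
        for i, x in enumerate(v2):
            v1[i] += x
        return v1


def run_tasks(partitions, param, initial_value):
    """Each task accumulates locally from zero; the driver merges the results."""
    total = list(initial_value)
    for partition in partitions:
        local = param.zero(initial_value)          # a fresh accumulator per task
        for vector in partition:
            local = param.add_in_place(local, vector)
        total = param.add_in_place(total, local)   # merge the task's contribution
    return total


partitions = [
    [[1.0, 2.0], [3.0, 4.0]],
    [[5.0, 6.0]],
]
vector_sum = run_tasks(partitions, VectorParam(), [0.0, 0.0])
print(vector_sum)
```

Starting every task from the zero value is what keeps the initial value from being counted once per task when the partial results are merged.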

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware that each task’s update may be applied more than once if tasks or job stages are re-executed.

Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map(). The below code fragment demonstrates this property:

In Scala:

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// Here, accum is still 0 because no actions have caused the map to be computed.

In Java:

Accumulator<Integer> accum = sc.accumulator(0);
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.

In Python:

accum = sc.accumulator(0)
def g(x):
  accum.add(x)
  return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.
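
Both properties, updates deferred until an action and updates applied again when a transformation is re-executed, can be reproduced with Python's built-in lazy map. The Accumulator class here is a hypothetical stand-in with just an add method and a value, and consuming the iterator with list plays the role of an action.

```python
class Accumulator:
    """Minimal stand-in: a value that can only be added to."""

    def __init__(self, value=0):
        self.value = value

    def add(self, x):
        self.value += x


accum = Accumulator(0)
data = [1, 2, 3, 4]


def g(x):
    accum.add(x)      # side effect inside a transformation
    return x * 2


mapped = map(g, data)          # lazy: g has not been called yet
before_action = accum.value

first_result = list(mapped)    # consuming the iterator acts like an action
after_first = accum.value

second_result = list(map(g, data))   # re-executing the uncached transformation
after_second = accum.value
print(before_action, after_first, after_second)
```

The second run doubles the accumulator even though the data did not change, which is the single-machine analogue of a re-executed task applying its update more than once.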

Deploying to a Cluster

The application submission guide describes how to submit applications to a cluster. In short, once you package your application into a JAR (for Java/Scala) or a set of .py or .zip files (for Python), the bin/spark-submit script lets you submit it to any supported cluster manager.

Launching Spark jobs from Java / Scala

The org.apache.spark.launcher package provides classes for launching Spark jobs as child processes using a simple Java API.

Unit Testing

Spark is friendly to unit testing with any popular unit test framework. Simply create a SparkContext in your test with the master URL set to local, run your operations, and then call SparkContext.stop() to tear it down. Make sure you stop the context within a finally block or the test framework’s tearDown method, as Spark does not support two contexts running concurrently in the same program.
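
The set-up and tear-down pattern described above looks roughly like this with Python's unittest. To keep the sketch runnable without Spark, FakeContext is a hypothetical stand-in that only enforces the "one active context" rule; in a real test you would create the actual context with the master set to local in setUp and call its stop() in tearDown.

```python
import unittest


class FakeContext:
    """Hypothetical stand-in for a Spark context: one live instance at a time."""

    _active = None

    def __init__(self, master, app_name):
        if FakeContext._active is not None:
            raise RuntimeError("another context is still running")
        self.master = master
        self.app_name = app_name
        FakeContext._active = self

    def parallelize(self, data):
        return list(data)

    def stop(self):
        FakeContext._active = None


class LineLengthTest(unittest.TestCase):
    def setUp(self):
        self.sc = FakeContext("local", "line-length-test")

    def tearDown(self):
        # Runs even when the test body fails, so the next test can start.
        self.sc.stop()

    def test_total_length(self):
        lines = self.sc.parallelize(["a", "bb", "ccc"])
        self.assertEqual(sum(len(s) for s in lines), 6)

    def test_empty_input(self):
        self.assertEqual(self.sc.parallelize([]), [])


suite = unittest.defaultTestLoader.loadTestsFromTestCase(LineLengthTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Because each test gets a fresh context and tearDown always stops it, the second test would fail in setUp if the first one leaked its context, which makes the leak visible immediately.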

Migrating from pre-1.0 Versions of Spark

Spark 1.0 freezes the API of Spark Core for the 1.X series, in that any API available today that is not marked “experimental” or “developer API” will be supported in future versions.

For Scala users, the only change is that the grouping operations, e.g. groupByKey, cogroup and join, have changed from returning (Key, Seq[Value]) pairs to (Key, Iterable[Value]).

For Java users, several changes were made to the API:

  • The Function classes in org.apache.spark.api.java.function became interfaces in 1.0, meaning that old code that extends Function should implement Function instead.
  • New variants of the map transformations, like mapToPair and mapToDouble, were added to create RDDs of special data types.
  • Grouping operations like groupByKey, cogroup and join have changed from returning (Key, List<Value>) pairs to (Key, Iterable<Value>).

For Python users, the only change is that the grouping operations, e.g. groupByKey, cogroup and join, have changed from returning (key, list of values) pairs to (key, iterable of values).

Migration guides are also available for Spark Streaming, MLlib and GraphX.

Where to Go from Here

You can see some example Spark programs on the Spark website. In addition, Spark includes several samples in the examples directory (Scala, Java, Python, R). You can run Java and Scala examples by passing the class name to Spark’s bin/run-example script; for instance:

./bin/run-example SparkPi

For Python examples, use spark-submit instead:

./bin/spark-submit examples/src/main/python/pi.py

For R examples, use spark-submit as well:

./bin/spark-submit examples/src/main/r/dataframe.R

For help on optimizing your programs, the configuration and tuning guides provide information on best practices. They are especially important for making sure that your data is stored in memory in an efficient format. For help on deploying, the cluster mode overview describes the components involved in distributed operation and supported cluster managers.

Finally, full API documentation is available in Scala, Java, Python and R.

Reference: http://colobu.com/2014/12/08/spark-programming-guide/
