我尝试了这个教程http://spark.apache.org/docs/latest/quick-start.html我首先从一个文件中创建了一个集合
textFile = sc.textFile("README.md")然后,我尝试了一个命令来计算单词:
wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)打印集合的步骤:
wordCounts.collect()我发现了如何使用sortByKey命令按单词对其进行排序。我想知道怎么可能做同样的事情来按值排序,在本例中是一个单词在文档中出现的数字。
发布于 2014-12-14 09:22:55
排序通常应该在调用collect()之前完成,因为这样会将数据集返回给驱动程序,而且这也是在java中对hadoop map-reduce作业进行编程的方式,以便(通常)将所需的最终输出写入HDFS。使用spark API,这种方法提供了将输出以“原始”形式写入所需位置的灵活性,例如写入到文件中,在该文件中可以将输出用作进一步处理的输入。
在collect()之前使用spark的scala API排序可以遵循eliasah的建议,并使用Tuple2.swap()两次,一次在排序之前,一次在排序之后,以便生成一个元组列表,该列表按其第二个字段(名为_2)的升序或降序排序,并包含其第一个字段(名为_1)中的单词数。下面是如何在spark-shell中编写脚本的示例:
// this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D
val file = sc.textFile("some_local_text_file_pathname")
val wordCounts = file.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _, 1) // 2nd arg configures one task (same as number of partitions)
.map(item => item.swap) // interchanges position of entries in each tuple
.sortByKey(true, 1) // 1st arg configures ascending sort, 2nd arg configures one task
.map(item => item.swap)为了颠倒排序的顺序,请使用sortByKey(false,1),因为它的第一个参数是升序的布尔值。它的第二个参数是任务的数量(等同于分区的数量),它被设置为1,用于测试只需要一个输出数据文件的小输入文件;reduceByKey也接受这个可选参数。
之后,可以将wordCounts RDD作为文本文件保存到包含saveAsTextFile(directory_pathname)的目录中,在该目录中将根据为作业配置的减速机数量(每个减速机1个输出数据文件),存放一个或多个part-xxxxx文件(从part-00000开始)、一个_SUCCESS文件(取决于作业是否成功)和.crc文件。
使用pyspark,一个与上面所示的scala脚本非常相似的python脚本,产生的输出实际上是相同的。以下是pyspark版本,演示了按值对集合进行排序:
file = sc.textFile("file:some_local_text_file_pathname")
wordCounts = file.flatMap(lambda line: line.strip().split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b, 1) \ # last arg configures one reducer task
.map(lambda (a, b): (b, a)) \
.sortByKey(1, 1) \ # 1st arg configures ascending sort, 2nd configures 1 task
.map(lambda (a, b): (b, a))为了降序sortbyKey,它的第一个arg应该是0。由于python捕获前导空格和尾随空格作为数据,因此在将每行分隔为空格之前会插入strip(),但使用spark-shell/scala不需要这样做。
spark和python版本的wordCount输出的主要区别在于spark输出(word,3) python输出(u'word',3)。
有关spark RDD方法的更多信息,请参阅针对python的http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html和针对scala的https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD。
在spark-shell中,在wordCounts上运行collect()将其从一个RDD转换为一个数组(String,Int) = ArrayTuple2(String,Int),该数组本身可以在每个Tuple2元素的第二个字段上使用以下命令进行排序:
Array.sortBy(_._2) sortBy还采用了一个可选的隐式math.Ordering参数,如罗密欧·基恩兹勒在之前对此问题的回答中所示。Array.sortBy(_._2)只需在运行Tuple2 -reduce脚本之前定义一个隐式的反向排序,就可以对其_2字段上的数组_2元素执行反向排序,因为它覆盖了先前存在的Int排序。Romeo Kienzler已经定义的反向int排序是:
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}定义这种反向排序的另一种常见方法是颠倒a和b的顺序,并在比较定义的右侧删除(-1):
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = b.compare(a)
} 发布于 2015-06-11 19:04:52
用一种更具蟒蛇风格的方式。
# In descending order
''' The first parameter tells number of elements
to be present in output.
'''
data.takeOrdered(10, key=lambda x: -x[1])
# In Ascending order
data.takeOrdered(10, key=lambda x: x[1])发布于 2015-06-23 12:59:27
对于那些希望获得按值排序的前N个元素的人:
theRDD.takeOrdered(N, lambda (key, value): -1 * len(value))如果您希望按字符串长度排序。
另一方面,如果值已采用适合您所需排序的形式,则:
theRDD.takeOrdered(N, lambda (key, value): -1 * value)就足够了。
https://stackoverflow.com/questions/24656696
复制相似问题