首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark get集合按值排序

Spark get集合按值排序
EN

Stack Overflow用户
提问于 2014-07-09 22:38:55
回答 11查看 95.6K关注 0票数 38

我尝试了这个教程http://spark.apache.org/docs/latest/quick-start.html我首先从一个文件中创建了一个集合

代码语言:javascript
复制
textFile = sc.textFile("README.md")

然后,我尝试了一个命令来计算单词:

代码语言:javascript
复制
wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

打印集合的步骤:

代码语言:javascript
复制
 wordCounts.collect()

我发现了如何使用sortByKey命令按单词对其进行排序。我想知道怎么可能做同样的事情来按值排序,在本例中是一个单词在文档中出现的数字。

EN

回答 11

Stack Overflow用户

发布于 2014-12-14 09:22:55

排序通常应该在调用collect()之前完成,因为这样会将数据集返回给驱动程序,而且这也是在java中对hadoop map-reduce作业进行编程的方式,以便(通常)将所需的最终输出写入HDFS。使用spark API,这种方法提供了将输出以“原始”形式写入所需位置的灵活性,例如写入到文件中,在该文件中可以将输出用作进一步处理的输入。

在collect()之前使用spark的scala API排序可以遵循eliasah的建议,并使用Tuple2.swap()两次,一次在排序之前,一次在排序之后,以便生成一个元组列表,该列表按其第二个字段(名为_2)的升序或降序排序,并包含其第一个字段(名为_1)中的单词数。下面是如何在spark-shell中编写脚本的示例:

代码语言:javascript
复制
// this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D
val file = sc.textFile("some_local_text_file_pathname")
val wordCounts = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)  // 2nd arg configures one task (same as number of partitions)
  .map(item => item.swap) // interchanges position of entries in each tuple
  .sortByKey(true, 1) // 1st arg configures ascending sort, 2nd arg configures one task
  .map(item => item.swap)

为了颠倒排序的顺序,请使用sortByKey(false,1),因为它的第一个参数是升序的布尔值。它的第二个参数是任务的数量(等同于分区的数量),它被设置为1,用于测试只需要一个输出数据文件的小输入文件;reduceByKey也接受这个可选参数。

之后,可以将wordCounts RDD作为文本文件保存到包含saveAsTextFile(directory_pathname)的目录中,在该目录中将根据为作业配置的减速机数量(每个减速机1个输出数据文件),存放一个或多个part-xxxxx文件(从part-00000开始)、一个_SUCCESS文件(取决于作业是否成功)和.crc文件。

使用pyspark,一个与上面所示的scala脚本非常相似的python脚本,产生的输出实际上是相同的。以下是pyspark版本,演示了按值对集合进行排序:

代码语言:javascript
复制
file = sc.textFile("file:some_local_text_file_pathname")
wordCounts = file.flatMap(lambda line: line.strip().split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b, 1) \ # last arg configures one reducer task
    .map(lambda (a, b): (b, a)) \
    .sortByKey(1, 1) \ # 1st arg configures ascending sort, 2nd configures 1 task
    .map(lambda (a, b): (b, a))

为了降序sortbyKey,它的第一个arg应该是0。由于python捕获前导空格和尾随空格作为数据,因此在将每行分隔为空格之前会插入strip(),但使用spark-shell/scala不需要这样做。

spark和python版本的wordCount输出的主要区别在于spark输出(word,3) python输出(u'word',3)。

有关spark RDD方法的更多信息,请参阅针对python的http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html和针对scala的https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD

在spark-shell中,在wordCounts上运行collect()将其从一个RDD转换为一个数组(String,Int) = ArrayTuple2(String,Int),该数组本身可以在每个Tuple2元素的第二个字段上使用以下命令进行排序:

代码语言:javascript
复制
Array.sortBy(_._2) 

sortBy还采用了一个可选的隐式math.Ordering参数,如罗密欧·基恩兹勒在之前对此问题的回答中所示。Array.sortBy(_._2)只需在运行Tuple2 -reduce脚本之前定义一个隐式的反向排序,就可以对其_2字段上的数组_2元素执行反向排序,因为它覆盖了先前存在的Int排序。Romeo Kienzler已经定义的反向int排序是:

代码语言:javascript
复制
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}

定义这种反向排序的另一种常见方法是颠倒a和b的顺序,并在比较定义的右侧删除(-1):

代码语言:javascript
复制
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = b.compare(a)
}   
票数 32
EN

Stack Overflow用户

发布于 2015-06-11 19:04:52

用一种更具蟒蛇风格的方式。

代码语言:javascript
复制
# In descending order
''' The first parameter tells number of elements
    to be present in output.
''' 
data.takeOrdered(10, key=lambda x: -x[1])
# In Ascending order
data.takeOrdered(10, key=lambda x: x[1])
票数 20
EN

Stack Overflow用户

发布于 2015-06-23 12:59:27

对于那些希望获得按值排序的前N个元素的人:

代码语言:javascript
复制
theRDD.takeOrdered(N, lambda (key, value): -1 * len(value))

如果您希望按字符串长度排序。

另一方面,如果值已采用适合您所需排序的形式,则:

代码语言:javascript
复制
theRDD.takeOrdered(N, lambda (key, value): -1 * value)

就足够了。

票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/24656696

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档