前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark笔记10-demo

Spark笔记10-demo

作者头像
皮大大
发布2021-03-02 15:55:15
4750
发布2021-03-02 15:55:15
举报
文章被收录于专栏:机器学习/数据可视化

案例

根据几个实际的应用案例来学会sparkmap、filter、take等函数的使用

案例1

找出TOP5的值

  • filter(func):筛选出符合条件的数据
  • map(func):对传入数据执行func操作
  • sortByKey():只能对键值对进行操作,默认是升序
代码语言:javascript
复制
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf=conf)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file")  # 得到RDD元素,每个RDD元素都是文本文件中的一行数据(可能存在空行)

res1 = lines.filter(lambda line:(len(line.strip()) > 0) and (len(line.split(",")) == 4))   # 字符串后面的空格去掉,并且保证长度是4
res2 = res1.map(lambda x:x.split(",")[2])  # 将列表中的元素分割,取出第3个元素,仍是字符串
res3 = res2.map(lambda x:(int(x), ""))  # 将字符串转成int类型,并且变成key-value形式(50, ""),value都是空格
res4 = res3.repartition(1)
res5 = res4.sortByKey(False)   # sortByKey的对象必须是键值对;按照key进行降序排列,value不动
res6 = res5.map(lambda x:x[0]) # 取出第一个元素并通过take取出前5个
res7 = res6.take(5)
for a in res7:
  print(a)
文件全局排序
代码语言:javascript
复制
from pyspark import SparkConf, SparkContext

index = 0

def getindex():
  global index
  index += 1
  return index

def main():
  conf = SparkConf().setMaster("local").setAppName("ReadHBase")
  sc = SparkContext(conf=conf)
  lines = sc.textFile("file:///usr/local/spark/rdd/filesort/file.txt")
  index = 0
  res1 = lines.filter(lambda line:(len(line.strip()) > 0 ))
  res2 = res1.map(lambda x: (int(x.strip()),""))
  res3 = res2.repartition(1)
  res4 = res3.sortByKey(True)  # 升序排列
  res5 = res4.map(lambda x:x[0])
  res6 = res5.map(lambda x:(getindex(),x))
  res6.foreach(print)
  res6.saveAsFile("file:///usr/local/spark/code/rdd/filesort/result") # 结果写进目录中-

二次排序
代码语言:javascript
复制
from operator import gt
from pyspark import SparkContext, SparkConf

class SecondarySortKey():
  def __int__(self,k):   # 构造函数
    self.column1 = k[0]
    self.column2 = k[1]

  def __gt__(self,other):  # 重写比较函数
    if other.column1 = self.column1:   # 如果第一个元素相等,表第二个
      return gt(self.column2, other.column2)
    else:
      return gt(self.column1, other.column1)  # 否则直接比较第一个


def main():
  conf = SparkConf().setMaster("local").setAppName("ReadHBase")
  sc = SparkContext(conf=conf)
  rdd1 = sc.textFile("file:///usr/local/spark/rdd/filesort/file.txt")
  rdd2 = rdd1.filter(lambda x:(len(x.strip()) > 0 ))   # 空行消掉
  rdd3 = rdd2.map(lambda x: (int(x.split(" ")[0]), int(x.split(" ")[1])), x)
  rdd4 = rdd3.map(lambda x:(SecondarySortKey(x[0]), x[1]))
  rdd5 = rdd4.sortByKey(False)
  rdd6 = rdd5.map(lambda x: x[1])
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-10-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 案例
    • 案例1
      • 文件全局排序
        • 二次排序
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档