Intellij idea配置Spark开发环境,统计哈姆雷特词频(2)

idea 新建maven 项目

  1. 输入maven坐标

maven 坐标

  1. 编辑maven文件

Spark 体系

中间层Spark,即核心模块Spark Core,必须在maven中引用。 编译Spark还要声明java8编译工具。

<properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

idea自动加载引用,在窗口左侧Project导航栏-->External Libraries中看到引用org.apache.spark中spark-core_2.11-2.1.0.jar文件。

idea Externel Libraries

注:Spark Streaming是流式计算框架、SparkSQL数据库工具、Mlib机器学习框架、GraphX图计算工具。

Java 8 lambda函数风格的wordCount

//定义单词总数累加器、和停用词累加器
Accumulator countTotal = jsc.accumulator(0);
Accumulator stopTotal = jsc.accumulator(0);
// 文件初始化RDD
JavaRDD<String> stopword = jsc.textFile("data/text/stopword.txt");
JavaRDD<String> rdd = jsc.textFile("data/text/Hamlet.txt");
// RDD 转换为List
List<String> stopWordList = stopword.collect();
// Broadcast 广播变量,task共享executor的变量
Broadcast<List<String>> broadcastedStopWordSet = jsc.broadcast(stopWordList);

rdd.filter(l->l.length()>0)
        .flatMap(l-> Arrays.asList(l.trim().split(" ")).iterator()) 
        // 将line分割展成词向量,词向量在连接,返回Rdd<String>
        .map(v->v.replaceAll("['.,:;?!-]", "").toLowerCase())
        // 特殊字符处理, Rdd<String>
        .filter(v->{
            boolean isStop = false;
            countTotal.add(1);
            if(broadcastedStopWordSet.value().contains(v)){
                stopTotal.add(1);
                isStop = true;
            }
            return !isStop;
        })
        //遍历总数计数、停用词计数,过滤停止词, Rdd<String>
        .mapToPair(v-> new Tuple2<>(v,1))
        .reduceByKey((v1,v2)->v1+v2)
        //统计个数
        .mapToPair(p-> new Tuple2<>(p._2,p._1))
        .sortByKey(false)
        //排序
        .take(10).forEach(e->{
            System.out.println(e._2+":"+e._1);
        });
  1. 将line分割展成词向量,词向量连接,flatmap返回Rdd<String>
  2. 特殊字符处理,返回 Rdd<String>
  3. 遍历总数计数、停用词计数,过滤停止词, 返回Rdd<String>
  4. Reduce Rdd<String,1>,返回Rdd<String,total>
  5. 排序 SortByKey,返回 Rdd<String,total>

后期有更多案例介绍Java 8 lambda风格的RDD开发

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏数值分析与有限元编程

可视化 | 一个三角形常应变单元后处理例子

昨天提到了应力云图,其实质是用不同的颜色填充等值线。有了结点的应力值,单元内任意一点的应力值是通过插值实现的。下面来看一个悬臂梁的综合后处理。 如图所示,一个悬...

2847
来自专栏xingoo, 一个梦想做发明家的程序员

Spark源码分析之分区器的作用

最近因为手抖,在Spark中给自己挖了一个数据倾斜的坑。为了解决这个问题,顺便研究了下Spark分区器的原理,趁着周末加班总结一下~ 先说说数据倾斜 数据...

24410
来自专栏AI研习社

TensorFlow全新的数据读取方式:Dataset API入门教程

Dataset API是TensorFlow 1.3版本中引入的一个新的模块,主要服务于数据读取,构建输入数据的pipeline。 此前,在TensorFlow...

4073
来自专栏数据结构与算法

05:素数回文数的个数

05:素数回文数的个数 查看 提交 统计 提问 总时间限制: 1000ms 内存限制: 65536kB描述 求11到n之间(包括n),既是素数又是回文数的整数...

30510
来自专栏LhWorld哥陪你聊算法

【推荐系统篇】--推荐系统之训练模型

经过之前的训练数据的构建可以得到所有特征值为1的模型文件,本文将继续构建训练数据特征并构建模型。

1831
来自专栏潇涧技术专栏

Python Algorithms - C2 The basics

本节主要介绍了三个内容:算法渐近运行时间的表示方法、六条算法性能评估的经验以及Python中树和图的实现方式。

1112
来自专栏聊聊技术

原 "二分查找(Binary Search

43311
来自专栏聊聊技术

原 初学图论-DAG单源最短路径算法

38312
来自专栏AI科技评论

开发 | TensorFlow全新的数据读取方式:Dataset API入门教程

AI科技评论按:本文作者何之源,该文首发于知乎专栏AI Insight (https://zhuanlan.zhihu.com/ai-insight),AI科技...

3325
来自专栏漫漫深度学习路

pytorch学习笔记(六):自定义Datasets

什么是Datasets: 在输入流水线中,我们看到准备数据的代码是这么写的data = datasets.CIFAR10("./data/", transfor...

4827

扫码关注云+社区

领取腾讯云代金券