前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark入门第一步:WordCount之java版、Scala版

Spark入门第一步:WordCount之java版、Scala版

作者头像
趣学程序-shaofeer
发布2019-10-29 16:36:20
1.7K0
发布2019-10-29 16:36:20
举报
文章被收录于专栏:upuptop的专栏upuptop的专栏

Spark入门第一步:WordCount之java版、Scala版

Spark入门系列,第一步,编写WordCount程序。

我们分别使用java和scala进行编写,从而比较二者的代码量

数据文件 通过读取下面的文件内容,统计每个单词出现的次数

代码语言:javascript
复制
java scala python android
spark storm spout bolt
kafka MQ
elasticsearch logstash kibana
hive hbase mysql oracle sqoop
hadoop hdfs map reduce
java scala python android
spark storm spout bolt
kafka MQ
java scala python android
spark storm spout bolt
kafka MQ
elasticsearch logstash kibana
hive hbase mysql oracle sqoop
hive hbase mysql oracle sqoop
hadoop hdfs map reduce

代码实现

•使用java代码进行编写

代码语言:javascript
复制
package top.wintp.java_spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import scala.Tuple2;
/**
 * @author: pyfysf
 * <p>
 * @qq: 337081267
 * <p>
 * @CSDN: http://blog.csdn.net/pyfysf
 * <p>
 * @blog: http://wintp.top
 * <p>
 * @email: pyfysf@163.com
 * <p>
 * @time: 2019/10/26
 */
public class SparkWordCount {
    public static void main(String[] args) {
        //    复杂模式
        //    创建SparkConf
        SparkConf conf = new SparkConf();
        conf.setAppName("spark_demo_java");
        conf.setMaster("local");

        //    创建javaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        //    读取文件
        JavaRDD<String> lines = sc.textFile("./data/words.txt");
        //    截取单词
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split("\\s+")).iterator();
            }
        });
        //    对单词进行计数
        JavaPairRDD<String, Integer> pairWord = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

        //    根据key进行计算
        JavaPairRDD<String, Integer> result = pairWord.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i, Integer i2) throws Exception {
                return i + i2;
            }
        });

        //打印结果
        result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2);
            }
        });
        sc.stop();
    }
}

•利用lamda表达式简化java代码

代码语言:javascript
复制
package top.wintp.java_spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import scala.Tuple2;
/**
 * @author: pyfysf
 * <p>
 * @qq: 337081267
 * <p>
 * @CSDN: http://blog.csdn.net/pyfysf
 * <p>
 * @blog: http://wintp.top
 * <p>
 * @email: pyfysf@163.com
 * <p>
 * @time: 2019/10/26
 */
public class SparkWordCount {
    public static void main(String[] args) {
        //lamda表达式
        SparkConf conf = new SparkConf();
        conf.setAppName("spark_demo_java");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("./data/words.txt");
        JavaRDD<String> words = lines.flatMap((String line) -> Arrays.asList(line.split("\\s+")).iterator());
        JavaPairRDD<String, Integer> pairWords = words.mapToPair((String s) -> new Tuple2<>(s, 1));
        JavaPairRDD<String, Integer> result = pairWords.reduceByKey(Integer::sum);
        result.foreach((Tuple2<String, Integer> res) -> System.out.println(res));
        sc.stop();
    }
}

•使用scala代码编写

代码语言:javascript
复制
package top.wintp.scala_spark
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @author: pyfysf
 *          <p>
 * @qq: 337081267
 *      <p>
 * @CSDN: http://blog.csdn.net/pyfysf
 *        <p>
 * @blog: http://wintp.top
 *        <p>
 * @email: pyfysf@163.com
 *         <p>
 * @time: 2019/10/26
 */
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    //    完整版
    //    创建配置对象
    val conf = new SparkConf()
    //    设置运行模式
    conf.setMaster("local")
    //    设置任务名称
    conf.setAppName("sparkTest")
    //    创建SparkContext对象
    val sc = new SparkContext(conf)
    //    读取文件
    val lines = sc.textFile("./data/words.txt")
    //    切割文件
    val words = lines.flatMap((line: String) => {
      line.split("\\s+")
    })
    //    对word进行计数
    val pariWrod = words.map((tmp: String) => {
      new Tuple2(tmp, 1)
    })
    //    根据key进行聚合
    val result = pariWrod.reduceByKey((v1: Int, v2: Int) => {
      v1 + v2
    })
    //    输出结果
    result.foreach(println)
    //    释放资源
    sc.stop()
  }
}

•利用scala的特性简化代码

代码语言:javascript
复制
package top.wintp.scala_spark
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @author: pyfysf
 *          <p>
 * @qq: 337081267
 *      <p>
 * @CSDN: http://blog.csdn.net/pyfysf
 *        <p>
 * @blog: http://wintp.top
 *        <p>
 * @email: pyfysf@163.com
 *         <p>
 * @time: 2019/10/26
 */
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    //    简洁版
    val conf = new SparkConf().setAppName("sparkDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val result = sc.textFile("./data/words.txt")
                    .flatMap(_.split("\\s+"))
                    .map((_, 1))
                    .reduceByKey(_ + _)
    result.foreach(println)
    sc.stop()
  }
}

建议大家对于java版和scala版本的这两种方式都要掌握。特别是scala的一行代码版本。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 趣学程序 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark入门第一步:WordCount之java版、Scala版
  • 代码实现
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档