首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >提交第一个Spark统计文件单词数程序,配合hadoop hdfs

提交第一个Spark统计文件单词数程序,配合hadoop hdfs

作者头像
算法之名
发布2019-08-20 16:07:58
5440
发布2019-08-20 16:07:58
举报
文章被收录于专栏:算法之名算法之名

先说明,这次我们用的还不是Spark streaming,而是从hadoop hdfs拿取文件,经过计算,再把结果放回hadoop hdfs.

首先我们需要在之前的工程文件下修改我们的pom(具体参考IDEA全程搭建第一个Scala Spark streaming maven工程),增加hadoop版本号

<hadoop.version>2.7.6</hadoop.version>

添加两个依赖

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
</dependency>

修改打包方式

<build>
  <pluginManagement>
    <plugins>
      <!-- 编译scala的插件 -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
      </plugin>
      <!-- 编译java的插件 -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
      </plugin>
    </plugins>
  </pluginManagement>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <executions>
        <execution>
          <id>scala-compile-first</id>
          <phase>process-resources</phase>
          <goals>
            <goal>add-source</goal>
            <goal>compile</goal>
          </goals>
        </execution>
        <execution>
          <id>scala-test-compile</id>
          <phase>process-test-resources</phase>
          <goals>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <executions>
        <execution>
          <phase>compile</phase>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>


    <!-- 打jar插件 -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

这样我们可以打出一个富jar包(包含所有第三方jar包的包),这个文件可能会比较大。

先来写一个单词统计的对象(Scala实现)

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("ScalaWorkCount")
    val scc = new SparkContext(conf)
    //从hadoop hdfs获取文件
    val lines = scc.textFile(args(0))
    //统计文件中的单词的个数
    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    //将统计结果存入hadoop hdfs
    result.saveAsTextFile(args(1))
    scc.stop()
  }
}

用maven打包后,得到这样一组文件,而我们需要的是这个大的jar包

在linux系统中,我们随便写一个文件,假设我们命名为a.txt,内容也随便写几个单词

ice park dog fish dinsh cark balana apple fuck fool my him cry

然后将其上传到hadoop hdfs中

root@host2 bin# ./hdfs dfs -put ./a.txt /usr/file

root@host2 bin# ./hdfs dfs -lsr /

lsr: DEPRECATED: Please use 'ls -R' instead.

drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr

drwxr-xr-x - root supergroup 0 2018-11-03 16:06 /usr/file

-rw-r--r-- 3 root supergroup 63 2018-11-03 16:06 /usr/file/a.txt

-rw-r--r-- 3 root supergroup 173271626 2018-09-14 13:50 /usr/file/jdk-8u45-linux-x64.tar.gz

我们可以查看他的内容

root@host2 bin# ./hdfs dfs -cat /usr/file/a.txt

ice park dog fish dinsh cark balana apple fuck fool my him cry

此时我们也把我们需要的jar包上传到linux系统中

执行命令spark-submit得到一串输出

./spark-submit --master spark://host2:7077,host1:7077 --class com.guanjian.ScalaWordCount ./jar/sparknew-1.0-SNAPSHOT.jar hdfs://host2:8020/usr/file/a.txt hdfs://host2:8020/usr/file/wcount

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

18/11/03 16:20:21 INFO SparkContext: Running Spark version 2.2.0

18/11/03 16:20:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

18/11/03 16:20:22 INFO SparkContext: Submitted application: ScalaWorkCount

18/11/03 16:20:22 INFO SecurityManager: Changing view acls to: root

18/11/03 16:20:22 INFO SecurityManager: Changing modify acls to: root

18/11/03 16:20:22 INFO SecurityManager: Changing view acls groups to:

18/11/03 16:20:22 INFO SecurityManager: Changing modify acls groups to:

18/11/03 16:20:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()

18/11/03 16:20:22 INFO Utils: Successfully started service 'sparkDriver' on port 42065.

18/11/03 16:20:22 INFO SparkEnv: Registering MapOutputTracker

18/11/03 16:20:22 INFO SparkEnv: Registering BlockManagerMaster

18/11/03 16:20:22 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information

18/11/03 16:20:22 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up

18/11/03 16:20:22 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7aa12077-6316-47b4-97e9-f65b3009ac79

18/11/03 16:20:22 INFO MemoryStore: MemoryStore started with capacity 366.3 MB

18/11/03 16:20:22 INFO SparkEnv: Registering OutputCommitCoordinator

18/11/03 16:20:23 INFO Utils: Successfully started service 'SparkUI' on port 4040.

18/11/03 16:20:23 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.5.182:4040

18/11/03 16:20:23 INFO SparkContext: Added JAR file:/usr/local/spark2.2/bin/./jar/sparknew-1.0-SNAPSHOT.jar at spark://192.168.5.182:42065/jars/sparknew-1.0-SNAPSHOT.jar with timestamp 1541233223259

18/11/03 16:20:23 INFO Executor: Starting executor ID driver on host localhost

18/11/03 16:20:23 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44491.

18/11/03 16:20:23 INFO NettyBlockTransferService: Server created on 192.168.5.182:44491

18/11/03 16:20:23 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy

18/11/03 16:20:23 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.5.182, 44491, None)

18/11/03 16:20:23 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.5.182:44491 with 366.3 MB RAM, BlockManagerId(driver, 192.168.5.182, 44491, None)

18/11/03 16:20:23 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.5.182, 44491, None)

18/11/03 16:20:23 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.5.182, 44491, None)

18/11/03 16:20:24 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 236.5 KB, free 366.1 MB)

18/11/03 16:20:24 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.9 KB, free 366.0 MB)

18/11/03 16:20:24 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.5.182:44491 (size: 22.9 KB, free: 366.3 MB)

18/11/03 16:20:24 INFO SparkContext: Created broadcast 0 from textFile at ScalaWordCount.scala:13

18/11/03 16:20:24 INFO FileInputFormat: Total input paths to process : 1

18/11/03 16:20:25 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

18/11/03 16:20:25 INFO SparkContext: Starting job: saveAsTextFile at ScalaWordCount.scala:15

18/11/03 16:20:25 INFO DAGScheduler: Registering RDD 3 (map at ScalaWordCount.scala:14)

18/11/03 16:20:25 INFO DAGScheduler: Got job 0 (saveAsTextFile at ScalaWordCount.scala:15) with 2 output partitions

18/11/03 16:20:25 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at ScalaWordCount.scala:15)

18/11/03 16:20:25 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)

18/11/03 16:20:25 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)

18/11/03 16:20:25 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD3 at map at ScalaWordCount.scala:14), which has no missing parents

18/11/03 16:20:25 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.7 KB, free 366.0 MB)

18/11/03 16:20:25 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.8 KB, free 366.0 MB)

18/11/03 16:20:25 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.5.182:44491 (size: 2.8 KB, free: 366.3 MB)

18/11/03 16:20:25 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006

18/11/03 16:20:25 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD3 at map at ScalaWordCount.scala:14) (first 15 tasks are for partitions Vector(0, 1))

18/11/03 16:20:25 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks

18/11/03 16:20:25 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 4840 bytes)

18/11/03 16:20:25 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, ANY, 4840 bytes)

18/11/03 16:20:25 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)

18/11/03 16:20:25 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

18/11/03 16:20:25 INFO Executor: Fetching spark://192.168.5.182:42065/jars/sparknew-1.0-SNAPSHOT.jar with timestamp 1541233223259

18/11/03 16:20:25 INFO TransportClientFactory: Successfully created connection to /192.168.5.182:42065 after 51 ms (0 ms spent in bootstraps)

18/11/03 16:20:25 INFO Utils: Fetching spark://192.168.5.182:42065/jars/sparknew-1.0-SNAPSHOT.jar to /tmp/spark-d1405a32-85be-40d6-ba26-7abb1632abc4/userFiles-5bb37b1d-3c33-4f7b-acdb-ff85603d088f/fetchFileTemp5672377923687175291.tmp

18/11/03 16:20:26 INFO Executor: Adding file:/tmp/spark-d1405a32-85be-40d6-ba26-7abb1632abc4/userFiles-5bb37b1d-3c33-4f7b-acdb-ff85603d088f/sparknew-1.0-SNAPSHOT.jar to class loader

18/11/03 16:20:26 INFO HadoopRDD: Input split: hdfs://host2:8020/usr/file/a.txt:0+31

18/11/03 16:20:26 INFO HadoopRDD: Input split: hdfs://host2:8020/usr/file/a.txt:31+32

18/11/03 16:20:26 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1155 bytes result sent to driver

18/11/03 16:20:26 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1026 bytes result sent to driver

18/11/03 16:20:26 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 824 ms on localhost (executor driver) (1/2)

18/11/03 16:20:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 810 ms on localhost (executor driver) (2/2)

18/11/03 16:20:26 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

18/11/03 16:20:26 INFO DAGScheduler: ShuffleMapStage 0 (map at ScalaWordCount.scala:14) finished in 0.858 s

18/11/03 16:20:26 INFO DAGScheduler: looking for newly runnable stages

18/11/03 16:20:26 INFO DAGScheduler: running: Set()

18/11/03 16:20:26 INFO DAGScheduler: waiting: Set(ResultStage 1)

18/11/03 16:20:26 INFO DAGScheduler: failed: Set()

18/11/03 16:20:26 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD5 at saveAsTextFile at ScalaWordCount.scala:15), which has no missing parents

18/11/03 16:20:26 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 72.0 KB, free 366.0 MB)

18/11/03 16:20:26 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 25.9 KB, free 365.9 MB)

18/11/03 16:20:26 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.5.182:44491 (size: 25.9 KB, free: 366.2 MB)

18/11/03 16:20:26 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006

18/11/03 16:20:26 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD5 at saveAsTextFile at ScalaWordCount.scala:15) (first 15 tasks are for partitions Vector(0, 1))

18/11/03 16:20:26 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks

18/11/03 16:20:26 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, ANY, 4621 bytes)

18/11/03 16:20:26 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, ANY, 4621 bytes)

18/11/03 16:20:26 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)

18/11/03 16:20:26 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)

18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks

18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks

18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms

18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms

18/11/03 16:20:26 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.5.182:44491 in memory (size: 2.8 KB, free: 366.3 MB)

18/11/03 16:20:26 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

18/11/03 16:20:26 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

18/11/03 16:20:26 INFO FileOutputCommitter: Saved output of task 'attempt_20181103162025_0001_m_000000_2' to hdfs://host2:8020/usr/file/wcount/_temporary/0/task_20181103162025_0001_m_000000

18/11/03 16:20:26 INFO FileOutputCommitter: Saved output of task 'attempt_20181103162025_0001_m_000001_3' to hdfs://host2:8020/usr/file/wcount/_temporary/0/task_20181103162025_0001_m_000001

18/11/03 16:20:26 INFO SparkHadoopMapRedUtil: attempt_20181103162025_0001_m_000001_3: Committed

18/11/03 16:20:26 INFO SparkHadoopMapRedUtil: attempt_20181103162025_0001_m_000000_2: Committed

18/11/03 16:20:26 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1267 bytes result sent to driver

18/11/03 16:20:26 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1267 bytes result sent to driver

18/11/03 16:20:26 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 325 ms on localhost (executor driver) (1/2)

18/11/03 16:20:26 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 330 ms on localhost (executor driver) (2/2)

18/11/03 16:20:26 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

18/11/03 16:20:26 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at ScalaWordCount.scala:15) finished in 0.332 s

18/11/03 16:20:26 INFO DAGScheduler: Job 0 finished: saveAsTextFile at ScalaWordCount.scala:15, took 1.653739 s

18/11/03 16:20:26 INFO SparkUI: Stopped Spark web UI at http://192.168.5.182:4040

18/11/03 16:20:26 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

18/11/03 16:20:26 INFO MemoryStore: MemoryStore cleared

18/11/03 16:20:26 INFO BlockManager: BlockManager stopped

18/11/03 16:20:26 INFO BlockManagerMaster: BlockManagerMaster stopped

18/11/03 16:20:26 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

18/11/03 16:20:26 INFO SparkContext: Successfully stopped SparkContext

18/11/03 16:20:26 INFO ShutdownHookManager: Shutdown hook called

18/11/03 16:20:26 INFO ShutdownHookManager: Deleting directory /tmp/spark-d1405a32-85be-40d6-ba26-7abb1632abc4

这个时候我们来查看保存在hadoop hdfs中的结果。

root@host2 bin# ./hdfs dfs -lsr /

lsr: DEPRECATED: Please use 'ls -R' instead.

drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr

drwxr-xr-x - root supergroup 0 2018-11-03 16:20 /usr/file

-rw-r--r-- 3 root supergroup 63 2018-11-03 16:06 /usr/file/a.txt

-rw-r--r-- 3 root supergroup 173271626 2018-09-14 13:50 /usr/file/jdk-8u45-linux-x64.tar.gz

drwxr-xr-x - root supergroup 0 2018-11-03 16:20 /usr/file/wcount

-rw-r--r-- 3 root supergroup 0 2018-11-03 16:20 /usr/file/wcount/_SUCCESS

-rw-r--r-- 3 root supergroup 78 2018-11-03 16:20 /usr/file/wcount/part-00000

-rw-r--r-- 3 root supergroup 37 2018-11-03 16:20 /usr/file/wcount/part-00001

root@host2 bin# ./hdfs dfs -cat /usr/file/wcount/part-00000

(him,1)

(park,1)

(fool,1)

(dinsh,1)

(fish,1)

(dog,1)

(apple,1)

(cry,1)

(my,1)

root@host2 bin# ./hdfs dfs -cat /usr/file/wcount/part-00001

(ice,1)

(cark,1)

(balana,1)

(fuck,1)

这样我们就得到了我们需要的结果,文本文件a.txt的单词统计,当然这种处理主要是一种离线处理,跟Web程序的关联不大,要做实时处理还要用到Spark streaming。

当然Spark也支持命令行式的操作,类似于Scala一样,如下,我们给本次操作分配5G内存,16线程

root@host2 bin# ./spark-shell --master spark://host2:7077,host1:7077 --executor-memory 5g --total-executor-cores 16

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

18/11/03 20:59:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

18/11/03 21:00:01 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

Spark context Web UI available at http://192.168.5.182:4040

Spark context available as 'sc' (master = spark://host2:7077,host1:7077, app id = app-20181103205956-0001).

Spark session available as 'spark'.

Welcome to

   \_\_\_\_              \_\_
  / \_\_/\_\_  \_\_\_ \_\_\_\_\_/ /\_\_
 \_\ \/ \_ \/ \_ `/ \_\_/  '\_/
/\_\_\_/ .\_\_/\\_,\_/\_/ /\_/\\_\   version 2.2.0
   /\_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)

Type in expressions to have them evaluated.

Type :help for more information.

scala>

比如我们要查看a.txt里面有多少个单词

scala> val r1 = sc.textFile("hdfs://host2:8020/usr/file/a.txt")

r1: org.apache.spark.rdd.RDDString = hdfs://host2:8020/usr/file/a.txt MapPartitionsRDD1 at textFile at <console>:24

我们可以看到他的返回值是一个RDD类型,那RDD是什么呢,一张图来说明

scala> r1.count

res0: Long = 1

scala> r1.flatMap(_.split(" ")).map((_,1)).count

res1: Long = 13

我们对这个RDD操作,行数为1行,单词数为13个单词

最后我们来看一下Spark的8091端口的系统界面

我们执行过的任务在这里面都可以有一些记录。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档