
Cloudera, Day 3

DataScience · Published 2019-12-30 · From the A2Data column
Life, so to speak,

is a series of determined efforts.

· On to the main content ·

Spark-Shell

Starting Spark

spark-shell --master yarn-client
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Spark hands-on (getting started)

// First we're going to import the classes we need

scala> import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.Job

scala> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

scala> import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.GenericRecord

scala> import parquet.hadoop.ParquetInputFormat
import parquet.hadoop.ParquetInputFormat

scala> import parquet.avro.AvroReadSupport
import parquet.avro.AvroReadSupport

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

// Then we create RDDs for 2 of the files we imported from MySQL with Sqoop

// RDDs are Spark's data structures for working with distributed datasets


scala> def rddFromParquetHdfsFile(path: String): RDD[GenericRecord] = {
     |     val job = new Job()
     |     FileInputFormat.setInputPaths(job, path)
     |     ParquetInputFormat.setReadSupportClass(job,
     |         classOf[AvroReadSupport[GenericRecord]])
     |     return sc.newAPIHadoopRDD(job.getConfiguration,
     |         classOf[ParquetInputFormat[GenericRecord]],
     |         classOf[Void],
     |         classOf[GenericRecord]).map(x => x._2)
     | }
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
rddFromParquetHdfsFile: (path: String)org.apache.spark.rdd.RDD[org.apache.avro.generic.GenericRecord]
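
// Note on the deprecation warning above (added, not part of the original run): it comes from
// `new Job()`, which is deprecated in the Hadoop 2 API shipped with the QuickStart VM; the
// non-deprecated factory method below could be used inside the function instead.
scala> val jobExample = org.apache.hadoop.mapreduce.Job.getInstance()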

scala> 

scala> val warehouse = "hdfs://quickstart/user/hive/warehouse/"
warehouse: String = hdfs://quickstart/user/hive/warehouse/

scala> val order_items = rddFromParquetHdfsFile(warehouse + "order_items");
order_items: org.apache.spark.rdd.RDD[org.apache.avro.generic.GenericRecord] = MapPartitionsRDD[1] at map at <console>:41

scala> val products = rddFromParquetHdfsFile(warehouse + "products");
products: org.apache.spark.rdd.RDD[org.apache.avro.generic.GenericRecord] = MapPartitionsRDD[3] at map at <console>:41
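
// An alternative sketch (added, not from the original walkthrough): on Spark 1.6 the same
// Parquet tables can also be read with the DataFrame API through the built-in sqlContext,
// without the Hadoop Job / ParquetInputFormat setup above.
scala> val orderItemsDF = sqlContext.read.parquet(warehouse + "order_items")
scala> val productsDF = sqlContext.read.parquet(warehouse + "products")
scala> orderItemsDF.printSchema()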

// Next, we extract the fields from order_items and products that we care about

// and get a list of every product, its name and quantity, grouped by order

scala> val orders = order_items.map { x => (
     |     x.get("order_item_product_id"),
     |     (x.get("order_item_order_id"), x.get("order_item_quantity")))
     | }.join(
     |   products.map { x => (
     |     x.get("product_id"),
     |     (x.get("product_name")))
     |   }
     | ).map(x => (
     |     scala.Int.unbox(x._2._1._1), // order_id
     |     (
     |         scala.Int.unbox(x._2._1._2), // quantity
     |         x._2._2.toString // product_name
     |     )
     | )).groupByKey()
orders: org.apache.spark.rdd.RDD[(Int, Iterable[(Int, String)])] = ShuffledRDD[10] at groupByKey at <console>:55
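
// A toy sketch (hypothetical data, added for illustration): the join above keys both RDDs by
// product_id, attaches the product name to each (order_id, quantity) pair, and groupByKey then
// collects the (quantity, product_name) tuples of each order.
scala> val itemsToy = sc.parallelize(Seq((101, (1, 2)), (102, (1, 1)), (101, (2, 3)))) // (product_id, (order_id, quantity))
scala> val productsToy = sc.parallelize(Seq((101, "Rip Deck"), (102, "Golf Polo")))    // (product_id, product_name)
scala> itemsToy.join(productsToy).map { case (_, ((orderId, qty), name)) => (orderId, (qty, name)) }.groupByKey().collect()
// roughly: Array((1,CompactBuffer((2,Rip Deck), (1,Golf Polo))), (2,CompactBuffer((3,Rip Deck))))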

// Finally, we tally how many times each combination of products appears
// together in an order, then we sort them and take the 10 most common

scala> val cooccurrences = orders.map(order =>
     |   (
     |     order._1,
     |     order._2.toList.combinations(2).map(order_pair =>
     |         (
     |             if (order_pair(0)._2 < order_pair(1)._2)
     |                 (order_pair(0)._2, order_pair(1)._2)
     |             else
     |                 (order_pair(1)._2, order_pair(0)._2),
     |             order_pair(0)._1 * order_pair(1)._1
     |         )
     |     )
     |   )
     | )
cooccurrences: org.apache.spark.rdd.RDD[(Int, Iterator[((String, String), Int)])] = MapPartitionsRDD[11] at map at <console>:43
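
// A toy check (hypothetical names, added for illustration): the if/else above orders each pair
// of product names so that (A, B) and (B, A) land under the same key; the paired value is the
// product of the two quantities, which reduceByKey sums up in the next step.
scala> val pair = ("Rip Deck", "Golf Polo")
scala> if (pair._1 < pair._2) pair else pair.swap
// res: (String, String) = (Golf Polo,Rip Deck)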

scala> val combos = cooccurrences.flatMap(x => x._2).reduceByKey((a, b) => a + b)
combos: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[13] at reduceByKey at <console>:45

scala> val mostCommon = combos.map(x => (x._2, x._1)).sortByKey(false).take(10)
mostCommon: Array[(Int, (String, String))] = Array((290678,(Perfect Fitness Perfect Rip Deck,Perfect Fitness Perfect Rip Deck)), (246633,(Nike Men's Dri-FIT Victory Golf Polo,Nike Men's Dri-FIT Victory Golf Polo)), (225067,(O'Brien Men's Neoprene Life Vest,O'Brien Men's Neoprene Life Vest)), (140254,(Nike Men's Free 5.0+ Running Shoe,Nike Men's Free 5.0+ Running Shoe)), (119904,(Under Armour Girls' Toddler Spine Surge Runni,Under Armour Girls' Toddler Spine Surge Runni)), (67876,(Nike Men's Dri-FIT Victory Golf Polo,Perfect Fitness Perfect Rip Deck)), (62924,(O'Brien Men's Neoprene Life Vest,Perfect Fitness Perfect Rip Deck)), (54399,(Nike Men's Dri-FIT Victory Golf Polo,O'Brien Men's Neoprene Life Vest)), (39656,(Nike Men's Free 5.0+ Running Shoe,Perfect Fitness Perfect Rip Deck)), (35...
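
// An alternative sketch (added, not from the original run): RDD.top with an Ordering on the
// count gives the same 10 most common pairs without swapping key and value first; note the
// elements come back as ((name1, name2), count) rather than (count, (name1, name2)).
scala> val byCount = Ordering.by[((String, String), Int), Int](_._2)
scala> val mostCommonAlt = combos.top(10)(byCount)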

// We print the results, one per line

scala> println(mostCommon.deep.mkString("\n"))
(290678,(Perfect Fitness Perfect Rip Deck,Perfect Fitness Perfect Rip Deck))
(246633,(Nike Men's Dri-FIT Victory Golf Polo,Nike Men's Dri-FIT Victory Golf Polo))
(225067,(O'Brien Men's Neoprene Life Vest,O'Brien Men's Neoprene Life Vest))
(140254,(Nike Men's Free 5.0+ Running Shoe,Nike Men's Free 5.0+ Running Shoe))
(119904,(Under Armour Girls' Toddler Spine Surge Runni,Under Armour Girls' Toddler Spine Surge Runni))
(67876,(Nike Men's Dri-FIT Victory Golf Polo,Perfect Fitness Perfect Rip Deck))
(62924,(O'Brien Men's Neoprene Life Vest,Perfect Fitness Perfect Rip Deck))
(54399,(Nike Men's Dri-FIT Victory Golf Polo,O'Brien Men's Neoprene Life Vest))
(39656,(Nike Men's Free 5.0+ Running Shoe,Perfect Fitness Perfect Rip Deck))
(35092,(Perfect Fitness Perfect Rip Deck,Under Armour Girls' Toddler Spine Surge Runni))
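
// Note (added, not part of the original output): pairs of a product with itself appear at the
// top because combinations(2) is taken over all line items of an order, and an order can contain
// more than one line item for the same product, so those line items pair with each other.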

// Then we exit the Spark shell
scala> exit
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
Dec 27, 2018 6:45:10 AM INFO: parquet.hadoop.ParquetInputFormat: Total input paths to process : 8
Dec 27, 2018 6:45:10 AM INFO: parquet.hadoop.ParquetInputFormat: Total input paths to process : 8
[root@quickstart ~]# 


"So-called success"

Doing simple things well, consistently, is itself not simple.

Doing ordinary things well, consistently, is itself extraordinary.

Everyone has latent energy; it is just too easily

hidden by habit,

blurred by time,

and worn away by inertia.

So what is success? It is making an extraordinary commitment amid the ordinary.
