Success is a series of determined efforts.
Spark-Shell
Launch an interactive Spark shell on YARN (client mode):
spark-shell --master yarn-client
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/
Hands-On Spark (Getting Started)
// First we're going to import the classes we need
scala> import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.Job
scala> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
scala> import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.GenericRecord
scala> import parquet.hadoop.ParquetInputFormat
import parquet.hadoop.ParquetInputFormat
scala> import parquet.avro.AvroReadSupport
import parquet.avro.AvroReadSupport
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
// Then we create RDDs for two of the files we imported from MySQL with Sqoop
// RDDs are Spark's data structure for working with distributed datasets
scala> def rddFromParquetHdfsFile(path: String): RDD[GenericRecord] = {
| val job = new Job()
| FileInputFormat.setInputPaths(job, path)
| ParquetInputFormat.setReadSupportClass(job,
| classOf[AvroReadSupport[GenericRecord]])
| return sc.newAPIHadoopRDD(job.getConfiguration,
| classOf[ParquetInputFormat[GenericRecord]],
| classOf[Void],
| classOf[GenericRecord]).map(x => x._2)
| }
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
rddFromParquetHdfsFile: (path: String)org.apache.spark.rdd.RDD[org.apache.avro.generic.GenericRecord]
scala>
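// Note: the deprecation warning above comes from new Job(), which newer
// Hadoop APIs replace with the Job.getInstance() factory. A minimal sketch
// of the same helper without the warning (not part of the original session;
// it assumes the same imports and the shell's SparkContext sc):
scala> def rddFromParquetHdfsFile2(path: String): RDD[GenericRecord] = {
     |   val job = Job.getInstance()
     |   FileInputFormat.setInputPaths(job, path)
     |   ParquetInputFormat.setReadSupportClass(job,
     |     classOf[AvroReadSupport[GenericRecord]])
     |   // keep only the value half of each (Void, GenericRecord) pair
     |   sc.newAPIHadoopRDD(job.getConfiguration,
     |     classOf[ParquetInputFormat[GenericRecord]],
     |     classOf[Void],
     |     classOf[GenericRecord]).map(_._2)
     | }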
scala> val warehouse = "hdfs://quickstart/user/hive/warehouse/"
warehouse: String = hdfs://quickstart/user/hive/warehouse/
scala> val order_items = rddFromParquetHdfsFile(warehouse + "order_items");
order_items: org.apache.spark.rdd.RDD[org.apache.avro.generic.GenericRecord] = MapPartitionsRDD[1] at map at <console>:41
scala> val products = rddFromParquetHdfsFile(warehouse + "products");
products: org.apache.spark.rdd.RDD[org.apache.avro.generic.GenericRecord] = MapPartitionsRDD[3] at map at <console>:41
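// To peek at the raw Avro records before transforming them (a hypothetical
// check, not in the original session), we could pull a single element:
scala> order_items.first()
// prints one GenericRecord whose fields include order_item_order_id,
// order_item_product_id and order_item_quantity, which we use below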
// Next, we extract the fields we care about from order_items and products,
// and get a list of every product, with its name and quantity, grouped by order
scala> val orders = order_items.map { x => (
| x.get("order_item_product_id"),
| (x.get("order_item_order_id"), x.get("order_item_quantity")))
| }.join(
| products.map { x => (
| x.get("product_id"),
| (x.get("product_name")))
| }
| ).map(x => (
| scala.Int.unbox(x._2._1._1), // order_id
| (
| scala.Int.unbox(x._2._1._2), // quantity
| x._2._2.toString // product_name
| )
| )).groupByKey()
orders: org.apache.spark.rdd.RDD[(Int, Iterable[(Int, String)])] = ShuffledRDD[10] at groupByKey at <console>:55
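// order_items was keyed by product id so it joins against products; after
// the join we re-key by order id and group, giving one entry per order that
// holds (quantity, product_name) for each of its line items. A quick look at
// one grouped order (a hypothetical inspection step; the values shown are
// illustrative, not real output):
scala> orders.take(1).foreach(println)
// e.g. (12345,CompactBuffer((1,Product A), (2,Product B)))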
// Finally, we tally how many times each combination of products appears
// together in an order, then we sort them and take the 10 most common
scala> val cooccurrences = orders.map(order =>
| (
| order._1,
| order._2.toList.combinations(2).map(order_pair =>
| (
| if (order_pair(0)._2 < order_pair(1)._2)
| (order_pair(0)._2, order_pair(1)._2)
| else
| (order_pair(1)._2, order_pair(0)._2),
| order_pair(0)._1 * order_pair(1)._1
| )
| )
| )
| )
cooccurrences: org.apache.spark.rdd.RDD[(Int, Iterator[((String, String), Int)])] = MapPartitionsRDD[11] at map at <console>:43
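// combinations(2) enumerates every unordered pair of line items within an
// order; the if/else sorts each pair alphabetically by product name so that
// (A,B) and (B,A) count as the same key, and the two quantities are
// multiplied to weight the pair. A plain-Scala illustration with made-up
// values (not from the dataset):
scala> List((2, "B"), (1, "A"), (3, "C")).combinations(2).toList
res0: List[List[(Int, String)]] = List(List((2,B), (1,A)), List((2,B), (3,C)), List((1,A), (3,C)))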
scala> val combos = cooccurrences.flatMap(x => x._2).reduceByKey((a, b) => a + b)
combos: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[13] at reduceByKey at <console>:45
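// flatMap(x => x._2) drops the order ids and flattens all of the per-order
// pair iterators into one RDD of ((name1, name2), weight) elements, and
// reduceByKey sums the weights for each distinct pair. A tiny illustration
// with made-up data (array order may vary):
scala> sc.parallelize(Seq((("A", "B"), 2), (("A", "B"), 3), (("A", "C"), 1))).reduceByKey(_ + _).collect()
// e.g. Array(((A,B),5), ((A,C),1))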
scala> val mostCommon = combos.map(x => (x._2, x._1)).sortByKey(false).take(10)
mostCommon: Array[(Int, (String, String))] = Array((290678,(Perfect Fitness Perfect Rip Deck,Perfect Fitness Perfect Rip Deck)), (246633,(Nike Men's Dri-FIT Victory Golf Polo,Nike Men's Dri-FIT Victory Golf Polo)), (225067,(O'Brien Men's Neoprene Life Vest,O'Brien Men's Neoprene Life Vest)), (140254,(Nike Men's Free 5.0+ Running Shoe,Nike Men's Free 5.0+ Running Shoe)), (119904,(Under Armour Girls' Toddler Spine Surge Runni,Under Armour Girls' Toddler Spine Surge Runni)), (67876,(Nike Men's Dri-FIT Victory Golf Polo,Perfect Fitness Perfect Rip Deck)), (62924,(O'Brien Men's Neoprene Life Vest,Perfect Fitness Perfect Rip Deck)), (54399,(Nike Men's Dri-FIT Victory Golf Polo,O'Brien Men's Neoprene Life Vest)), (39656,(Nike Men's Free 5.0+ Running Shoe,Perfect Fitness Perfect Rip Deck)), (35...
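// Swapping each element to (count, pair) puts the count in the key position,
// so sortByKey(false) sorts descending and take(10) keeps the ten largest.
// An equivalent result (as ((name1, name2), count) tuples rather than
// swapped ones) could also be had with RDD.top and a custom ordering; a
// sketch, not in the original session:
scala> val mostCommon2 = combos.top(10)(Ordering.by[((String, String), Int), Int](_._2))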
// We print the results, one per line
scala> println(mostCommon.deep.mkString("\n"))
(290678,(Perfect Fitness Perfect Rip Deck,Perfect Fitness Perfect Rip Deck))
(246633,(Nike Men's Dri-FIT Victory Golf Polo,Nike Men's Dri-FIT Victory Golf Polo))
(225067,(O'Brien Men's Neoprene Life Vest,O'Brien Men's Neoprene Life Vest))
(140254,(Nike Men's Free 5.0+ Running Shoe,Nike Men's Free 5.0+ Running Shoe))
(119904,(Under Armour Girls' Toddler Spine Surge Runni,Under Armour Girls' Toddler Spine Surge Runni))
(67876,(Nike Men's Dri-FIT Victory Golf Polo,Perfect Fitness Perfect Rip Deck))
(62924,(O'Brien Men's Neoprene Life Vest,Perfect Fitness Perfect Rip Deck))
(54399,(Nike Men's Dri-FIT Victory Golf Polo,O'Brien Men's Neoprene Life Vest))
(39656,(Nike Men's Free 5.0+ Running Shoe,Perfect Fitness Perfect Rip Deck))
(35092,(Perfect Fitness Perfect Rip Deck,Under Armour Girls' Toddler Spine Surge Runni))
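// Note that the highest counts pair a product with itself: when the same
// product appears on more than one line item of an order, combinations(2)
// emits a self-pair, so the most popular products co-occur with themselves.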
// Then we exit the Spark shell
scala> exit
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
Dec 27, 2018 6:45:10 AM INFO: parquet.hadoop.ParquetInputFormat: Total input paths to process : 8
Dec 27, 2018 6:45:10 AM INFO: parquet.hadoop.ParquetInputFormat: Total input paths to process : 8
[root@quickstart ~]#
'So-Called Success'
Persistently doing simple things well is anything but simple;
persistently doing ordinary things well is anything but ordinary.
Everyone has latent energy; it is just easily
masked by habit,
blurred by time,
and worn away by inertia.
So what is success? It is an extraordinary persistence in the midst of the ordinary.