// 1. filter
// Double every element of a small RDD, then keep only the results greater than 4.
// `sc` is presumably the SparkContext provided by spark-shell — confirm for other environments.
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val mappedRDD = rdd.map(2 * _)
mappedRDD.collect() // Array(2, 4, 6, 8, 10)
// Identifiers are case-sensitive: the filter must read `mappedRDD`, the val defined above.
val filteredRDD = mappedRDD.filter(_ > 4)
filteredRDD.collect() // Array(6, 8, 10)
// The same map/filter pipeline as above, written as one chained expression.
// NOTE: the chain ends in `collect`, so despite its name this val holds the collected result,
// not an RDD.
val filteredRDDAgain =
  sc.parallelize(List(1, 2, 3, 4, 5))
    .map(x => 2 * x)
    .filter(x => x > 4)
    .collect()
// 2. wordcount
// Count how often each word occurs in a text file, then save the (word, count) pairs.
// The SparkContext method is camel-cased: `textFile`, not `textfile`.
val rdd = sc.textFile("/data/README.md")
// Mark the RDD for caching before the first action so that `count` already populates the cache.
rdd.cache()
rdd.count()
// Split on a space, matching the sort example; the original '、' delimiter looks like an
// input-method typo — TODO confirm. The tuple needs its own parentheses: `.map(_,1)` does not
// compile.
val wordcount = rdd.flatMap(_.split(' ')).map((_, 1)).reduceByKey(_ + _)
wordcount.collect()
wordcount.saveAsTextFile("/data/result")
// 3. sort
// Word counts ordered by descending frequency, written to a text file.
// Reuses `rdd` from the wordcount example. Nothing is bound to a val because the chain ends in
// `saveAsTextFile`, which yields no useful value.
rdd.flatMap(_.split(' '))
  .map((_, 1))
  .reduceByKey(_ + _)
  .map(_.swap)                  // (count, word): the count becomes the sort key
  .sortByKey(ascending = false) // highest counts first
  .map(_.swap)                  // back to (word, count)
  .saveAsTextFile("/data/resultsorted")
// 4. union
// Combine two pair RDDs with `union`; `join` is invoked the same way.
val rdd1 = sc.parallelize(List('a' -> 1, 'b' -> 1))
val rdd2 = sc.parallelize(List('c' -> 1, 'd' -> 1))
val result = rdd1.union(rdd2)
result.collect()
5、连接 MySQL,创建 DataFrame
import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.sql.{SaveMode, DataFrame} import org.apache.spark.sql.hive.HiveContext
// JDBC connection string for the local MySQL database `yangsy`.
// NOTE(review): the credentials are embedded in the URL — acceptable for a local demo only;
// do not commit real passwords.
val mySQLUrl = "jdbc:mysql://localhost:3306/yangsy?user=root&password=yangsiyi"
// DDL that registers the MySQL table `person` as the temporary table PEOPLE through the
// JDBC data source.
val people_DDL =
  s""" CREATE TEMPORARY TABLE PEOPLE USING org.apache.spark.sql.jdbc OPTIONS ( url '${mySQLUrl}', dbtable 'person' )""".stripMargin
sqlContext.sql(people_DDL)
// Call `sql` on `sqlContext`, consistent with the statement above; a bare `sql(...)` resolves
// only where `import sqlContext.sql` is in effect.
val person = sqlContext.sql("SELECT * FROM PEOPLE").cache()
// Value to look for in the `name` column.
val name = "name"
// Compare the column against the value as a literal. Concatenating the value into the
// expression produced "name =name", whose right-hand side is read as the column itself
// rather than as text, and would also let the value alter the expression.
val targets = person.filter(person("name") === name).collect()
// Print the first two fields of every collected row, one per line
// (named for name and age in the original; the column order is not shown here — confirm).
targets.foreach { row =>
  println(row(0))
  println(row(1))
}
6、手工设置Spark SQL task个数
// Set the number of partitions (and hence tasks) that Spark SQL uses for shuffles to 10.
// `setConf` is an instance method, so it is called on the `sqlContext` value used elsewhere
// in these notes, not on the `SQLContext` class.
sqlContext.setConf("spark.sql.shuffle.partitions", "10")