Apache Spark是一个快速的通用集群计算框架 / 殷勤。它提供Java,Scala,Python和R中的高级API,以及支持常规执行图的优化引擎。它还支持一组丰富的更高级别的工具,包括Spark SQL用于SQL和结构化数据的处理,MLlib机器学习,GraphX用于图形处理和Spark Streaming. 。
作为Apache的顶级项目之一, 它的官网为 http://spark.apache.org
The Berkeley Data Analytics Stack
回顾hadoop
要基于Yarn来进行资源调度,必须实现AppalicationMaster接口,Spark实现了这个接口,所以可以基于Yarn。
面试如果问Spark的RDD 我们可以介绍RDD的五大特性以及相关注意实现
可以将下图中的每一个猴看成是一个RDD
RDD Lineage依靠他们之间的依赖关系形成了一个有向无环图DAG
但在复杂的逻辑中, 可能是多条lineage组成一个DAG
注意:
Spark的Driver如果回收多个Worker可能会出现OOM问题
OOM问题 ; out of memery 内存溢出
这些角色都是以JVM进程形式存在
注意:
分布式文件系统(File system)–加载RDD
transformations延迟执行–针对RDD的操作 , 是某一类算子(函数)
Action触发执行 , Action也是一类算子(函数)
Transformations类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。
Transformation类算子:
(K,(Iterable<V>,Iterable<W>))
,子RDD的分区与父RDD多的一致。K,Iterable <V>
)。Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。
Action类算子
控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。
cache() : 默认将数据存在内存中
且 cache() = persist() = persist(StorageLevel.MEMORY_ONLY)
persist(): 可以手动指定持久化的级别, 我们经常使用的persist级别(推荐程度由高到低)
MEMORY\_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
MEMORY_AND_DISK
尽量避免使用“_2” 和 “DISK_ONLY” 级别
cache和persist的注意事项:
checkpoint 的执行原理:
/**
* 验证控制算子checkPoint
* Author TimePause
* Create 2019-12-13 19:55
*/
object CheckPointTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("cache")
val sc = new SparkContext(conf)
sc.setCheckpointDir("./ck/word.txt")
val lines = sc.textFile("./data/word.txt")
lines.checkpoint()
lines.count()
}
}
运行结果
1).Spark官网下载安装包,解压
2).进入安装包的conf目录下,复制slaves.template文件,复制后名称为slaves
添加从节点(worker)名称。保存。
3).复制spark-env.sh.template文件,复制后名称为spark-env.sh
修改spark-env.sh
JAVA_HOME jdk所在目录
SPARK_MASTER_IP:master的ip
SPARK_MASTER_PORT:提交任务的端口,默认是7077
SPARK_WORKER_CORES:每个worker从节点能够支配的core的个数
SPARK_WORKER_MEMORY:每个worker从节点能够支配的内存数
4).同步到其他节点上
5).启动集群
进入sbin目录下,执行当前目录下的./start-all.sh
6)访问集群的图形化界面, 默认为8080
这是使用的jar是Spark自带的一个jar, 用于计算圆周率, 无需自己手动编写, 执行运行即可, 在此用于测试Spark能否正常提交任务undefined
通过bin,目录下的 spark-submit来提交(在那一个节点都可以,命令都如下,不会改变)
# ./spark-submit --master spark提交任务的ip和端口 提交的jar的全限定路径 提交的jar的名称 运行jar/任务的task数(图1)
./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
图1
因此如果我们使用方法二, 会在任务提交时一直占用当前shell以及网卡资源,为了消除这个影响我们选择方法二
将spark安装包原封不动的拷贝到一个新的节点上,然后,在新的节点上提交任务即可。
在bin目录下, 命令依旧如下
# ./spark-submit --master spark提交任务的ip和端口 提交的jar的全限定路径 提交的jar的名称 运行jar/任务的task数(图1)
./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
这样占用的就不是Spark集群节点的网卡和shell, 而是无关的节点的相关资源了
基于Standalone每次提交任务时,都会在Worker节点Spark安装目录的/work目录下生成一个命名为app-xxx-xxx的目录,这个目录下存放程序运行时所需的依赖的jar包。每次提交任务都会在这个work目录下生成一个application目录且不会自动清理。如果时间长了就有可能占用大量的磁盘空间。
清理:可以在worker节点的Spark-env.sh中配置如下参数,定期清理work目录。
export SPARK_WORKER_OPTS="
-Dspark.worker.cleanup.enabled=true #是否开启自动清理
-Dspark.worker.cleanup.interval=1800 #每隔多长时间清理一次,单位s
-Dspark.worker.cleanup.appDataTtl=604800" # 保留最近多长时间的数据,单位s
以上参数中:
spark.worker.cleanupenabled=true 只有运行完成的application才会被清理。
spark.worker.cleanup.interval 清理周期,单位s,默认值为30分钟。
spark.worker.clearnup.appDataTtl 保存多长时间的数据,单位s,默认是一周
spark.yarn.jars hdfs://node1:9000/sparkjars/*
(sparkjars 目录需要在hdfs中创建,要配置访问权限755),将spark_home/jars下的所有jar包都上传到hdfs中sparkjars目录下,这样每次提交任务时,就不会从客户端的spark_home/jars下上传所有jar包,只是从hdfs中sparkjars下读取,速度会很快,省略了上传的过程。提交任务有依赖jar包时,有以下三种方式选择:
提交命令
真实提交时必须将这个命令修改成一行然后运行
## 不指定参数
./spark-submit
--master spark://node1:7077
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar 100
./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
## 添加一行参数--deploy-mode client
./spark-submit
--master spark://node1:7077
--deploy-mode client
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar 100
./spark-submit --master spark://node1:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
执行原理图解
执行流程
在client节点启动Driver进程后,Driver向Master申请资源,向woker发送task,并接受worker的执行结果
总结
client模式适用于测试调试程序 。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。
在Driver端可以看到task执行的情况。生产环境下不能使用client模式,是因为:假设要提交100个application到集群运行,Driver每次都会在client端启动,那么就会导致客户端100次网卡流量暴增的问题。
提交命令
真实提交时必须将这个命令修改成一行然后运行
./spark-submit
--master spark://node1:7077
--deploy-mode cluster
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar 100
./spark-submit --master spark://node1:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
执行原理图解
执行流程
在worker节点启动Driver进程后,Driver向Master申请资源,向woker发送task,并接受worker的执行结果
总结
cluster模式适合在生产模式(项目上线环境)使用, Driver进程是在集群某一台Worker上启动的,在客户端是无法查看task的执行情况(包括执行结果!!!)。假设要提交100个application到集群运行,每次Driver会随机在集群中某一台Worker上启动,那么这100次网卡流量暴增的问题就散布在集群上。
图1
图2
图3
需要有dhfs集群和yarn框架的支持, 但是无需启动 spark Standalone集群
使用前的步骤
zKServer.sh start
start-dfs.sh
start-yarn.sh
yarn-daemon.sh start resourcemanager
注意: 下面三种方式效果相同, 但是在输入要整合成一行命令输入
格式:
./spark-submit --master yarn --class 类所在全限定路径 jar所在位置 task数量
举例:
./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
# --master yarn–client
./spark-submit
--master yarn–client
--class org.apache.spark.examples.SparkPi
../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
# --deploy-mode client
./spark-submit
--master yarn
--deploy-mode client
--class org.apache.spark.examples.SparkPi
../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
结果输出在当前命令行
执行原理图解
执行流程
RS接收Driver发送的资源请求, 在NM上启动AM, 接收AM启动成功后的资源请求, 分配给NM给AM, 启动Executor. Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端。
总结
Yarn-client模式同样是适用于测试 ,因为Driver运行在本地,Driver会与yarn集群中的Executor进行大量的通信,会造成客户机网卡流量的大量增加.
ApplicationMaster的作用:
提交命令
格式
./spark-submit (--master yarn-cluster 或者--deploy-mode cluster ) --class 类的全限定路径 jar所在位置 task数目
举例
方式一
./spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 1000
方式二
./spark-submit --master yarn --deploy-mode cluster ../examples/jars/spark-examples_2.11-2.3.1.jar 1000
停止集群任务命令:
yarn application -kill applicationID
执行原理图解
执行流程
RM接收客户端请求, 在NM上启动AM(相当于Driver), 接收AM请求, 返回AM一批NM节点 AM连接NM发送请求启动Executor, 接收Executor的反向注册, 最后发送任务到Executor
总结
Yarn-Cluster主要用于生产环境中, 因为Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。
ApplicationMaster的作用:
访问自己 ResourceManager所在节点的 8088端口, eg: http://node3:8088
简单关系图
RDD之间有一系列的依赖关系,依赖关系又分为窄依赖和宽依赖。
宽窄依赖图理解
Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务。然后将这些task以taskSet的形式提交给TaskScheduler运行。 stage是由一组并行的task组成。undefined
切割规则:从后往前,遇到宽依赖就切割stage
pipeline管道计算模式,pipeline只是一种计算思想,模式。
粗粒度资源申请(Spark)
细粒度资源申请(MapReduce)
Spark standalone with cluster deploy mode only:
Spark standalone and Mesos only:
Spark standalone and YARN only:
YARN-only:
资源请求简单图
资源调度Master路径:
路径:spark-1.6.0/core/src/main/scala/org.apache.spark/deploy/Master/Master.scala
提交应用程序,submit的路径:
路径:spark-1.6.0/core/src/main/scala/org.apache.spark/ deploy/SparkSubmit.scala
总结:
总结如下图:
java代码
public class wc3 {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("wc3");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("./data/word.txt");// {hello,world}
JavaRDD<String> stringJavaRDD = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
JavaPairRDD<String, Integer> stringIntegerJavaPairRDD = stringJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);//(hello,1),(world,1)
}
});
JavaPairRDD<String, Integer> stringIntegerJavaPairRDD1 = stringIntegerJavaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;//(hello,27),(world,1)
}
});
JavaPairRDD<Integer, String> integerStringJavaPairRDD = stringIntegerJavaPairRDD1.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.swap();//(27,hello),(1,world)
}
});
JavaPairRDD<Integer, String> integerStringJavaPairRDD1 = integerStringJavaPairRDD.sortByKey();
JavaPairRDD<String, Integer> result = integerStringJavaPairRDD1.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
return integerStringTuple2.swap();//正序(world,1),(hello,27)
}
});
result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
System.out.println(stringIntegerTuple2);
}
});
System.out.println("qqq");
sc.stop();
}
}
Scala代码
object Wc {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("wc1")
val sc = new SparkContext(conf)
sc.textFile("./data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).foreach(println)
sc.stop()
}
}
数据格式
运行结果
Scala代码
/**
* 网站访问量统计, 多多练习和理解
*
* Author TimePause
* Create 2019-12-16 21:09
*/
object PvUv {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("pvuv")
val sc = new SparkContext(conf)
val lines = sc.textFile("./data/pvuvdata")
//pv页面访问量: 用户访问某个网站次数,无需去重
//这里没有flatmap, 直接通过map返回了split分割后形成的数组的第五个元素n,组成(n,1)的元组,然后进行wc操作
val result = lines.map(line => {(line.split("\t")(5), 1)}).reduceByKey(_ + _).sortBy(_._2,false)
.foreach(tp=>{println(s"网站 ${tp._1} 的页面访问量为: ${tp._2}")})
//uv 独立访问用户数,一般为1天统计一次, 需要去重
//别老是忘记去去重过后的元素line2.split("_")(1)
lines.map(line=>{(line.split("\t")(0)+"_"+line.split("\t")(5))}).distinct()
.map(line2=>{(line2.split("_")(1),1)}).reduceByKey(_+_).sortBy(_._2,false)
.foreach(tp=>{println(s"网站 ${tp._1} 的独立访问用户数为: ${tp._2}")})
}
}
数据格式
运行结果
java代码
SparkConf sparkConf = new SparkConf()
.setMaster("local")
.setAppName("SecondarySortTest");
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");
JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Tuple2<SecondSortKey, String> call(String line) throws Exception {
String[] splited = line.split(" ");
int first = Integer.valueOf(splited[0]);
int second = Integer.valueOf(splited[1]);
SecondSortKey secondSortKey = new SecondSortKey(first,second);
return new Tuple2<SecondSortKey, String>(secondSortKey,line);
}
});
pairSecondRDD.sortByKey(false).foreach(new
VoidFunction<Tuple2<SecondSortKey,String>>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {
System.out.println(tuple._2);
}
});
public class SecondSortKey implements Serializable,Comparable<SecondSortKey>{
/**
*
*/
private static final long serialVersionUID = 1L;
private int first;
private int second;
public int getFirst() {
return first;
}
public void setFirst(int first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
public SecondSortKey(int first, int second) {
super();
this.first = first;
this.second = second;
}
@Override
public int compareTo(SecondSortKey o1) {
if(getFirst() - o1.getFirst() ==0 ){
return getSecond() - o1.getSecond();
}else{
return getFirst() - o1.getFirst();
}
}
}
Scala代码
/**
* 样例类: 实现二次排序的逻辑实现
* @param first
* @param second
*/
case class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] {
def compare(that: SecondSortKey): Int = {
if(this.first-that.first==0)
this.second- that.second
else
this.first-that.first
}
}
/**
* 二次排序问题
*/
object SecondSort {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("secondarySort")
conf.setMaster("local")
val sc = new SparkContext(conf)
val lines = sc.textFile("./data/secondSort.txt")
val transRDD: RDD[(SecondSortKey,String)] = lines.map(s=>{(SecondSortKey(s.split(" ")(0).toInt,s.split(" ")(1).toInt),s)})
transRDD.sortByKey(false).map(_._2).foreach(println)
}
}
数据样式
运行结果
java代码
SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("TopOps");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> linesRDD = sc.textFile("scores.txt");
JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String str) throws Exception {
String[] splited = str.split("\t");
String clazzName = splited[0];
Integer score = Integer.valueOf(splited[1]);
return new Tuple2<String, Integer> (clazzName,score);
}
});
pairRDD.groupByKey().foreach(new
VoidFunction<Tuple2<String,Iterable<Integer>>>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
String clazzName = tuple._1;
Iterator<Integer> iterator = tuple._2.iterator();
Integer[] top3 = new Integer[3];
while (iterator.hasNext()) {
Integer score = iterator.next();
for (int i = 0; i < top3.length; i++) {
if(top3[i] == null){
top3[i] = score;
break;
}else if(score > top3[i]){
for (int j = 2; j > i; j--) {
top3[j] = top3[j-1];
}
top3[i] = score;
break;
}
}
}
System.out.println("class Name:"+clazzName);
for(Integer sscore : top3){
System.out.println(sscore);
}
}
});
Scala代码
/**
* 分组取topN
*
* Author TimePause
* Create 2019-12-18 15:50
*/
object TopNTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("topn")
val sc = new SparkContext(conf)
val lines = sc.textFile("./data/scores.txt")
//class1 100=>(class1,int(100))
val pairInfo = lines.map(one => {
(one.split(" ")(0), one.split(" ")(1).toInt)
})
val value = pairInfo.groupByKey().map(tp => {
val classname = tp._1
val iter = tp._2.iterator
val top3Score = new Array[Int](3)
val loop = new Breaks
while (iter.hasNext) {
val currScore = iter.next()
loop.breakable {
for (i <- 0.until(top3Score.size)) {
if (top3Score(i) == 0) {
top3Score(i) = currScore
loop.break()
} else if (currScore > top3Score(i)) {
// 2到i, 步长为-1
for (j <- 2.until(i, -1)) {
top3Score(j) = top3Score(j - 1)
}
top3Score(i) = currScore
loop.break()
}
}
}
}
(classname,top3Score.toBuffer)
}).collect()
value.foreach(println)
}
}
原理图
测试代码
object BroadCastTest {
def main(args:Array[String]): Unit ={
val conf = new SparkConf().setMaster("local").setAppName("test")
val sc = new SparkContext(conf)
val list: List[String] = List[String]("hello timepause")
// 将该数组定义成广播变量
val bcValue: Broadcast[List[String]] = sc.broadcast(list)
val lines = sc.textFile("./data/word.txt")
lines.filter(one=>{
val value: List[String] = bcValue.value //获取广播变量数组的值
// list.contains(one)
value.contains(one)
}).foreach(println)
}
}
注意事项
原理图
演示代码
val conf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.accumulator(0)
sc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}}
println(accumulator.value)
sc.stop()
注意事项
累加器在Driver端定义赋初始值,累加器只能在Driver端读取,在Excutor端更新。
概念:
SparkShell是Spark自带的一个快速原型开发工具,也可以说是Spark的scala REPL(Read-Eval-Print-Loop),即交互式shell。支持使用scala语言来进行Spark的交互式编程。
使用:
启动Standalone集群,./start-all.sh ( sbin )
在客户端bin目录下启动 spark-shell:
./spark-shell --master spark://node1:7077
启动hdfs,创建目录spark/test,上传文件wc.txt
启动hdfs集群:
start-all.sh
创建目录:
hdfs dfs -mkdir -p /spark/test
上传wc.txt
hdfs dfs -put /root/test/wc.txt /spark/test/
word,txt部分内容
# 如果直接使用foreach进行输出, 结果会在执行的日志中显示,需要通过图形化界面查看
scala> sc.textFile("hdfs://node2:8020/spark/data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).foreach(println)
# 如果通过其他的 Action算子触发执行将会显示结果, 如下
scala> sc.textFile("hdfs://node2:8020/spark/data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect()
res2: Array[(String, Int)] = Array((hello,110), (myself,36), (world,36), (timepause,14), (ah,12), (sz,8), (worldd,4))
# 将hdfs文件赋给一个rdd变量
scala> var rdd=sc.textFile("hdfs://node2:8020/spark/data/word.txt")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://node2:8020/spark/data/word.txt MapPartitionsRDD[11] at textFile at <console>:24
# 可以这样进行导包
scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel
# 设置为只使用内存并序列化
scala> rdd.persist(StorageLevel.MEMORY_ONLY_SER)
res1: org.apache.spark.rdd.RDD[String] = hdfs://node2:8020/spark/data/word.txt MapPartitionsRDD[11] at textFile at <console>:24
mycluster为我的Hadoop集群名称. 如何查找自己Hadoop集群名称?
位于自己 /hadoop/etc/hadoop/hdfs-site.xml 文件下
原理图
搭建步骤
注意点
./start-master,sh
SparkShuffle概念
1) 普通机制
普通机制示意图
执行流程
总结
存在的问题
产生的磁盘小文件过多,会导致以下问题:
2) 合并机制
合并机制示意图
总结
产生磁盘小文件的个数:E(Executor的个数)*R(reduce的个数)
执行流程
5.01*2-5=5.02M
内存给内存数据结构。总结
产生磁盘小文件的个数: 2*M(map task的个数)
2) bypass机制
bypass机制示意图
总结
shuffle文件寻址图
BlockManager块管理者,是Spark架构中的一个模块,也是一个主从架构。
Spark1.6以上版本默认使用的是统一内存管理,可以通过参数spark.memory.useLegacyMode
设置为true(默认为false)使用静态内存管理。
静态内存管理分布图
SparkShuffle调优配置项如何使用?
附其他的调优参数
spark.shuffle.file.buffer
默认值:32k
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.reducer.maxSizeInFlight
默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.shuffle.io.maxRetries
默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
shuffle file not find taskScheduler不负责重试task,由DAGScheduler负责重试stage
spark.shuffle.io.retryWait
默认值:5s
参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。
spark.shuffle.memoryFraction
默认值:0.2
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
spark.shuffle.manager
默认值:sort|hash
参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。
spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。
spark.shuffle.consolidateFiles
默认值:false
参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制。
Spark on Hive和Hive on Spark
SparkSQL的数据源可以是JSON类型的字符串,JDBC,Parquent,Hive,HDFS等。
首先拿到sql后解析一批未被解决的逻辑计划,再经过分析得到分析后的逻辑计划,
再经过一批优化规则转换成一批最佳优化的逻辑计划,再经过SparkPlanner的策略转化成一批物理计划,
随后经过消费模型转换成一个个的Spark任务执行。
注意:
java
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("jsonfile");
SparkContext sc = new SparkContext(conf);
//创建sqlContext
SQLContext sqlContext = new SQLContext(sc);
/**
* DataFrame的底层是一个一个的RDD RDD的泛型是Row类型。
* 以下两种方式都可以读取json格式的文件
*/
DataFrame df = sqlContext.read().format("json").load("sparksql/json");
// DataFrame df2 = sqlContext.read().json("sparksql/json.txt");
// df2.show();
/**
* DataFrame转换成RDD
*/
RDD<Row> rdd = df.rdd();
/**
* 显示 DataFrame中的内容,默认显示前20行。如果现实多行要指定多少行show(行数)
* 注意:当有多个列时,显示的列先后顺序是按列的ascii码先后显示。
*/
// df.show();
/**
* 树形的形式显示schema信息
*/
df.printSchema();
/**
* dataFram自带的API 操作DataFrame
*/
//select name from table
// df.select("name").show();
//select name age+10 as addage from table
df.select(df.col("name"),df.col("age").plus(10).alias("addage")).show();
//select name ,age from table where age>19
df.select(df.col("name"),df.col("age")).where(df.col("age").gt(19)).show();
//select count(*) from table group by age
df.groupBy(df.col("age")).count().show();
/**
* 将DataFrame注册成临时的一张表,这张表临时注册到内存中,是逻辑上的表,不会雾化到磁盘
*/
df.registerTempTable("jtable");
DataFrame sql = sqlContext.sql("select age,count(1) from jtable group by age");
DataFrame sql2 = sqlContext.sql("select * from jtable");
sc.stop();
Scala
val conf = new SparkConf()
conf.setMaster("local").setAppName("jsonfile")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("sparksql/json")
//val df1 = sqlContext.read.format("json").load("sparksql/json")
df.show()
df.printSchema()
//select * from table
df.select(df.col("name")).show()
//select name from table where age>19
df.select(df.col("name"),df.col("age")).where(df.col("age").gt(19)).show()
//select count(*) from table group by age
df.groupBy(df.col("age")).count().show();
/**
* 注册临时表
*/
df.registerTempTable("jtable")
val result = sqlContext.sql("select * from jtable")
result.show()
sc.stop()
java
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("jsonRDD");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> nameRDD = sc.parallelize(Arrays.asList(
"{\"name\":\"zhangsan\",\"age\":\"18\"}",
"{\"name\":\"lisi\",\"age\":\"19\"}",
"{\"name\":\"wangwu\",\"age\":\"20\"}"
));
JavaRDD<String> scoreRDD = sc.parallelize(Arrays.asList(
"{\"name\":\"zhangsan\",\"score\":\"100\"}",
"{\"name\":\"lisi\",\"score\":\"200\"}",
"{\"name\":\"wangwu\",\"score\":\"300\"}"
));
DataFrame namedf = sqlContext.read().json(nameRDD);
DataFrame scoredf = sqlContext.read().json(scoreRDD);
namedf.registerTempTable("name");
scoredf.registerTempTable("score");
DataFrame result = sqlContext.sql("select name.name,name.age,score.score from name,score where name.name = score.name");
result.show();
sc.stop();
Scala
val conf = new SparkConf()
conf.setMaster("local").setAppName("jsonrdd")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val nameRDD = sc.makeRDD(Array(
"{\"name\":\"zhangsan\",\"age\":18}",
"{\"name\":\"lisi\",\"age\":19}",
"{\"name\":\"wangwu\",\"age\":20}"
))
val scoreRDD = sc.makeRDD(Array(
"{\"name\":\"zhangsan\",\"score\":100}",
"{\"name\":\"lisi\",\"score\":200}",
"{\"name\":\"wangwu\",\"score\":300}"
))
val nameDF = sqlContext.read.json(nameRDD)
val scoreDF = sqlContext.read.json(scoreRDD)
nameDF.registerTempTable("name")
scoreDF.registerTempTable("score")
val result = sqlContext.sql("select name.name,name.age,score.score from name,score where name.name = score.name")
result.show()
sc.stop()
java
/**
* 注意:
* 1.自定义类必须是可序列化的
* 2.自定义类访问级别必须是Public
* 3.RDD转成DataFrame会把自定义类中字段的名称按assci码排序
*/
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("RDD");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("sparksql/person.txt");
JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Person call(String s) throws Exception {
Person p = new Person();
p.setId(s.split(",")[0]);
p.setName(s.split(",")[1]);
p.setAge(Integer.valueOf(s.split(",")[2]));
return p;
}
});
/**
* 传入进去Person.class的时候,sqlContext是通过反射的方式创建DataFrame
* 在底层通过反射的方式获得Person的所有field,结合RDD本身,就生成了DataFrame
*/
DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
df.show();
df.registerTempTable("person");
sqlContext.sql("select name from person where id = 2").show();
/**
* 将DataFrame转成JavaRDD
* 注意:
* 1.可以使用row.getInt(0),row.getString(1)...通过下标获取返回Row类型的数据,但是要注意列顺序问题---不常用
* 2.可以使用row.getAs("列名")来获取对应的列值。
*
*/
JavaRDD<Row> javaRDD = df.javaRDD();
JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Person call(Row row) throws Exception {
Person p = new Person();
//p.setId(row.getString(1));
//p.setName(row.getString(2));
//p.setAge(row.getInt(0));
p.setId((String)row.getAs("id"));
p.setName((String)row.getAs("name"));
p.setAge((Integer)row.getAs("age"));
return p;
}
});
map.foreach(new VoidFunction<Person>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void call(Person t) throws Exception {
System.out.println(t);
}
});
sc.stop();
Scala
val conf = new SparkConf()
conf.setMaster("local").setAppName("rddreflect")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val lineRDD = sc.textFile("./sparksql/person.txt")
/**
* 将RDD隐式转换成DataFrame
*/
import sqlContext.implicits._
val personRDD = lineRDD.map { x => {
val person = Person(x.split(",")(0),x.split(",")(1),Integer.valueOf(x.split(",")(2)))
person
} }
val df = personRDD.toDF();
df.show()
/**
* 将DataFrame转换成PersonRDD
*/
val rdd = df.rdd
val result = rdd.map { x => {
Person(x.getAs("id"),x.getAs("name"),x.getAs("age"))
} }
result.foreach { println}
sc.stop()
java:
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("rddStruct");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("./sparksql/person.txt");
/**
* 转换成Row类型的RDD
*/
JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {
return RowFactory.create(
String.valueOf(s.split(",")[0]),
String.valueOf(s.split(",")[1]),
Integer.valueOf(s.split(",")[2])
);
}
});
/**
* 动态构建DataFrame中的元数据,一般来说这里的字段可以来源自字符串,也可以来源于外部数据库
*/
List<StructField> asList =Arrays.asList(
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true)
);
StructType schema = DataTypes.createStructType(asList);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.show();
sc.stop();
Scala
val conf = new SparkConf()
conf.setMaster("local").setAppName("rddStruct")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val lineRDD = sc.textFile("./sparksql/person.txt")
val rowRDD = lineRDD.map { x => {
val split = x.split(",")
RowFactory.create(split(0),split(1),Integer.valueOf(split(2)))
} }
val schema = StructType(List(
StructField("id",StringType,true),
StructField("name",StringType,true),
StructField("age",IntegerType,true)
))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.show()
df.printSchema()
sc.stop()
注意:
可以将DataFrame存储成parquet文件。保存成parquet文件的方式有两种
df.write().mode(SaveMode.Overwrite)format("parquet")
.save("./sparksql/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
SaveMode指定文件保存时的模式。
java
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("parquet");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> jsonRDD = sc.textFile("sparksql/json");
DataFrame df = sqlContext.read().json(jsonRDD);
/**
* 将DataFrame保存成parquet文件,SaveMode指定存储文件时的保存模式
* 保存成parquet文件有以下两种方式:
*/
df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
df.show();
/**
* 加载parquet文件成DataFrame
* 加载parquet文件有以下两种方式:
*/
DataFrame load = sqlContext.read().format("parquet").load("./sparksql/parquet");
load = sqlContext.read().parquet("./sparksql/parquet");
load.show();
sc.stop();
Scala
val conf = new SparkConf()
conf.setMaster("local").setAppName("parquet")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val jsonRDD = sc.textFile("sparksql/json")
val df = sqlContext.read.json(jsonRDD)
df.show()
/**
* 将DF保存为parquet文件
*/
df.write.mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet")
df.write.mode(SaveMode.Overwrite).parquet("./sparksql/parquet")
/**
* 读取parquet文件
*/
var result = sqlContext.read.parquet("./sparksql/parquet")
result = sqlContext.read.format("parquet").load("./sparksql/parquet")
result.show()
sc.stop()
两种方式创建DataFrame
java:
parkConf conf = new SparkConf();
conf.setMaster("local").setAppName("mysql");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
/**
* 第一种方式读取MySql数据库表,加载为DataFrame
*/
Map<String, String> options = new HashMap<String,String>();
options.put("url", "jdbc:mysql://192.168.179.4:3306/spark");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user", "root");
options.put("password", "123456");
options.put("dbtable", "person");
DataFrame person = sqlContext.read().format("jdbc").options(options).load();
person.show();
person.registerTempTable("person");
/**
* 第二种方式读取MySql数据表加载为DataFrame
*/
DataFrameReader reader = sqlContext.read().format("jdbc");
reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "123456");
reader.option("dbtable", "score");
DataFrame score = reader.load();
score.show();
score.registerTempTable("score");
DataFrame result =
sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name");
result.show();
/**
* 将DataFrame结果保存到Mysql中
*/
Properties properties = new Properties();
properties.setProperty("user", "root");
properties.setProperty("password", "123456");
result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.4:3306/spark", "result", properties);
sc.stop();
Scala
val conf = new SparkConf()
conf.setMaster("local").setAppName("mysql")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
/**
* 第一种方式读取Mysql数据库表创建DF
*/
val options = new HashMap[String,String]();
options.put("url", "jdbc:mysql://192.168.179.4:3306/spark")
options.put("driver","com.mysql.jdbc.Driver")
options.put("user","root")
options.put("password", "123456")
options.put("dbtable","person")
val person = sqlContext.read.format("jdbc").options(options).load()
person.show()
person.registerTempTable("person")
/**
* 第二种方式读取Mysql数据库表创建DF
*/
val reader = sqlContext.read.format("jdbc")
reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark")
reader.option("driver","com.mysql.jdbc.Driver")
reader.option("user","root")
reader.option("password","123456")
reader.option("dbtable", "score")
val score = reader.load()
score.show()
score.registerTempTable("score")
val result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name")
result.show()
/**
* 将数据写入到Mysql表中
*/
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "123456")
result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.179.4:3306/spark", "result", properties)
sc.stop()
./spark-submit
--master spark://node1:7077,node2:7077
--executor-cores 1
--executor-memory 2G
--total-executor-cores 1
--class com.bjsxt.sparksql.dataframe.CreateDFFromHive
/root/test/HiveTest.jar
java
SparkConf conf = new SparkConf();
conf.setAppName("hive");
JavaSparkContext sc = new JavaSparkContext(conf);
//HiveContext是SQLContext的子类。
HiveContext hiveContext = new HiveContext(sc);
hiveContext.sql("USE spark");
hiveContext.sql("DROP TABLE IF EXISTS student_infos");
//在hive中创建student_infos表
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' ");
hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos");
hiveContext.sql("DROP TABLE IF EXISTS student_scores");
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'");
hiveContext.sql("LOAD DATA "
+ "LOCAL INPATH '/root/test/student_scores'"
+ "INTO TABLE student_scores");
/**
* 查询表生成DataFrame
*/
DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score "
+ "FROM student_infos si "
+ "JOIN student_scores ss "
+ "ON si.name=ss.name "
+ "WHERE ss.score>=80");
hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
goodStudentsDF.registerTempTable("goodstudent");
DataFrame result = hiveContext.sql("select * from goodstudent");
result.show();
/**
* 将结果保存到hive表 good_student_infos
*/
goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
Row[] goodStudentRows = hiveContext.table("good_student_infos").collect();
for(Row goodStudentRow : goodStudentRows) {
System.out.println(goodStudentRow);
}
sc.stop();
Scala
val conf = new SparkConf()
conf.setAppName("HiveSource")
val sc = new SparkContext(conf)
/**
* HiveContext是SQLContext的子类。
*/
val hiveContext = new HiveContext(sc)
hiveContext.sql("use spark")
hiveContext.sql("drop table if exists student_infos")
hiveContext.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'")
hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos")
hiveContext.sql("drop table if exists student_scores")
hiveContext.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'")
hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores")
val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
hiveContext.sql("drop table if exists good_student_infos")
/**
* 将结果写入到hive表中
*/
df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
sc.stop()
hive --service metastore &
所在的节点)<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node3:9083</value>
</property>
</configuration>
# 阻塞式启动
hive --service metastore
# 后台启动
hive --service metastore &
./spark-shell --master spark://node1:7077,node2:7077
--executor-cores 1
--executor-memory 1g
--total-executor-cores 1
## 方式一
import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)
hc.sql("show databases").show
hc.sql("user default").show
hc.sql("select count(*) from jizhan").show
## 方式二
spark.sql("show tables").show
找不到HDFS集群路径,要在客户端机器conf/spark-env.sh中设置HDFS的路径
/**
* 读取Hive中的数据
* 要开启 :enableHiveSupport
*/
object CreateDataFrameFromHive {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("CreateDataFrameFromHive").enableHiveSupport().getOrCreate()
spark.sql("use spark")
spark.sql("drop table if exists student_infos")
spark.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'")
spark.sql("load data local inpath '/root/test/student_infos' into table student_infos")
spark.sql("drop table if exists student_scores")
spark.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'")
spark.sql("load data local inpath '/root/test/student_scores' into table student_scores")
// val frame: DataFrame = spark.table("student_infos") 可以将表转换成DataFrame
// frame.show(100)
val df = spark.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
df.show(100)
spark.sql("drop table if exists good_student_infos")
/**
* 将结果写入到hive表中
*/
df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
}
}
student_infos
-------------------
zhangsan 18
lisi 19
wangwu 20
student_scores
---------------
zhangsan 100
lisi 200
wangwu 300
2.通过相关命令上传并执行该jar任务
./spark-submit
--master spark://node1:7077
--class com.bjsxt.scalaspark.sql.DataSetAndDataFrame.CreateDataFrameFromHive /root/test/MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar
3.进入hive中, 进入相应的DB查看
注意: 如果没有该DB, 可以手动创建
可以自定义类实现UDFX接口。
java
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("udf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD,schema);
df.registerTempTable("user");
/**
* 根据UDF函数参数的个数来决定是实现哪一个UDF UDF1,UDF2。。。。UDF1xxx
*/
sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Integer call(String t1) throws Exception {
return t1.length();
}
}, DataTypes.IntegerType);
sqlContext.sql("select name ,StrLen(name) as length from user").show();
//sqlContext.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
//
// /**
// *
// */
// private static final long serialVersionUID = 1L;
//
// @Override
// public Integer call(String t1, Integer t2) throws Exception {
//return t1.length()+t2;
// }
//} ,DataTypes.IntegerType );
//sqlContext.sql("select name ,StrLen(name,10) as length from user").show();
sc.stop();
scala:
val conf = new SparkConf()
conf.setMaster("local").setAppName("udf")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc);
val rdd = sc.makeRDD(Array("zhansan","lisi","wangwu"))
val rowRDD = rdd.map { x => {
RowFactory.create(x)
} }
val schema = DataTypes.createStructType(Array(StructField("name",StringType,true)))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.registerTempTable("user")
//sqlContext.udf.register("StrLen",(s : String)=>{s.length()})
//sqlContext.sql("select name ,StrLen(name) as length from user").show
sqlContext.udf.register("StrLen",(s : String,i:Int)=>{s.length()+i})
sqlContext.sql("select name ,StrLen(name,10) as length from user").show
sc.stop()
实现UDAF函数如果要自定义类要继承UserDefinedAggregateFunction类
java
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("udaf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu","zhangsan","zhangsan","lisi"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/**
* 注册一个UDAF函数,实现统计相同值得个数
* 注意:这里可以自定义一个类继承UserDefinedAggregateFunction类也是可以的
*/
sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {
/**
*
*/
private static final long serialVersionUID = 1L;
/**
* 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑
* buffer.getInt(0)获取的是上一次聚合后的值
* 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合
* 大聚和发生在reduce端.
* 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算
*/
@Override
public void update(MutableAggregationBuffer buffer, Row arg1) {
buffer.update(0, buffer.getInt(0)+1);
}
/**
* 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理
* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来
* buffer1.getInt(0) : 大聚和的时候 上一次聚合后的值
* buffer2.getInt(0) : 这次计算传入进来的update的结果
* 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作
*/
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
}
/**
* 指定输入字段的字段及类型
*/
@Override
public StructType inputSchema() {
return DataTypes.createStructType(
Arrays.asList(DataTypes.createStructField("name",
DataTypes.StringType, true)));
}
/**
* 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果
*/
@Override
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, 0);
}
/**
* 最后返回一个和DataType的类型要一致的类型,返回UDAF最后的计算结果
*/
@Override
public Object evaluate(Row row) {
return row.getInt(0);
}
@Override
public boolean deterministic() {
//设置为true
return true;
}
/**
* 指定UDAF函数计算后返回的结果类型
*/
@Override
public DataType dataType() {
return DataTypes.IntegerType;
}
/**
* 在进行聚合操作的时候所要处理的数据的结果的类型
*/
@Override
public StructType bufferSchema() {
return
DataTypes.createStructType(
Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType,
true)));
}
});
sqlContext.sql("select name ,StringCount(name) from user group by name").show();
sc.stop();
Scala
class MyUDAF extends UserDefinedAggregateFunction {
// 聚合操作时,所处理的数据的类型
def bufferSchema: StructType = {
DataTypes.createStructType(Array(DataTypes.createStructField("aaa", IntegerType, true)))
}
// 最终函数返回值的类型
def dataType: DataType = {
DataTypes.IntegerType
}
def deterministic: Boolean = {
true
}
// 最后返回一个最终的聚合值 要和dataType的类型一一对应
def evaluate(buffer: Row): Any = {
buffer.getAs[Int](0)
}
// 为每个分组的数据执行初始化值
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0
}
//输入数据的类型
def inputSchema: StructType = {
DataTypes.createStructType(Array(DataTypes.createStructField("input", StringType, true)))
}
// 最后merger的时候,在各个节点上的聚合值,要进行merge,也就是合并
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Int](0)+buffer2.getAs[Int](0)
}
// 每个组,有新的值进来的时候,进行分组对应的聚合值的计算
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Int](0)+1
}
}
object UDAF {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("udaf")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val rdd = sc.makeRDD(Array("zhangsan","lisi","wangwu","zhangsan","lisi"))
val rowRDD = rdd.map { x => {RowFactory.create(x)} }
val schema = DataTypes.createStructType(Array(DataTypes.createStructField("name", StringType, true)))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.show()
df.registerTempTable("user")
/**
* 注册一个udaf函数
*/
sqlContext.udf.register("StringCount", new MyUDAF())
sqlContext.sql("select name ,StringCount(name) from user group by name").show()
sc.stop()
}
}
注意:
row_number() 开窗函数是按照某个字段分组,然后取另一字段的前几个值,相当于分组取topN
如果SQL语句里面使用到了开窗函数,那么这个SQL语句必须使用HiveContext来执行,HiveContext默认情况下在本地无法创建。在MySql8之后也增加了开窗函数。(一般在Spark集群中运行,将任务提交至集群中运行)
开窗函数格式:
row_number() over (partitin by XXX order by XXX)
java代码
SparkConf conf = new SparkConf();
conf.setAppName("windowfun");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc);
hiveContext.sql("use spark");
hiveContext.sql("drop table if exists sales");
hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
+ "row format delimited fields terminated by '\t'");
hiveContext.sql("load data local inpath '/root/test/sales' into table sales");
/**
* 开窗函数格式:
* 【 rou_number() over (partitin by XXX order by XXX) 】
*/
DataFrame result = hiveContext.sql("select riqi,leibie,jine "
+ "from ("
+ "select riqi,leibie,jine,"
+ "row_number() over (partition by leibie order by jine desc) rank "
+ "from sales) t "
+ "where t.rank<=3");
result.show();
sc.stop();
Scala代码
val conf = new SparkConf()
conf.setAppName("windowfun")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
hiveContext.sql("use spark");
hiveContext.sql("drop table if exists sales");
hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
+ "row format delimited fields terminated by '\t'");
hiveContext.sql("load data local inpath '/root/test/sales' into table sales");
/**
* 开窗函数格式:
* 【 rou_number() over (partitin by XXX order by XXX) 】
*/
val result = hiveContext.sql("select riqi,leibie,jine "
+ "from ("
+ "select riqi,leibie,jine,"
+ "row_number() over (partition by leibie order by jine desc) rank "
+ "from sales) t "
+ "where t.rank<=3");
result.show();
sc.stop()
SparkStreaming是流式处理框架,是Spark API的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来源可以是 :Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,并且可以使用高级功能的复杂算子来处理流数据。例如:map,reduce,join,window 。最终,处理后的数据可以存放在文件系统,数据库等,方便实时展现。
storm 和 spark streaming 在实时性,吞吐量等方面的对比 1、实时性:一般Storm的时延性比spark streaming要低,原因是Spark Streaming是小的批处理,通过间隔时长生成批次,一个批次触发一次计算,比如我在程序里面设置间隔时长为5秒,那就是五秒接收到的数据触发一次计算,Storm是实时处理,来一条数据,触发一次计算,所以可以称spark streaming为流式计算,Storm 为实时计算,阿里的JStorm通过实现Trident,也支持小的批处理计算 2、吞吐量 :Storm的吞吐量要略差于Spark Streaming,原因一是Storm从spout组件 接收源数据,通过发射器发送到bolt,bolt对接收到的数据进行处理,处理完以后,写入到外部存储系统中或者发送到下个bolt进行再处理,所以storm是移动数据,不是移动计算;Spark Streaming获取Task要计算的数据在哪个节点上,然后TaskScheduler把task发送到对应节点上进行数据处理,所以Spark Streaming是移动计算不是移动数据,移动计算也是当前计算引擎的主流设计思想;原因二大家很容易看出来,一个是批处理,一个是实时计算,批处理的吞吐量一般要高于实时触发的计算 3、容错机制:storm是acker(ack/fail消息确认机制)确认机制确保一个tuple被完全处理,Spark Streaming是通过存储RDD转化逻辑进行容错,也就是如果数据从A数据集到B数据集计算错误了,由于存储的有A到B的计算逻辑,所以可以从A重新计算生成B,容错机制不一样,暂时无所谓好坏
注意:
netcat yum install -y nc
)1. 为SparkStreaming中每一个Key维护一份state状态,state类型可以是任意类型的,可以是一个自定义的对象,更新函数也可以是自定义的。
2. 通过更新函数对该key的状态不断更新,对于每个新的batch而言,SparkStreaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新。
使用到updateStateByKey要开启checkpoint机制和功能。
多久会将内存中的数据写入到磁盘一份?
如果batchInterval设置的时间小于10秒,那么10秒写入磁盘一份。如果batchInterval设置的时间大于10秒,那么就会batchInterval时间间隔写入磁盘一份。
对一定间隔时间内的Wc,而不是全局的Wc
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用SparkStreaming进行WordCount: 注意这种wc只是对一定间隔时间内的Wc,而不是全局的Wc
* 注意
* 1. batchDuration: 代表我们能够处理数据接收的延迟度, 批次数据处理的间隔时间, 可以集合WebUI调节
* 2. 创建StreamingContext的两种方式
* val ssc=new StreamingContext(SparkContext, batchDuration)
* val ssc=new StreamingContext(SparkConf,batchDuration)
* 3. SparkStreaming操作的是Dstream, 可以使用的DStream的TransFormation算子, 要使用outputOperation类算子触发执行
* 4. StreamingContext.start()后, 不能添加新的业务逻辑
* 5. StreamingContext.stop()后, 不能调用StreamingContext,start()重新启动, 因为对象已经被回收
* 6. StreaningContext.stop(stopSparkContext=true), 默认关闭关闭StreamingContext关闭时会将SparkContext
*
* Author TimePause
* Create 2019-12-21 10:10
*/
object SparkStreamingForWc {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("sswc") //创建2个线程,一个用于接收,一个用于处理
val sc = new SparkContext(conf)
// 创建StreamingContext对象,Durations指定批处理间隔时间. 通过socketTextStream设置Socket通信ip和端口
val ssc = new StreamingContext(sc, Durations.seconds(5))
val socket: ReceiverInputDStream[String] = ssc.socketTextStream("node4", 9999)
val words = socket.flatMap(one => {
(one.split(" "))
})
val pairWords = words.map(word => {
(word, 1)
})
val reduceResult = pairWords.reduceByKey((v1, v2) => {
v1 + v2
})
// print可以指定输出的行数
//reduceResult.print(10)
/**
* foreachRDD
* 1. 拿到DataStream中的RDD.对RDD进行Transformation或者action操作
* 2. 只有foreachRDD这个operation算子,不会触发执行,必须还要有action算子的支持
* 3. foreachRDD算子内map算子外的地方的代码是在Driver执行的, 我们可以通过这里动态的改变广播变量, 实现对配置的热部署
*/
val resultRDD = reduceResult.foreachRDD(rdd => {
println("******************") //外部代码在Driver端执行
val resultRdd = rdd.map(one => {
println(s"实时接收的数据为-------------${one}") //内部代码在Executor端执行
})
resultRdd.foreach(println)
})
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
全局的WordCount, 会将实时的结果持久化到磁盘中
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
/**
* UpdateStateByKey 根据key更新状态(全局)
* 1、为Spark Streaming中每一个Key维护一份state状态,state类型可以是任意类型的, 可以是一个自定义的对象,那么更新函数也可以是自定义的。
* 2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新
*/
object UpdateStateByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local[2]")
conf.setAppName("UpdateStateByKey")
val ssc = new StreamingContext(conf,Durations.seconds(5))
//设置日志级别
ssc.sparkContext.setLogLevel("ERROR")
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node4",9999)
val words: DStream[String] = lines.flatMap(line=>{line.split(" ")})
val pairWords: DStream[(String, Int)] = words.map(word => {(word, 1)})
/**
* 根据key更新状态,需要设置 checkpoint来保存状态
* 默认key的状态在内存中 有一份,在checkpoint目录中有一份。
*
* 多久会将内存中的数据(每一个key所对应的状态)写入到磁盘上一份呢?
* 如果你的batchInterval小于10s 那么10s会将内存中的数据写入到磁盘一份
* 如果bacthInterval 大于10s,那么就以bacthInterval为准
*
* 这样做是为了防止频繁的写HDFS
*/
ssc.checkpoint("./data/streamingCheckpoint")
// ssc.sparkContext.setCheckpointDir("./data/streamingCheckpoint")
/**
* currentValues :当前批次某个 key 对应所有的value 组成的一个集合
* preValue : 以往批次当前key 对应的总状态值
*/
val result: DStream[(String, Int)] = pairWords.updateStateByKey((currentValues: Seq[Int], preValue: Option[Int]) => {
var totalValues = 0
if (!preValue.isEmpty) {
totalValues += preValue.get
}
for(value <- currentValues){
totalValues += value
}
Option(totalValues)
})
result.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
/**
* 使用transform算子实现指定信息的过滤
*
* Author TimePause
* Create 2019-12-21 16:20
*/
object TransFormTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("windowfun")
// 规定处理的时间为5秒
val ssc = new StreamingContext(conf, Durations.seconds(5))
// 通过获取sparkContext来设置Spark输出的日志级别
ssc.sparkContext.setLogLevel("error")
val scoket: ReceiverInputDStream[String] = ssc.socketTextStream("node4", 9999)
// transform可以将一种格式的DStream转换成另一种格式的DStream
val transRDD: DStream[(String, String)] = scoket.transform(rdd => {
val filterRDD: RDD[String] = rdd.filter(line => {
println(s"====需要被过滤的字符====$line")
//不加!是只显示所过滤的字符串,反之是显示除了过滤的字符串
!"world".equals(line.split(" ")(1))
})
val mapRDD: RDD[(String, String)] = filterRDD.map(one => {
(one.split(" ")(0), one.split(" ")(1))
})
mapRDD
})
transRDD.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
窗口操作
实现每隔n秒, 打印nm秒的数据
/**
* 窗口函数
*
* Author TimePause
* Create 2019-12-21 15:28
*/
object reduceByKeyAndWindowTest {
def main(args:Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("windowfun")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Durations.seconds(5))
ssc.sparkContext.setLogLevel("error")
// 配置窗口函数优化后需要checkpoint保存数据
ssc.checkpoint("./data/window")
val socket = ssc.socketTextStream("node4", 9999)
val words: DStream[String] = socket.flatMap(line => {
line.split(" ")
})
// map 处理后成为一个元组(k,v)形式pairwords
val pairwords = words.map(word => {
(word, 1)
})
// pairwords调用窗口函数, 结束后紧跟窗口函数参数, 第一个代表窗口时间长度, 第二个代表窗口的滑动间隔
/* val result = pairwords.reduceByKeyAndWindow((v1: Int, v2: Int) => {
v1 + v2
}, Durations.seconds(15), Durations.seconds(5))*/
// 窗口函数优化, 在表窗口长度结束后, 仍保存其k, v设置为0
val result = pairwords.reduceByKeyAndWindow((v1: Int, v2: Int) => {v1 + v2},
(v1:Int,v2:Int)=>{v1-v2}, Durations.seconds(15), Durations.seconds(5))
// val value = pairwords.window(Durations.seconds(15), Durations.seconds(5))
//打印结果
result.print()
// 启动和关闭StreamingContext对象
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
因为SparkStreaming是7*24小时运行,Driver只是一个简单的进程,有可能挂掉,所以实现Driver的HA就有必要(如果使用的Client模式就无法实现Driver HA ,这里针对的是cluster模式)。Yarn平台cluster模式提交任务,AM(AplicationMaster)相当于Driver,如果挂掉会自动启动AM。这里所说的DriverHA针对的是Spark standalone和Mesos资源调度的情况下。
实现Driver的高可用有两个步骤:
第一:提交任务层面,在提交任务的时候加上选项 --supervise,当Driver挂掉的时候会自动重启Driver。
第二:代码层面,使用JavaStreamingContext.getOrCreate(checkpoint路径,JavaStreamingContextFactory)
Driver中元数据包括:
示例代码
/**
* Driver HA :
* 1.在提交application的时候 添加 --supervise 选项 如果Driver挂掉 会自动启动一个Driver
* 2.代码层面恢复Driver(StreamingContext)
*
*/
object SparkStreamingDriverHA {
//设置checkpoint目录
val ckDir = "./data/streamingCheckpoint"
def main(args: Array[String]): Unit = {
/**
* StreamingContext.getOrCreate(ckDir,CreateStreamingContext)
* 这个方法首先会从ckDir目录中获取StreamingContext【 因为StreamingContext是序列化存储在Checkpoint目录中,恢复时会尝试反序列化这些objects。
* 如果用修改过的class可能会导致错误,此时需要更换checkpoint目录或者删除checkpoint目录中的数据,程序才能起来。】
*
* 若能获取回来StreamingContext,就不会执行CreateStreamingContext这个方法创建,否则就会创建
*/
val ssc: StreamingContext = StreamingContext.getOrCreate(ckDir,CreateStreamingContext)
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
def CreateStreamingContext() = {
println("=======Create new StreamingContext =======")
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("DriverHA")
val ssc: StreamingContext = new StreamingContext(conf,Durations.seconds(5))
ssc.sparkContext.setLogLevel("Error")
/**
* 默认checkpoint 存储:
* 1.配置信息
* 2.DStream操作逻辑
* 3.batch执行的进度 或者【offset】
*/
ssc.checkpoint(ckDir)
val lines: DStream[String] = ssc.textFileStream("./data/streamingCopyFile")
val words: DStream[String] = lines.flatMap(line=>{line.trim.split(" ")})
val pairWords: DStream[(String, Int)] = words.map(word=>{(word,1)})
val result: DStream[(String, Int)] = pairWords.reduceByKey((v1:Int, v2:Int)=>{v1+v2})
// result.print()
/**
* 更改逻辑
*/
result.foreachRDD(pairRDD=>{
pairRDD.filter(one=>{
println("*********** filter *********")
true
}).foreach(println)
})
ssc
}
}
kafka是什么?使用场景?
kafka是一个高吞吐的分布式消息队列系统。特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。消息列队常见场景:系统之间解耦合、峰值压力缓冲、异步通信
链接介绍的是Kafka 0.1.0版本, 这里介绍 kafka 0.8.2环境搭建
真实数据存储位置:
zookeeper的节点:
kafka_2.11-0.11.0.3 安装同kafka 0.8.2 不过更换版本时需要删除zk中存放 kafka信息删除方式如下
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import scala.util.Random
/**
* 向 kafka 中生产数据
*/
object ProduceDataToKafka {
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "node2:9092,node3:9092,node4:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
//批次大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384")
//等待时间
props.put(ProducerConfig.LINGER_MS_CONFIG,"1")
props.put("enable.auto.commit","true")
val producer = new KafkaProducer[String,String](props)
var counter = 0
var keyFlag = 0
while(true){
counter +=1
keyFlag +=1
val content: String = userlogs()
print(content)
producer.send(new ProducerRecord[String, String]("testKafka,mytopic1222", s"key-$keyFlag", content))
if(0 == counter%100){
counter = 0
Thread.sleep(5000)
}
}
producer.close()
}
def userlogs()={
val userLogBuffer = new StringBuffer("")
val timestamp = new Date().getTime();
var userID = 0L
var pageID = 0L
//随机生成的用户ID
userID = Random.nextInt(2000)
//随机生成的页面ID
pageID = Random.nextInt(2000);
//随机生成Channel
val channelNames = Array[String]("Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML")
val channel = channelNames(Random.nextInt(10))
val actionNames = Array[String]("View", "Register")
//随机生成action行为
val action = actionNames(Random.nextInt(2))
val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
userLogBuffer.append(dateToday)
.append("\t")
.append(timestamp)
.append("\t")
.append(userID)
.append("\t")
.append(pageID)
.append("\t")
.append(channel)
.append("\t")
.append(action)
System.out.println(userLogBuffer.toString())
userLogBuffer.toString()
}
}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Durations, StreamingContext}
/**
* SparkStreaming2.3版本 读取kafka 中数据 :
* 1.采用了新的消费者api实现,类似于1.6中SparkStreaming 读取 kafka Direct模式。并行度 一样。
* 2.因为采用了新的消费者api实现,所有相对于1.6的Direct模式【simple api实现】 ,api使用上有很大差别。未来这种api有可能继续变化
* 3.kafka中有两个参数:
* heartbeat.interval.ms:这个值代表 kafka集群与消费者之间的心跳间隔时间,kafka 集群确保消费者保持连接的心跳通信时间间隔。这个时间默认是3s.
* 这个值必须设置的比session.timeout.ms 小,一般设置不大于 session.timeout.ms 的1/3
* session.timeout.ms :
* 这个值代表消费者与kafka之间的session 会话超时时间,如果在这个时间内,kafka 没有接收到消费者的心跳【heartbeat.interval.ms 控制】,
* 那么kafka将移除当前的消费者。这个时间默认是30s。
* 这个时间位于配置 group.min.session.timeout.ms【6s】 和 group.max.session.timeout.ms【300s】之间的一个参数,
* 如果SparkSteaming 批次间隔时间大于5分钟,也就是大于300s,那么就要相应的调大group.max.session.timeout.ms 这个值。
* 4.大多数情况下,SparkStreaming读取数据使用 LocationStrategies.PreferConsistent 这种策略,这种策略会将分区均匀的分布在集群的Executor之间。
* 如果Executor在kafka 集群中的某些节点上,可以使用 LocationStrategies.PreferBrokers 这种策略,那么当前这个Executor 中的数据会来自当前broker节点。
* 如果节点之间的分区有明显的分布不均,可以使用 LocationStrategies.PreferFixed 这种策略,可以通过一个map 指定将topic分区分布在哪些节点中。
*
* 5.新的消费者api 可以将kafka 中的消息预读取到缓存区中,默认大小为64k。默认缓存区在 Executor 中,加快处理数据速度。
* 可以通过参数 spark.streaming.kafka.consumer.cache.maxCapacity 来增大,也可以通过spark.streaming.kafka.consumer.cache.enabled 设置成false 关闭缓存机制。
* "注意:官网中描述这里建议关闭,在读取kafka时如果开启会有重复读取同一个topic partition 消息的问题,报错:KafkaConsumer is not safe for multi-threaded access"
*
* 6.关于消费者offset
* 1).如果设置了checkpoint ,那么offset 将会存储在checkpoint中。
* 这种有缺点: 第一,当从checkpoint中恢复数据时,有可能造成重复的消费。
* 第二,当代码逻辑改变时,无法从checkpoint中来恢复offset.
* 2).依靠kafka 来存储消费者offset,kafka 中有一个特殊的topic 来存储消费者offset。新的消费者api中,会定期自动提交offset。这种情况有可能也不是我们想要的,
* 因为有可能消费者自动提交了offset,但是后期SparkStreaming 没有将接收来的数据及时处理保存。这里也就是为什么会在配置中将enable.auto.commit 设置成false的原因。
* 这种消费模式也称最多消费一次,默认sparkStreaming 拉取到数据之后就可以更新offset,无论是否消费成功。自动提交offset的频率由参数auto.commit.interval.ms 决定,默认5s。
* *如果我们能保证完全处理完业务之后,可以后期异步的手动提交消费者offset.
* 注意:这种模式也有弊端,这种将offset存储在kafka中方式,参数offsets.retention.minutes=1440控制offset是否过期删除,默认将offset信息保存一天,
* 如果停机没有消费达到时长,存储在kafka中的消费者组会被清空,offset也就被清除了。
* 3).自己存储offset,这样在处理逻辑时,保证数据处理的事务,如果处理数据失败,就不保存offset,处理数据成功则保存offset.这样可以做到精准的处理一次处理数据。
*
*/
object SparkStreamingOnKafkaDirect {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("SparkStreamingOnKafkaDirect")
val ssc = new StreamingContext(conf,Durations.seconds(5))
//设置日志级别
// ssc.sparkContext.setLogLevel("ERROR")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node4:9092,node2:9092,node3:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "MyGroupId",//
/**
*
* earliest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始
* latest:自动重置偏移量为最大偏移量【默认】*
* none:没有找到以前的offset,抛出异常
*/
"auto.offset.reset" -> "earliest",
/**
* 当设置 enable.auto.commit为false时,不会自动向kafka中保存消费者offset.需要异步的处理完数据之后手动提交
*/
"enable.auto.commit" -> (false: java.lang.Boolean)//默认是true
)
val topics = Array[String]("mytopic1222")
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,//消费策略
Subscribe[String, String](topics, kafkaParams)
)
val transStrem: DStream[String] = stream.map((record:ConsumerRecord[String, String]) => {
val key_value = (record.key, record.value)
println("receive message key = "+key_value._1)
println("receive message value = "+key_value._2)
key_value._2
})
val wordsDS: DStream[String] = transStrem.flatMap(line=>{line.split("\t")})
val result: DStream[(String, Int)] = wordsDS.map((_,1)).reduceByKey(_+_)
result.print()
/**
* 以上业务处理完成之后,异步的提交消费者offset,这里将 enable.auto.commit 设置成false,就是使用kafka 自己来管理消费者offset
* 注意这里,获取 offsetRanges: Array[OffsetRange] 每一批次topic 中的offset时,必须从 源头读取过来的 stream中获取,不能从经过stream转换之后的DStream中获取。
*/
stream.foreachRDD { rdd =>
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
for(or <- offsetRanges){
println(s"current topic = ${or.topic},partition = ${or.partition},fromoffset = ${or.fromOffset},untiloffset=${or.untilOffset}")
}
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}