Rather than shelling out from Scala directly, one common pattern is to invoke the shell script from a small Java helper class and then call that class from the Spark (Scala) code.
import java.io.BufferedReader;
import java.io.InputStreamReader;

public class MyJavaClass {

    // Runs the shell script at shpath as a child process and returns its stdout.
    public String executeShell(String shpath) {
        StringBuilder sb = new StringBuilder();
        try {
            Process ps = Runtime.getRuntime().exec(shpath);
            // Read stdout before calling waitFor(); blocking on waitFor() first
            // can deadlock if the child process fills its output buffer.
            BufferedReader br = new BufferedReader(new InputStreamReader(ps.getInputStream()));
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line).append("\n");
            }
            ps.waitFor();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return sb.toString();
    }
}
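For completeness, the Java wrapper is optional: Scala's standard library includes scala.sys.process, which can launch the same script directly from the Spark code. A minimal sketch, using the same script path as below:

import scala.sys.process._

// Runs the script and returns its stdout as a String;
// `!!` throws an exception if the exit code is non-zero.
val shellOutput: String = "/data/spark-runshell.sh".!!
println(shellOutput)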
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordCount").setMaster("yarn")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("hdfs:///test/Hamlet.txt")
    val rdd1 = rdd.flatMap(x => x.split(" "))
      .filter(x => x.size > 1)
      .map(x => (x, 1))
      .reduceByKey(_ + _)
      .map { case (x, y) => (y, x) }
    rdd1.sortByKey(false)
      .map { case (a, b) => (b, a) }
      .saveAsTextFile("hdfs:///test/output.txt") // saveAsTextFile is an action: the job is actually submitted here
    // Invoke the shell script after the job finishes. The script must be
    // executable and exist on the machine where the driver runs.
    val shpath = "/data/spark-runshell.sh"
    val javaClass = new MyJavaClass()
    val shellOutput = javaClass.executeShell(shpath)
    println(shellOutput)
  }
}
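One caveat: executeShell runs on the driver, after the word-count job completes, not on the executors. If the goal is instead to push partition data through a shell command on each executor, Spark's built-in RDD.pipe operator does exactly that. A minimal sketch, where /data/per-partition.sh is a hypothetical script that would have to exist on every worker node:

// pipe() writes each RDD element to the command's stdin (one per line)
// and turns each line the command prints into an element of the result RDD.
val piped = sc.textFile("hdfs:///test/Hamlet.txt").pipe("/data/per-partition.sh")
piped.take(5).foreach(println)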
The launch script on the server submits the job with spark-submit:

#!/bin/bash
spark-submit \
  --master yarn \
  --class com.kouyy.test.SparkWordCount \
  /data/sparkdemo-1.0-SNAPSHOT.jar
(Figure: the Spark application JAR and launch script on the server)
The script the job invokes, /data/spark-runshell.sh, simply echoes some lines; executeShell captures this stdout and the driver prints it:

#!/bin/bash
echo 'Spark invoked the shell script successfully'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'Spark invoked the shell script successfully'
echo 'Spark invoked the shell script successfully'
echo 'Spark invoked the shell script successfully'
echo 'Spark invoked the shell script successfully'
echo 'Spark invoked the shell script successfully'
echo 'Spark invoked the shell script successfully'
echo 'Spark invoked the shell script successfully'