1. 先安装 nc(ncat,包含在 nmap 安装包中),之后才能打开监听端口:
   rpm -ivh /media/CentOS_6.7_Final/Packages/nmap-5.51-4.el6.x86_64.rpm
2. 启动监听并保持连接:ncat -lk 1234
3. 代码如下:
package com.iflytek.sparkstreaming;
import java.util.Arrays;
import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2; import scala.collection.generic.BitOperations.Int; import scala.collection.script.Start; import sun.net.www.content.audio.x_aiff;
public class SparkStreaming {
public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("sparkSQL"); sparkConf.setMaster("local[2]"); System.setProperty("hadoop.home.dir", "E:\\spark\\spark-1.6.1-bin-hadoop2.6"); // JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaStreamingContext jssc=new JavaStreamingContext(sparkConf,Durations.seconds(3)); Logger.getRootLogger().setLevel(Level.OFF); JavaReceiverInputDStream<String> lines=jssc.socketTextStream("h1", 1234); JavaDStream<String> words=lines.flatMap(new FlatMapFunction<String,String>() {
public Iterable<String> call(String x) throws Exception { return Arrays.asList(x.split(" ")); } }); JavaPairDStream<String, Integer> pairs=words.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s,1); } }); JavaPairDStream<String, Integer> javaPairDStream=pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer arg0, Integer arg1) throws Exception { return arg0+arg1; } }); javaPairDStream.print(); jssc.start(); jssc.awaitTermination();
}
}
4.截图
注意:如果向端口发送的信息没有在控制台显示出来,可能是该端口已被占用,请换一个端口号重试(ncat 监听的端口必须与程序中 socketTextStream 使用的端口保持一致)。