SparkStreaming小例子

1.安装nc才可以打开端口 rpm -ivh /media/CentOS_6.7_Final/Packages/nmap-5.51-4.el6.x86_64.rpm 2.ncat -lk 1234 3. package com.iflytek.sparkstreaming;

import java.util.Arrays;

import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2; import scala.collection.generic.BitOperations.Int; import scala.collection.script.Start; import sun.net.www.content.audio.x_aiff;

public class SparkStreaming {

 public static void main(String[] args) {   SparkConf sparkConf = new SparkConf().setAppName("sparkSQL");   sparkConf.setMaster("local[2]");   System.setProperty("hadoop.home.dir", "E:\\spark\\spark-1.6.1-bin-hadoop2.6"); //  JavaSparkContext sc = new JavaSparkContext(sparkConf);   JavaStreamingContext jssc=new JavaStreamingContext(sparkConf,Durations.seconds(3));   Logger.getRootLogger().setLevel(Level.OFF);   JavaReceiverInputDStream<String> lines=jssc.socketTextStream("h1", 1234);   JavaDStream<String> words=lines.flatMap(new FlatMapFunction<String,String>() {

   public Iterable<String> call(String x) throws Exception {     return Arrays.asList(x.split(" "));    }   });   JavaPairDStream<String, Integer> pairs=words.mapToPair(new PairFunction<String, String, Integer>() {

   public Tuple2<String, Integer> call(String s) throws Exception {     return new Tuple2<>(s,1);    }   });   JavaPairDStream<String, Integer> javaPairDStream=pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {    @Override    public Integer call(Integer arg0, Integer arg1) throws Exception {     return arg0+arg1;    }   });   javaPairDStream.print();   jssc.start();   jssc.awaitTermination();

 }

}

4.截图

注意:如果向端口发送的信息在控制台显示不出来,记得修改端口号,有可能这个端口被占用。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Hongten

利用Velocity自动生成自定义代码_java版_源码下载

=======================================================

572
来自专栏

Spring MVC基于注解的Junit测试

INFO : org.springframework.test.context.TestContextManager - @TestExecutionList...

675
来自专栏向治洪

电话拦截

首先需要 android 源码文件NeighboringCellInfo.aidl和ITelephony.aidl,新建文件夹android.telephony...

1917
来自专栏Netkiller

Spring Data MongoDB

以下节选自《Netkiller Java 手札》 11.6.2. Spring Data MongoDB 11.6.2.1. pom.xml 注意Spring4...

3305
来自专栏Hongten

原创Java版的Shell

如果你接触过windows操作系统,你应该对windows中的cmd有一定的了解。

754
来自专栏禁心尽力

关于web.xml3.0启动报错

九月 08, 2017 10:18:19 上午 org.apache.tomcat.util.digester.SetPropertiesRule begin ...

2857
来自专栏流媒体

aapt命令介绍

aapt l -a app-release-unsigned.apk>l.txt 文件

512
来自专栏挖坑填坑

使用typescript开发angular模块(编写模块)

之前在使用typescript开发angular模块(发布npm包)一文中基本掌握了怎么发布一个typescript写的npm包。但是离目标还有段距离。

883
来自专栏岑玉海

Spark Streaming自定义Receivers

自定义一个Receiver class SocketTextStreamReceiver(host: String, port: Int( ...

2623
来自专栏JAVA技术站

Eclispe下集成JFinal中jetty包作为开发环境

1.如果是gradle 或是maven项目地址在这,jetty-server包http://maven.oschina.net/index.html#nexu...

742

扫码关注云+社区