SparkStreaming小例子

1.安装nc才可以打开端口 rpm -ivh /media/CentOS_6.7_Final/Packages/nmap-5.51-4.el6.x86_64.rpm 2.ncat -lk 1234 3. package com.iflytek.sparkstreaming;

import java.util.Arrays;

import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2; import scala.collection.generic.BitOperations.Int; import scala.collection.script.Start; import sun.net.www.content.audio.x_aiff;

public class SparkStreaming {

 public static void main(String[] args) {   SparkConf sparkConf = new SparkConf().setAppName("sparkSQL");   sparkConf.setMaster("local[2]");   System.setProperty("hadoop.home.dir", "E:\\spark\\spark-1.6.1-bin-hadoop2.6"); //  JavaSparkContext sc = new JavaSparkContext(sparkConf);   JavaStreamingContext jssc=new JavaStreamingContext(sparkConf,Durations.seconds(3));   Logger.getRootLogger().setLevel(Level.OFF);   JavaReceiverInputDStream<String> lines=jssc.socketTextStream("h1", 1234);   JavaDStream<String> words=lines.flatMap(new FlatMapFunction<String,String>() {

   public Iterable<String> call(String x) throws Exception {     return Arrays.asList(x.split(" "));    }   });   JavaPairDStream<String, Integer> pairs=words.mapToPair(new PairFunction<String, String, Integer>() {

   public Tuple2<String, Integer> call(String s) throws Exception {     return new Tuple2<>(s,1);    }   });   JavaPairDStream<String, Integer> javaPairDStream=pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {    @Override    public Integer call(Integer arg0, Integer arg1) throws Exception {     return arg0+arg1;    }   });   javaPairDStream.print();   jssc.start();   jssc.awaitTermination();

 }

}

4.截图

注意:如果向端口发送的信息在控制台显示不出来,记得修改端口号,有可能这个端口被占用。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏ACM小冰成长之路

51Nod-1635-第K个幸运排列

ACM模版 描述 ? 题解 image.png 代码 #include <iostream> using namespace std; typedef lo...

2128
来自专栏lgp20151222

SSH上一个随笔的基础上添加上hibernate支持

熟悉的pom.xml其中lo4g和slf4j这两个包第一眼看上去有点莫名奇妙,我也是这么觉得的,实际作用是在后台输出sql语句,不导入hibernate就会报错...

701
来自专栏码匠的流水账

聊聊rocketmq的PushConsumerImpl

io/openmessaging/rocketmq/consumer/PushConsumerImpl.java

1362
来自专栏mukekeheart的iOS之旅

No.013 Roman to Integer

13. Roman to Integer Total Accepted: 95998 Total Submissions: 234087 Difficulty:...

2355
来自专栏函数式编程语言及工具

FunDA(11)- 数据库操作的并行运算:Parallel data processing

   FunDA最重要的设计目标之一就是能够实现数据库操作的并行运算。我们先重温一下fs2是如何实现并行运算的。我们用interleave、merge、eith...

1878
来自专栏Golang语言社区

Go语言实现AzDG可逆加密算法实例

本文实例讲述了Go语言实现AzDG可逆加密算法。分享给大家供大家参考。具体实现方法如下: package main import ( "fmt" "cryp...

3538
来自专栏木宛城主

PowerShell 获取Site Collection下被签出的文件

由于权限的设置,当文件被签出时导致别人不可见了,这对校验文件个数的人来说着实是件烦恼的事。幸好利用PowerShell,可以获取Site Collection下...

1907
来自专栏拂晓风起

jQuery 和 json 简单例子(注意callback函数的处理!!) (servlet返回json,jquery更新,java json)

1023
来自专栏大数据智能实战

spark 从HIVE读数据导入hbase中发生空指针(java.lang.NullPointerException)问题的解决

陆续好多人会问,在写入Hbase的时候总是会出现空指针的问题,而检查程序,看起来一点也没有错。 如报的错误大致如下: Error: application fa...

3205
来自专栏码匠的流水账

聊聊storm的CheckpointSpout

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java

1756

扫码关注云+社区