前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SparkStreaming小例子

SparkStreaming小例子

作者头像
用户1171305
发布2017-12-28 11:23:24
1.1K0
发布2017-12-28 11:23:24
举报
文章被收录于专栏:成长道路成长道路

1.安装nc才可以打开端口 rpm -ivh /media/CentOS_6.7_Final/Packages/nmap-5.51-4.el6.x86_64.rpm 2.ncat -lk 1234 3. package com.iflytek.sparkstreaming;

import java.util.Arrays;

import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2; import scala.collection.generic.BitOperations.Int; import scala.collection.script.Start; import sun.net.www.content.audio.x_aiff;

public class SparkStreaming {

 public static void main(String[] args) {   SparkConf sparkConf = new SparkConf().setAppName("sparkSQL");   sparkConf.setMaster("local[2]");   System.setProperty("hadoop.home.dir", "E:\\spark\\spark-1.6.1-bin-hadoop2.6"); //  JavaSparkContext sc = new JavaSparkContext(sparkConf);   JavaStreamingContext jssc=new JavaStreamingContext(sparkConf,Durations.seconds(3));   Logger.getRootLogger().setLevel(Level.OFF);   JavaReceiverInputDStream<String> lines=jssc.socketTextStream("h1", 1234);   JavaDStream<String> words=lines.flatMap(new FlatMapFunction<String,String>() {

   public Iterable<String> call(String x) throws Exception {     return Arrays.asList(x.split(" "));    }   });   JavaPairDStream<String, Integer> pairs=words.mapToPair(new PairFunction<String, String, Integer>() {

   public Tuple2<String, Integer> call(String s) throws Exception {     return new Tuple2<>(s,1);    }   });   JavaPairDStream<String, Integer> javaPairDStream=pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {    @Override    public Integer call(Integer arg0, Integer arg1) throws Exception {     return arg0+arg1;    }   });   javaPairDStream.print();   jssc.start();   jssc.awaitTermination();

 }

}

4.截图

注意:如果向端口发送的信息在控制台显示不出来,记得修改端口号,有可能这个端口被占用。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-04-25 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档