专栏首页扎心了老铁java spark-streaming接收TCP/Kafka数据

java spark-streaming接收TCP/Kafka数据

 本文将展示

1、如何使用spark-streaming接入TCP数据并进行过滤;

2、如何使用spark-streaming接入TCP数据并进行wordcount;

内容如下:

1、使用maven,先解决pom依赖

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>

1、接收TCP数据并过滤,打印含有error的行

package com.xiaoju.dqa.realtime_streaming;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Durations;


//nc -lk 9999
public class SparkStreamingTCP {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("streaming word count");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        JavaDStream<String> lines = jssc.socketTextStream("10.93.21.21", 9999);
        JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return s.contains("error");
            }
        });
        errorLines.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

执行方法

$ spark-submit realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar
# 另起一个窗口
$ nc -lk 9999
# 输入数据

2、接收Kafka数据并进行计数(WordCount)

package com.xiaoju.dqa.realtime_streaming;

import java.util.*;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

import scala.Tuple2;

// bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
public class SparkStreamingKafka {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("yarn-client").setAppName("streaming word count");
        //String topic = "offline_log_metrics";
        String topic = "test";
        int part = 1;
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));
        Map<String ,Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = topic.split(";");
        for (int i=0; i<topics.length; i++) {
            topicMap.put(topics[i], 1);
        }
        List<JavaPairReceiverInputDStream<String, String>> list = new ArrayList<JavaPairReceiverInputDStream<String, String>>();
        for (int i = 0; i < part; i++) {
            list.add(KafkaUtils.createStream(jssc,
                    "10.93.21.21:2181",
                    "bigdata_qa",
                    topicMap));
        }
        JavaPairDStream<String, String> wordCountLines = list.get(0);
        for (int i = 1; i < list.size(); i++) {
            wordCountLines = wordCountLines.union(list.get(i));
        }
        JavaPairDStream<String, Integer> counts = wordCountLines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>(){
            @Override
            public Iterable<String> call(Tuple2<String, String> stringStringTuple2){
                List<String> list2 = null;
                try {
                    if ("".equals(stringStringTuple2._2) || stringStringTuple2._2 == null) {
                        System.out.println("_2 is null");
                        throw new Exception("_2 is null");
                    }
                    list2 = Arrays.asList(stringStringTuple2._2.split(" "));
                } catch (Exception ex) {
                    ex.printStackTrace();
                    System.out.println(ex.getMessage());
                }
                return list2;
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                Tuple2<String, Integer> tuple2 = null;
                try {
                    if (s==null || "".equals(s)) {
                        tuple2 = new Tuple2<String, Integer>(s, 0);
                        throw new Exception("s is null");
                    }
                    tuple2 = new Tuple2<String, Integer>(s, 1);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
                return tuple2;
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer x, Integer y) throws Exception {
                return x + y;
            }
        });
        counts.print();

        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            jssc.close();
        }
    }
}

执行方法

 $ spark-submit --queue=root.XXX realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar
# 另开一个窗口,启动kafka生产者
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 输入数据

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 大数据算法设计模式(2) - 左外链接(leftOuterJoin) spark实现

    左外链接(leftOuterJoin) spark实现 package com.kangaroo.studio.algorithms.join; impor...

    用户1225216
  • CentOS环境下Docker私有仓库搭建

    本文讲述如何搭建docker私有仓库。 有了docker hub,为什么还要搭建docker私有仓库? 1、性能考虑:docker hub的访问要通过互联网,性...

    用户1225216
  • 使用hive客户端java api读写hive集群上的信息

    上文介绍了hdfs集群信息的读取方式,本文说hive 1、先解决依赖 <properties> <hive.version>1.2.1</hiv...

    用户1225216
  • Java微信公众平台开发_05_微信网页授权

    登录微信公众平台后台, 开发 - 接口权限 - 网页服务 - 网页帐号 - 网页授权获取用户基本信息 - 修改,

    shirayner
  • JavaWeb_常用功能_01_文件上传

     一个功能完善的JavaWeb应用,必不可少的一个功能就是文件的上传。无论是用户的头像等,还是用户需要上传的一系列资料,都是通过文件的上传功能实现的。

    shirayner
  • Java爬虫

    jsoup-1.7.3.jar 个人认为爬虫的实现机制: 获取Docume对象—>获取节点—>输出或者持久化

    用户1518699
  • 微信支付开发实记

    微信支付分为JSAPI支付,扫码支付,APP支付,小程序支付等不同的支付方式。但大体的支付过程是一致的,本文以JSAPI支付,也就是微信内的H5支付为例,描述一...

    超超不会飞
  • Kubernetes-V1.14.2 二进制编译安装(环境准备篇)

    准备部署一个 一主两从 的 三节点 Kubernetes集群,整体节点规划如下表所示:

    Leorizon
  • Spring Cloud中Feign的继承特性

    上篇文章我们了解了Feign的基本使用,在HelloService类中声明接口时,我们发现这里的代码可以直接从服务提供者的Controller中复制过来,这些可...

    江南一点雨
  • Java爬虫快速开发工具uncs的部署全攻略

    uncs是java快速开发爬虫的工具,简单便捷,经过大量版本迭代和生产验证,可以适用大多数网站,推荐使用。

    宜信技术学院

扫码关注云+社区

领取腾讯云代金券