
Spark Streaming: Reading Real-Time Data from Kafka


In the previous article we used Spark to read from and write to MySQL. In practice, though, Spark more often serves as a real-time stream-computing framework.

Add the dependency
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0-preview</version>
<!--            <scope>provided</scope>-->
        </dependency>
Code
package spark;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * @Author: 陈龙
 * @Date: 2019-12-03 20:28
 * @Description: Spark Streaming demo that reads lines from a local TCP socket
 */
public class SparkStreamDemo {
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("SparkStreamDemo").setMaster("local[4]");
        SparkContext sc = new SparkContext(sparkConf);
        JavaStreamingContext streamingContext = new JavaStreamingContext(JavaSparkContext.fromSparkContext(sc), Duration.apply(5000));
//        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(5));
//        sc.setLogLevel("WARN");
        JavaReceiverInputDStream<String> lines = streamingContext.socketTextStream("127.0.0.1", 9999);
        // NOTE: this flatMap just passes each line through unchanged;
        // a real word count would split the line into words here
        // (see the lambda sketch after this block)
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                System.out.println(line);
                List<String> list = new ArrayList<>();
                list.add(line);
                return list.iterator();
            }
        });
        // print() is an output action; it triggers the actual DStream computation
        words.print();

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
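Since FlatMapFunction is a single-method interface, the same operator can be written as a Java 8 lambda. A minimal sketch that actually splits each line into words (the whitespace split is my assumption; it also needs import java.util.Arrays):

        // equivalent flatMap as a lambda, splitting lines into words
        JavaDStream<String> tokens = lines.flatMap(
                line -> Arrays.asList(line.split("\\s+")).iterator());
        tokens.print();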
Test

You can send messages from another console with nc -lk 9999; each line you type shows up in the Spark output at the next 5-second batch.

Kafka

To read from Kafka, add the spark-streaming-kafka-0-10 integration ("0-10" is the Kafka client API version; the _2.12 suffix is the Scala version and must match your Spark build):
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0-preview</version>
        </dependency>
Implementation code
package spark;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.*;

/**
 * @Author: 陈龙
 * @Date: 2019-12-03 20:30
 * @Description: Spark Streaming demo that reads a Kafka topic via the direct stream API
 */
public class SparkkafkaDemo {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "127.0.0.1:9092";
        String topics = "test_topic";
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("streaming word count")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(7));

        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        // Kafka consumer parameters
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "1");
        // a consumer only needs deserializers ("key.serializer" is a producer setting)
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // To start from specific offsets instead of the committed/latest ones,
        // build a Map<TopicPartition, Long> and pass it as a third argument
        // to ConsumerStrategies.Subscribe:
//        Map<TopicPartition, Long> offsets = new HashMap<>();
//        offsets.put(new TopicPartition("test_topic", 1), 2L);

        // direct stream: each Kafka partition maps to one Spark partition;
        // the String type parameters match the deserializers configured above
        JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)
        );

        JavaDStream<ConsumerRecord<String, String>> words = lines.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, ConsumerRecord<String, String>>() {
            @Override
            public Iterator<ConsumerRecord<String, String>> call(ConsumerRecord<String, String> record) throws Exception {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                List<ConsumerRecord<String, String>> list = new ArrayList<>();
                list.add(record);
                return list.iterator();
            }
        });

        // print() is an output action; it triggers the actual DStream computation
        words.print();

        ssc.start();
        ssc.awaitTermination();
    }
}
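The commented-out offsets map above hints at offset management. When processing must not silently skip or replay data, the 0-10 integration lets you commit offsets back to Kafka yourself. A minimal sketch of that pattern, placed before ssc.start() (it assumes enable.auto.commit is set to false in kafkaParams; the processing step is a placeholder):

        // needs imports: org.apache.spark.streaming.kafka010.CanCommitOffsets,
        // org.apache.spark.streaming.kafka010.HasOffsetRanges,
        // org.apache.spark.streaming.kafka010.OffsetRange
        lines.foreachRDD(rdd -> {
            // each RDD of a direct stream knows the Kafka offset ranges it covers
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            // ... process the records in rdd here ...
            // commit asynchronously once the batch has been handled
            ((CanCommitOffsets) lines.inputDStream()).commitAsync(offsetRanges);
        });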
Test

For producing Kafka messages, see the earlier post 中间件:kafka入门 (Middleware: Getting Started with Kafka).

Run the program above, start Kafka, and from Kafka's bin directory run the following to produce a keyed message:

echo '00000,{"name":"Steve", "title":"Captain America"}' | ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic --property parse.key=true --property key.separator=,
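If everything is wired up, the Spark console should then print a line like the one below (the exact offset depends on how many messages the topic already contains):

offset = 0, key = 00000, value = {"name":"Steve", "title":"Captain America"}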
