In the previous article we used Spark to read from and write to MySQL. In practice, though, Spark more often plays the role of a (near) real-time stream processing framework. To use Spark Streaming, first add the dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0-preview</version>
    <!-- <scope>provided</scope> -->
</dependency>
package spark;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
 * @Author: 陈龙
 * @Date: 2019-12-03 20:28
 * @Description: Spark Streaming demo: receive lines from a TCP socket and print each batch
 */
public class SparkStreamDemo {
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("SparkStreamDemo").setMaster("local[4]");
        SparkContext sc = new SparkContext(sparkConf);
        // Batch interval of 5000 ms: the incoming stream is cut into one RDD every 5 seconds
        JavaStreamingContext streamingContext = new JavaStreamingContext(JavaSparkContext.fromSparkContext(sc), Duration.apply(5000));
        // JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        // sc.setLogLevel("WARN");
        // Receive text data from a TCP socket, one record per line
        JavaReceiverInputDStream<String> lines = streamingContext.socketTextStream("127.0.0.1", 9999);
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                System.out.println(line);
                List<String> list = new ArrayList<>();
                list.add(line);
                return list.iterator();
            }
        });
        // print() is the output action that triggers the DStream computation
        words.print();
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
You can send test messages from a terminal with nc -lk 9999 (nc listens on port 9999 and the Spark receiver connects to it); every line you type shows up in the next batch.
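In the demo above the flatMap simply forwards each line unchanged, so the "words" DStream still contains whole lines. As a minimal sketch of a more typical transformation (the class name SocketWordCountDemo and the word-count logic are illustrative additions, not part of the original demo), the variant below splits each line on whitespace and counts word occurrences per batch with mapToPair/reduceByKey:

package spark;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;

public class SocketWordCountDemo {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("SocketWordCountDemo").setMaster("local[4]");
        // 5-second batch interval, built directly from the SparkConf
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("127.0.0.1", 9999);
        // Split each line on whitespace, then count occurrences of each word within the batch
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());
        JavaPairDStream<String, Integer> counts = words
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum);
        counts.print(); // output action that triggers the computation
        ssc.start();
        ssc.awaitTermination();
    }
}

In most real deployments the input comes from Kafka rather than a raw socket. The spark-streaming-kafka-0-10 connector below provides that integration, followed by a consumer that reads a topic as a direct stream.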
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.0-preview</version>
</dependency>
package spark;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import java.util.*;
/**
 * @Author: 陈龙
 * @Date: 2019-12-03 20:30
 * @Description: Spark Streaming demo: consume a Kafka topic via the 0-10 direct stream
 */
public class SparkkafkaDemo {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "127.0.0.1:9092";
        String topics = "test_topic";
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("streaming word count")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        // 7-second batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(7));
        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        // Kafka consumer parameters
        Map<String, Object> kafkaParams = new HashMap<>();
        // kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "1");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // To start from specific offsets, build a Map<TopicPartition, Long> and pass it to ConsumerStrategies.Subscribe:
        // Map<TopicPartition, Long> offsets = new HashMap<>();
        // offsets.put(new TopicPartition("test_topic", 1), 2L);
        JavaInputDStream<ConsumerRecord<Object, Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams)
        );
        JavaDStream<ConsumerRecord<Object, Object>> words = lines.flatMap(new FlatMapFunction<ConsumerRecord<Object, Object>, ConsumerRecord<Object, Object>>() {
            @Override
            public Iterator<ConsumerRecord<Object, Object>> call(ConsumerRecord<Object, Object> record) throws Exception {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                List<ConsumerRecord<Object, Object>> list = new ArrayList<>();
                list.add(record);
                return list.iterator();
            }
        });
        // print() is the output action that triggers the DStream computation
        words.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
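The commented-out offsets map above is the hook for manual offset control. For reference, here is a minimal sketch of the usual pattern with the 0-10 direct stream for reading each batch's offset ranges and committing them back to Kafka; it assumes kafkaParams also sets "enable.auto.commit" to false, which is not part of the demo above:

// Sketch only. Requires imports: org.apache.spark.streaming.kafka010.CanCommitOffsets,
// org.apache.spark.streaming.kafka010.HasOffsetRanges, org.apache.spark.streaming.kafka010.OffsetRange,
// plus kafkaParams.put("enable.auto.commit", false) so Spark controls when offsets are committed.
lines.foreachRDD(rdd -> {
    // Each RDD produced by the direct stream knows which Kafka offset ranges it covers
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreach(record ->
            System.out.printf("partition = %d, offset = %d, value = %s%n",
                    record.partition(), record.offset(), record.value()));
    // Commit the processed offsets back to Kafka asynchronously under the consumer group
    ((CanCommitOffsets) lines.inputDStream()).commitAsync(offsetRanges);
});

With commitAsync the offsets are stored in Kafka under the consumer group, so a restarted job resumes from the last committed batch instead of replaying or skipping data.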
For how to produce Kafka messages, see the earlier middleware article, kafka入门 (Getting Started with Kafka).
Run the program above, start Kafka, and then execute the following command from Kafka's bin directory:
echo '00000,{"name":"Steve", "title":"Captain America"}' | ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic --property parse.key=true --property key.separator=,