Zzreal的大数据笔记-SparkDay05

Spark Streaming

SparkStreaming部分没做知识点的笔记,直接从代码上理解它的用法。后面整理Storm的时候会与SparkStreaming做一个对比,如果这时候难以理解SparkStreaming的话就先照着代码学会怎么用,后面结合Storm来理解实时计算体系。

flume+SparkStreaming.conf

---SparkStreaming集成flume的flume配置

#以下是push模式

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/lc/log/1.txt

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

#a1.sinks.k1.type = avro

#a1.sources.k1.channels = c1

#a1.sinks.k1.hostname = localhost

#a1.sinks.k1.port = 19999

#使用pull模式。需要三个jar包放在flume/lib下,分别是spark-streaming-flume-sink_2.11-2.1.1.jar ,scala-#library-2.11.8.jar, commons-lang3-3.3.2.jar,注意版本是你程序用的版本。

a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink

#a1.sources.k1.channels = c1

a1.sinks.k1.hostname = localhost

a1.sinks.k1.port = 19999

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

FlumeSparkStreaming.scala ---Spark代码

import org.apache.spark.sql.SparkSession

import org.apache.spark.storage.StorageLevel

import org.apache.spark.streaming.flume.FlumeUtils

import org.apache.spark.streaming.

object FlumeSparkStreaming extends App{

val spark = SparkSession.builder().master("local[*]").appName("Streaming").getOrCreate()

val sc = spark.sparkContext;sc.setLogLevel("WARN")

val ssc = new StreamingContext(sc,Seconds(5))

val flumeStream = FlumeUtils.createPollingStream(ssc,"localhost",19999,StorageLevel.MEMORY_ONLY)

flumeStream.map(f=>f.event.toString).print()

ssc.start()

ssc.awaitTermination()

}

Flume+kafka+SparkStreaming.conf

---SparkStreaming集成flume+kafka的flume配置

#flume+kafka+sparkStreaming集成

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/sker/wc.txt

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = test01

a1.sinks.k1.kafka.bootstrap.servers = localhost:9092

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.ki.kafka.producer.compression.type = snappy

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume启动命令

#bin/flume-ng agent --conf conf --conf-file conf /Flume + kafka + SparkStreaming.conf --name a1 -#Dflume.root.logger=INFO,console

FromKafkaStream.scala ---Spark代码

import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.sql.SparkSession

import org.apache.spark.streaming.

import org.apache.spark.streaming.kafka010._

import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object FromKafkaStreamDemo extends App{

val spark = SparkSession.builder().appName("aaaa").master("local[2]").getOrCreate()

val sc = spark.sparkContext

val ssc = new StreamingContext(sc,Seconds(5))

ssc.checkpoint("cp")

val kafkaParams = Map[String, Object](

"bootstrap.servers" -> "localhost:9092",

"key.deserializer" -> classOf[StringDeserializer],

"value.deserializer" -> classOf[StringDeserializer],

"group.id" -> "use_a_separate_group_id_for_each_stream",

"auto.offset.reset" -> "latest",

"enable.auto.commit" -> (false: java.lang.Boolean)

)

val topics = Array("test01")

val stream = KafkaUtils.createDirectStream[String, String](

ssc,

PreferConsistent,

Subscribe[String, String](topics, kafkaParams)

)

stream.map(record=>{

(record.key(),record.value(),record.topic(),record.toString)

}).print()

ssc.start()

ssc.awaitTermination()

}

SparkSubmit

以上内容均为作者个人笔记,如有错误欢迎指正...

关注CSDN博客 Zonzereal,更多大数据笔记等你...

  • 发表于:
  • 原文链接:http://kuaibao.qq.com/s/20171228G02T0H00?refer=cp_1026

扫码关注云+社区