
Zzreal's Big Data Notes - SparkDay05

By 企鹅号小编 · Published 2018-01-22 10:49:35

Spark Streaming

I did not take concept notes for the Spark Streaming part; its usage is easier to understand directly from the code. When I cover Storm later I will compare it with Spark Streaming, so if Spark Streaming is hard to grasp right now, just follow the code below to learn how to use it, and come back to the real-time computing picture once Storm is in the mix.
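Before the integrations below, here is a minimal sketch of the bare Spark Streaming pattern (create a StreamingContext, define a DStream source and transformations, then start and await). This is my own addition, not from the original notes; it counts words arriving on a local socket and assumes text is being sent to localhost:9999 (e.g. with nc -lk 9999):

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BasicStreamingDemo extends App {
  // Minimal sketch: word count over lines received on a local socket, one micro-batch every 5 seconds
  val spark = SparkSession.builder().master("local[2]").appName("BasicStreaming").getOrCreate()
  spark.sparkContext.setLogLevel("WARN")
  val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
  val lines = ssc.socketTextStream("localhost", 9999)
  lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()
  ssc.start()
  ssc.awaitTermination()
}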

flume+SparkStreaming.conf

--- Flume configuration for integrating Spark Streaming with Flume

# The following is the push-mode configuration

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/lc/log/1.txt

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#a1.sinks.k1.type = avro
#a1.sources.k1.channels = c1
#a1.sinks.k1.hostname = localhost
#a1.sinks.k1.port = 19999

# Pull mode: place three jars in flume/lib (spark-streaming-flume-sink_2.11-2.1.1.jar, scala-library-2.11.8.jar, commons-lang3-3.3.2.jar); make sure the versions match the ones your application uses.

a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
#a1.sources.k1.channels = c1
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 19999

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

FlumeSparkStreaming.scala --- Spark code

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming._

object FlumeSparkStreaming extends App {
  val spark = SparkSession.builder().master("local[*]").appName("Streaming").getOrCreate()
  val sc = spark.sparkContext
  sc.setLogLevel("WARN")
  // One micro-batch every 5 seconds
  val ssc = new StreamingContext(sc, Seconds(5))
  // Pull mode: Spark polls the SparkSink that Flume exposes on localhost:19999
  val flumeStream = FlumeUtils.createPollingStream(ssc, "localhost", 19999, StorageLevel.MEMORY_ONLY)
  flumeStream.map(f => f.event.toString).print()
  ssc.start()
  ssc.awaitTermination()
}
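For the push mode left commented out in the Flume configuration above, the Spark side would use FlumeUtils.createStream instead of createPollingStream: Spark then hosts an Avro receiver that Flume's avro sink pushes events to. A minimal sketch (my own addition), assuming the same localhost:19999 address and that it replaces the createPollingStream line above:

// Push mode (sketch): Spark hosts an Avro receiver; Flume's avro sink pushes events to it
val pushStream = FlumeUtils.createStream(ssc, "localhost", 19999, StorageLevel.MEMORY_ONLY)
// Event bodies arrive as byte buffers; decode them to strings before printing
pushStream.map(f => new String(f.event.getBody.array())).print()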

Flume+kafka+SparkStreaming.conf

--- Flume configuration for integrating Spark Streaming with Flume and Kafka

# flume + kafka + SparkStreaming integration

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/sker/wc.txt

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test01
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Flume startup command

bin/flume-ng agent --conf conf --conf-file conf/Flume+kafka+SparkStreaming.conf --name a1 -Dflume.root.logger=INFO,console

FromKafkaStream.scala --- Spark code

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object FromKafkaStreamDemo extends App {
  val spark = SparkSession.builder().appName("aaaa").master("local[2]").getOrCreate()
  val sc = spark.sparkContext
  val ssc = new StreamingContext(sc, Seconds(5))
  // Checkpoint directory, needed for stateful operations and driver recovery
  ssc.checkpoint("cp")

  // Kafka consumer parameters; auto-commit is disabled so offsets are not committed automatically
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "use_a_separate_group_id_for_each_stream",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  val topics = Array("test01")
  // Direct stream: no receiver, each Spark partition reads from a Kafka partition
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

  stream.map(record => (record.key(), record.value(), record.topic(), record.toString)).print()

  ssc.start()
  ssc.awaitTermination()
}
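The ssc.checkpoint("cp") call above only pays off once a stateful operation is added; on its own, the job just prints each record. As an illustration (my own sketch, not part of the original notes), a running word count over the Kafka record values could be inserted before ssc.start(), assuming each value is a line of space-separated words:

// Sketch of a stateful extension that actually needs the checkpoint directory:
// keep a running count of every word seen in the Kafka record values across batches
val wordCounts = stream
  .flatMap(record => record.value().split(" "))
  .map(word => (word, 1))
  .updateStateByKey[Int]((newValues: Seq[Int], state: Option[Int]) =>
    Some(newValues.sum + state.getOrElse(0)))
wordCounts.print()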

SparkSubmit

All of the above comes from the author's personal notes; corrections are welcome if you spot any errors...

Follow the CSDN blog Zonzereal for more big data notes...

This article is from 企鹅号 - 全球大搜罗 media. In case of infringement, contact cloudcommunity@tencent.com for removal.

