专栏首页大数据-Hadoop、SparkFlume + Kafka + Spark Streaming整合

Flume + Kafka + Spark Streaming整合

参考: http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.5.0/FlumeUserGuide.html

  • Logger-->Flume

1/配置Flume配置文件streaming.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.log-sink.type=logger

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel

2/Java程序的日志配置文件

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

3/启动flume-ng

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

4/在flume-ng窗口可以即时看到日志的产生

  • Logger-->Flume-->Kafka

1/启动kafka,并创建topic ./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic flume-kafka-streaming-topic

2/配置Flume配置文件streaming2.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = flume-kafka-streaming-topic
agent1.sinks.kafka-sink.brokerList = hadoop:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

3/启动日志生产程序,产生的日志即时的在kafka-console-consumer窗口产生 kafka-console-consumer.sh --zookeeper hadoop:2181 --topic flume-kafka-streaming-topic

  • Logger-->Flume-->Kafka-->Spark Streaming

1/Java代码:

object FlumeKafkaReceiverWordCount {
  def main(args: Array[String]): Unit = {
    if(args.length < 4) {
      //Edit Configuration : hadoop:2181 test flume-kafka-streaming-topic 1
      System.err.println("Usage: FlumeKafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    
    val sparkConf = new SparkConf().setAppName("FlumeKafkaReceiverWordCount").setMaster("local[2]")
    //val sparkConf = new SparkConf()

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap

    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

2/启动上面的程序,即可在Console窗口实时看到单词基数 3/注意: 在本地进行测试, 在IDEA中运行LoggerGenerator, 然后使用Flume、Kafka以及Spark Streaming进行处理操作。

在生产环境上, 1.打包jar,执行LoggerGenerator类 2.Flume、Kafka和本地测试步骤是一样的 3.Spark Streaming的代码也是需要打成jar包,然后使用spark-submit的方式进行提交到环境上执行 4.可以根据实际情况选择运行模式:local/yarn/standalone/mesos 5.在生产上,整个流处理的流程都一样的,区别在于业务逻辑的复杂性

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flume + Kafka整合

    Flume的安装与综合使用 https://www.jianshu.com/p/90e17b80f366 实时日志采集框架图 ? Flume + Ka...

    sparkle123
  • Linux解决方案:No space left on device

    sparkle123
  • Spark Streaming + Kafka整合

    sparkle123
  • 大数据-Flume采集文件到HDFS

    需求 比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到 hdfs

    cwl_java
  • Linux TraceEvent - 我见过的史上最长宏定义

    TraceEvent是内核中一种探测的机制,据说在不使能的时候是没有损耗的。据说使用起来挺简单,但是要看懂着实需要花些力气。

    Linux阅码场
  • 智能股民宝获1亿美元天使投资,有望成中国首个 IPO 智能硬件

    随着A股的爆发式增长以及雪崩式暴跌,中国股民正在成为地球上最受关注的弱势群体。在华尔街打拼多年的海归团队回国创立“一万点”公司,于今天发布了世界首款专为股民打造...

    罗超频道
  • Softmax,Softmax loss&Cross entropy

    这张图的等号左边部分就是全连接层做的事,W是全连接层的参数,我们也称为权值,X是全连接层的输入,也就是特征。从图上可以看出特征X是N*1的向量,这是怎么得到的呢...

    用户3636924
  • Qt开源作品1-视频流播放ffmpeg内核

    好久以前就写过这个工具,后来因为Qt版本的不断升级以及ffmpeg也经历过好多次的迭代,可能从官网下载的ffmpeg搭配原来的代码不能正确编译,因为很多api已...

    feiyangqingyun
  • iOS AutoLayout全解

    AutoLayout简介 Autolayout是一种全新的布局技术,专门用来布局UI界面的,用来取代Frame布局在遇见屏幕尺寸多重多样的问题。Autolayo...

    xiangzhihong
  • 跟我学 Java 8 新特性之 Stream 流(二)关键知识点

    我们的第一篇文章,主要是通过一个Demo,让大家体验了一下使用流API的那种酣畅淋漓的感觉。如果你没有实践,我还是再次呼吁你动手敲一敲,自己实实在跑一遍上一篇的...

    Java技术栈

扫码关注云+社区

领取腾讯云代金券