Flume + Kafka + Spark Streaming整合

参考: http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.5.0/FlumeUserGuide.html

  • Logger-->Flume

1/配置Flume配置文件streaming.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.log-sink.type=logger

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel

2/Java程序的日志配置文件

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

3/启动flume-ng

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

4/在flume-ng窗口可以即时看到日志的产生

  • Logger-->Flume-->Kafka

1/启动kafka,并创建topic ./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic flume-kafka-streaming-topic

2/配置Flume配置文件streaming2.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = flume-kafka-streaming-topic
agent1.sinks.kafka-sink.brokerList = hadoop:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

3/启动日志生产程序,产生的日志即时的在kafka-console-consumer窗口产生 kafka-console-consumer.sh --zookeeper hadoop:2181 --topic flume-kafka-streaming-topic

  • Logger-->Flume-->Kafka-->Spark Streaming

1/Java代码:

object FlumeKafkaReceiverWordCount {
  def main(args: Array[String]): Unit = {
    if(args.length < 4) {
      //Edit Configuration : hadoop:2181 test flume-kafka-streaming-topic 1
      System.err.println("Usage: FlumeKafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    
    val sparkConf = new SparkConf().setAppName("FlumeKafkaReceiverWordCount").setMaster("local[2]")
    //val sparkConf = new SparkConf()

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap

    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

2/启动上面的程序,即可在Console窗口实时看到单词基数 3/注意: 在本地进行测试, 在IDEA中运行LoggerGenerator, 然后使用Flume、Kafka以及Spark Streaming进行处理操作。

在生产环境上, 1.打包jar,执行LoggerGenerator类 2.Flume、Kafka和本地测试步骤是一样的 3.Spark Streaming的代码也是需要打成jar包,然后使用spark-submit的方式进行提交到环境上执行 4.可以根据实际情况选择运行模式:local/yarn/standalone/mesos 5.在生产上,整个流处理的流程都一样的,区别在于业务逻辑的复杂性

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Hadoop实操

如何使用Spark Streaming读取HBase的数据并写入到HDFS

Spark Streaming是在2013年被添加到Apache Spark中的,作为核心Spark API的扩展它允许用户实时地处理来自于Kafka、Flum...

7104
来自专栏Core Net

ASP.NET Core 2.0 : 七.一张图看透启动背后的秘密

3435
来自专栏依乐祝

.NET Core开发者的福音之玩转Redis的又一傻瓜式神器推荐

为什么写这篇文章呢?因为.NET Core的生态越来越好了!之前玩转.net的时候操作Redis相信大伙都使用过一些组件,但都有一些缺点,如ServiceSta...

432
来自专栏北京马哥教育

Linux内存管理--基本概念

1. Linux物理内存三级架构 ? 对于内存管理,Linux采用了与具体体系架构不相关的设计模型,实现了良好的可伸缩性。它主要由内存节点node、内存区域zo...

3528
来自专栏王亚昌的专栏

【Zookeeper】Leader选举机制示例

一、 选项设置 提到Leader选举,先需要重点介绍下创建znode时的Flag选项。

590
来自专栏芋道源码1024

注册中心 Eureka 源码解析 —— 应用实例注册发现(七)之增量获取

本文主要分享 Eureka-Client 向 Eureka-Server 获取增量注册信息的过程。

1310
来自专栏博岩Java大讲堂

Java日志体系(log4j)

47111
来自专栏Kirito的技术分享

研究优雅停机时的一点思考

最近瞥了一眼项目的重启脚本,发现运维一直在使用 kill-9<pid> 的方式重启 springboot embedded tomcat,其实大家几乎一致认为...

1K6
来自专栏ml

flume安装及配置介绍(二)

注: 环境: skylin-linux Flume的下载方式:   wget http://www.apache.org/dyn/closer.lua/flu...

32511
来自专栏小灰灰

Quick-Task 动态脚本支持框架之使用介绍篇

文章链接:https://liuyueyi.github.io/hexblog/2018/07/19/180719-Quick-Task-动态脚本支持框架之使用...

732

扫码关注云+社区