Flume + Kafka + Spark Streaming Integration

sparkle123 · Published 2018-06-14

Reference: http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.5.0/FlumeUserGuide.html

  • Logger-->Flume

1/ Configure the Flume configuration file streaming.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.log-sink.type=logger

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel
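
The memory channel above runs with Flume's default capacities; if the logger produces events faster than the sink drains them, the channel's capacity and transactionCapacity can be raised explicitly (the values below are illustrative, not part of the original configuration):

#optional memory channel tuning (illustrative values, not from the original config)
agent1.channels.logger-channel.capacity=10000
agent1.channels.logger-channel.transactionCapacity=1000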

2/ Log4j configuration file (log4j.properties) for the Java program

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
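
The log4j.appender.flume entries use Flume's Log4jAppender, so the flume-ng-log4jappender client jar has to be on the application classpath. The LoggerGenerator class used later in this post is not shown in the original; a minimal sketch of what it could look like:

import org.apache.log4j.Logger;

// Minimal log producer: emits one INFO line per second through log4j;
// the flume appender configured above forwards each line to the avro source on port 41414.
public class LoggerGenerator {

    private static final Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws Exception {
        int index = 0;
        while (true) {
            Thread.sleep(1000);
            logger.info("current value is: " + index++);
        }
    }
}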

3/ Start flume-ng

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

4/ The generated logs show up in real time in the flume-ng console window.

  • Logger-->Flume-->Kafka

1/ Start Kafka and create the topic:

./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic flume-kafka-streaming-topic
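
If the broker is not running yet, a quick way to bring it up and confirm the topic exists (assuming $KAFKA_HOME is set and ZooKeeper is at hadoop:2181 as above):

# start a broker (assumes $KAFKA_HOME points at the Kafka installation)
kafka-server-start.sh $KAFKA_HOME/config/server.properties

# verify the topic was created
kafka-topics.sh --list --zookeeper hadoop:2181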

2/ Configure the Flume configuration file streaming2.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = flume-kafka-streaming-topic
agent1.sinks.kafka-sink.brokerList = hadoop:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

3/ Start the log-producing program; the generated logs show up in real time in the kafka-console-consumer window:

kafka-console-consumer.sh --zookeeper hadoop:2181 --topic flume-kafka-streaming-topic

  • Logger-->Flume-->Kafka-->Spark Streaming

1/ Spark Streaming code (Scala):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumeKafkaReceiverWordCount {
  def main(args: Array[String]): Unit = {
    if(args.length < 4) {
      //Edit Configuration : hadoop:2181 test flume-kafka-streaming-topic 1
      System.err.println("Usage: FlumeKafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    
    val sparkConf = new SparkConf().setAppName("FlumeKafkaReceiverWordCount").setMaster("local[2]")
    //val sparkConf = new SparkConf()

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap

    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
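
KafkaUtils.createStream here is the receiver-based API from the spark-streaming-kafka (Kafka 0.8) connector, so the project needs that artifact in addition to spark-streaming. A Maven sketch, assuming Spark 1.6.x built against Scala 2.10 (align the versions with your cluster):

<!-- versions are assumptions; match them to your Spark/Scala build -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.0</version>
</dependency>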

2/ Start the program above; the word counts show up in real time in the console window.

3/ Note: for local testing, run LoggerGenerator in IDEA, then let Flume, Kafka, and Spark Streaming do the processing.

In a production environment:
1. Package LoggerGenerator into a jar and run that class.
2. The Flume and Kafka steps are the same as in the local test.
3. The Spark Streaming code also has to be packaged into a jar and submitted to the cluster with spark-submit (see the sketch below).
4. Choose the run mode according to the actual situation: local/yarn/standalone/mesos.
5. The overall streaming pipeline is the same in production; the difference lies in the complexity of the business logic.
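
For step 3, a hedged spark-submit sketch (the class package, jar path, and connector version below are placeholders, not values from the original post). Note that the hard-coded setMaster("local[2]") should be replaced by the plain new SparkConf() variant shown commented out in the code, so the master can be chosen at submit time:

spark-submit \
  --master yarn \
  --name FlumeKafkaReceiverWordCount \
  --class com.example.spark.FlumeKafkaReceiverWordCount \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 \
  /path/to/streaming-app.jar \
  hadoop:2181 test flume-kafka-streaming-topic 1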
