Flume + Kafka + Spark Streaming整合

参考: http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.5.0/FlumeUserGuide.html

  • Logger-->Flume

1/配置Flume配置文件streaming.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.log-sink.type=logger

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel

2/Java程序的日志配置文件

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

3/启动flume-ng

flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console

4/在flume-ng窗口可以即时看到日志的产生

  • Logger-->Flume-->Kafka

1/启动kafka,并创建topic ./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic flume-kafka-streaming-topic

2/配置Flume配置文件streaming2.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = flume-kafka-streaming-topic
agent1.sinks.kafka-sink.brokerList = hadoop:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

3/启动日志生产程序,产生的日志即时的在kafka-console-consumer窗口产生 kafka-console-consumer.sh --zookeeper hadoop:2181 --topic flume-kafka-streaming-topic

  • Logger-->Flume-->Kafka-->Spark Streaming

1/Java代码:

object FlumeKafkaReceiverWordCount {
  def main(args: Array[String]): Unit = {
    if(args.length < 4) {
      //Edit Configuration : hadoop:2181 test flume-kafka-streaming-topic 1
      System.err.println("Usage: FlumeKafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    
    val sparkConf = new SparkConf().setAppName("FlumeKafkaReceiverWordCount").setMaster("local[2]")
    //val sparkConf = new SparkConf()

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap

    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

2/启动上面的程序,即可在Console窗口实时看到单词基数 3/注意: 在本地进行测试, 在IDEA中运行LoggerGenerator, 然后使用Flume、Kafka以及Spark Streaming进行处理操作。

在生产环境上, 1.打包jar,执行LoggerGenerator类 2.Flume、Kafka和本地测试步骤是一样的 3.Spark Streaming的代码也是需要打成jar包,然后使用spark-submit的方式进行提交到环境上执行 4.可以根据实际情况选择运行模式:local/yarn/standalone/mesos 5.在生产上,整个流处理的流程都一样的,区别在于业务逻辑的复杂性

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏跟着阿笨一起玩NET

c#实现打印功能

3732
来自专栏杨龙飞前端

scrollto 到指定位置

2954
来自专栏我和未来有约会

Kit 3D 更新

Kit3D is a 3D graphics engine written for Microsoft Silverlight. Kit3D was inita...

2936
来自专栏张善友的专栏

Mix 10 上的asp.net mvc 2的相关Session

Beyond File | New Company: From Cheesy Sample to Social Platform Scott Hansel...

2787
来自专栏一个会写诗的程序员的博客

Spring Reactor 项目核心库Reactor Core

Non-Blocking Reactive Streams Foundation for the JVM both implementing a Reactiv...

2802
来自专栏大内老A

The .NET of Tomorrow

Ed Charbeneau(http://developer.telerik.com/featured/the-net-of-tomorrow/) Exciti...

38610
来自专栏闻道于事

js登录滑动验证,不滑动无法登陆

js的判断这里是根据滑块的位置进行判断,应该是用一个flag判断 <%@ page language="java" contentType="text/html...

8698
来自专栏ASP.NETCore

ASP.NET Core 整合Autofac和Castle实现自动AOP拦截

除了ASP.NETCore自带的IOC容器外,我们还可以使用其他成熟的DI框架,如Autofac,StructureMap等(笔者只用过Unity,Ninjec...

754
来自专栏张善友的专栏

LINQ via C# 系列文章

LINQ via C# Recently I am giving a series of talk on LINQ. the name “LINQ via C...

3015
来自专栏我和未来有约会

Silverlight第三方控件专题

这里我收集整理了目前网上silverlight第三方控件的专题,若果有所遗漏请告知我一下。 名称 简介 截图 telerik 商 RadC...

4395

扫码关注云+社区