前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >用Kafka消费Flume传来的日志

用Kafka消费Flume传来的日志

原创
作者头像
ZHANGHAO
修改2018-12-20 10:00:00
7780
修改2018-12-20 10:00:00
举报
文章被收录于专栏:张浩的专栏张浩的专栏

1.创建Kafka Topic

kafka-topics.sh --create  --zookeeper localhost:2181  --partitions 1 --replication-factor 1   --topic weblogs

2.创建Flume Conf

Source 使用Spooling Directory Source

agent.sources=spooldir-source
agent.sources.spooldir-source.type=spooldir
agent.sources.spooldir-source.spoolDir=/flume/weblogs_spooldir
agent.sources.spooldir-source.deserializer.maxLineLength=20480

Channel 使用Memory Channel

agent.channels=memory-channel
agent.channels.memory-channel.type = memory
agent.channels.memory-channel.capacity = 10000
agent.channels.memory-channel.transactionCapacity = 1000

Flume Sink 使用Kafka Sink

agent.sinks=kafka-sink
agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-sink.topic = weblogs
agent.sinks.kafka-sink.bootstrap.servers = localhost:9092
agent.sinks.kafka-sink.flumeBatchSize = 20
agent.sinks.kafka-sink.producer.acks = 1
agent.sinks.kafka-sink.producer.linger.ms = 1
agent.sinks.kafka-sink.producer.compression.type = snappy

连接Source与Sink

agent.sources.spooldir-source.channel=memory-channel
agent.sinks.kafka-sink.channel=memory-channel

3.启动Flume来传送日志

flume-ng  --conf  /opt/flume/conf    --conf-file   /opt/flume/conf/kafka-spool.conf    --name  agent -Dflume.root.logger=INFO,console

4.Kafka Producer消费消息

 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic weblogs

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.创建Kafka Topic
  • 2.创建Flume Conf
    • Source 使用Spooling Directory Source
      • Channel 使用Memory Channel
        • Flume Sink 使用Kafka Sink
          • 连接Source与Sink
          • 3.启动Flume来传送日志
            • 4.Kafka Producer消费消息
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档