前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Logstash与Kafka集成

Logstash与Kafka集成

作者头像
我是攻城师
发布2018-05-14 15:59:19
2.2K0
发布2018-05-14 15:59:19
举报
文章被收录于专栏:我是攻城师我是攻城师

在ELKK的架构中,各个框架的角色分工如下: ElasticSearch1.7.2:数据存储+全文检索+聚合计算+服务端 Logstasch2.2.2:日志收集与分发推送 Kafka0.9.0.0:分布式高可靠消息队列+数据中转存储(失效期默认7天,可配置时间或大小来控制删除策略) Kibana4.1.2:全文检索+查询+图形化页面展示+客户端 拓扑架构如下:

本篇主要讲logstash与kafka的集成: (1)logstash作为kafka的生产者,就是logstash收集的日志发送到kafka中 (2)logstash作为kafka的消费者,消费kafka里面的数据打印到终端 (一)安装kafka集群,请参考散仙上篇文章: http://qindongliang.iteye.com/blog/2278194 (二)安装logstash 这个非常简单,直接下载最新版的logstash,经测试logstash1.5.4有问题,不能正常安装插件 wget https://download.elastic.co/logstash/logstash/logstash-2.2.2.tar.gz 为了能够快速下载logstash的相关插件,然后修改logstash的代理 (方案一) 安装ruby的gem yum -y install ruby rubygems 安装国内淘宝的代理源: gem sources --remove http://rubygems.org/ gem sources -a https://ruby.taobao.org/ gem sources -l

Java代码

  1. *** CURRENT SOURCES ***
  2. https://ruby.taobao.org/

(方案二) 修改logstash目录下的Gemfile里面的source的url为 https://ruby.taobao.org/ 然后就不用用方案一的方法了 最新版的logstash2.2支持修改Gemfile里面的地址为淘宝的镜像地址 使用的是最新版本2.2.2的logstash

Java代码

  1. //安装logstash输出到kafka的插件:
  2. bin/plugin install logstash-output-kafka
  3. //安装logstash从kafka读取的插件:
  4. bin/plugin install logstash-input-kafka

logstash-consume-kafka.conf消费者配置

Java代码

  1. input{
  2. kafka{
  3. //zk的链接地址
  4. zk_connect=>"h1:2181,h2:2181,h3:2181/kafka"
  5. //topic_id,必须提前在kafka中建好
  6. topic_id=>'logstash'
  7. //解码方式json,
  8. codec => json
  9. //消费者id,多个消费者消费同一个topic时,做身份标识
  10. consumer_id => "187"
  11. //消费者组
  12. group_id=> "logstash"
  13. //重新负载时间
  14. rebalance_backoff_ms=>5000
  15. //最大重试次数
  16. rebalance_max_retries=>10
  17. }
  18. }
  19. output{
  20. stdout{
  21. codec=>line
  22. }
  23. }

procuder_kafka_es.conf生产者配置:

Java代码

  1. input{
  2. //监听log文件
  3. file{
  4. path=> ["/ROOT/server/logstash-2.2.2/t.log"]
  5. }
  6. }
  7. output{
  8. //输出端1=>Kafka
  9. kafka{
  10. bootstrap_servers=> 'h1:9092,h2:9092,h3:9092'
  11. topic_id=> 'logstash'
  12. }
  13. //输出端2=>ElasticSearch
  14. elasticsearch{
  15. hosts=> ["192.168.1.187:9200","192.168.1.184:9200","192.168.1.186:9200"]
  16. }
  17. }

如果想用Logstash读取kafka某个topic的所有数据,需要加上下面2个配置:

Java代码

  1. auto_offset_reset => 'smallest'
  2. reset_beginning => true

但需要注意的是,如果是读取所有的数据,那么此时,对于kafka的消费者同时只能有一个,如果有多个 那么会报错,因为读取所有的数据,保证顺序还不能重复读取消息,只能使用一个消费者,如果不是 读取所有,仅仅读取最新传过来的消息,那么可以启动多个消费者,但建议消费者的数目,与该topic的 partition的个数一致,这样效果最佳且能保证partition内的数据顺序一致,如果不需要保证partition分区内数据 有序,可以接受乱序,那就无所谓了 参考资料 http://bigbo.github.io/pages/2015/08/07/logstash_kafka_new/ http://soft.dog/2016/01/08/logstash-plugins/ http://www.rittmanmead.com/2015/10/forays-into-kafka-01-logstash-transport-centralisation/

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2016-02-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 我是攻城师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档