Flume:流式数据收集利器

在数据生命周期里的第一环就是数据收集。收集通常有两种办法,一种是周期性批处理拷贝,一种是流式收集。今天我们就说说流式收集利器Flume怎么使用。

使用flume收集数据保存到多节点 by 尹会生

1 使用flume 收集数据到hdfs

由于工作的需要,领导要求收集公司所有在线服务器节点的文本数据,进行存储分析,从网上做了些比较,发现flume 是个简单实现,而且非常强大的工具,这里介绍给大家

首先下载软件:http://flume.apache.org

flume是著名的开源数据收集系统,采用java语言开发,主要工作逻辑可以分成source sinks 和channels 三个部分,

source 是收集器,官方提供了多种收集方式,能够支持文件和常用接口;

sinks是存储器,兼容了大部分市面上看到的文件系统类型,而且还可以将数据传递到下一节点中;

channels用了连接source和sinks;而且还可以使用一个source连接多个channel(sinks)进行传输链路的高可用。

我这里的传感器数据被统一收集到了nginx中,因此只要实现将nginx数据输出到hdfs就可以完成汇总了,为了便于分析,nginx的数据打印到了一个固定文件名的文件中,每天分割一次。那么flume一直监视这个文件就可以持续收集数据到hdfs了。通过官方文档发现flume的tail方式很好用,这里就使用了exec类型的source收集数据。接下来我们看下主要配置文件

# 指定souce sink channel 分别用什么名称,后面配置文件会调用

agent1.sources = s1

agent1.sinks = k1

agent1.channels = c1

# source的类型为执行tail命令,跟踪文件变化来进行数据读取

agent1.sources.s1.type = exec

agent1.sources.s1.command = tail -F /usr/local/openresty/nginx/data/user_info.data

agent1.sources.s1.channels = c1

# 使用主机名来作为存储hdfs的目录,区分不同的主机收集数据

agent1.sources.s1.interceptors = i1

agent1.sources.s1.interceptors.i1.type = host

agent1.sources.s1.interceptors.i1.hostHeader = hostname

# 存储方式使用hdfs

agent1.sinks.k1.type = hdfs

agent1.sinks.k1.hdfs.path =hdfs://NameNodeIP:9000/tmp/nginx/%y-%m-%d/%H

# 存储的目录格式

agent1.sinks.k1.hdfs.filePrefix = %{hostname}/events-

# 设置存储间隔和文件分割,根据自己的实际文件大小自由调整

agent1.sinks.k1.hdfs.batchSize= 500

agent1.sinks.k1.hdfs.fileType = DataStream

agent1.sinks.k1.hdfs.writeFormat =Text

agent1.sinks.k1.hdfs.rollSize = 100

agent1.sinks.k1.hdfs.rollCount = 10000

agent1.sinks.k1.hdfs.rollInterval = 600

agent1.sinks.k1.hdfs.useLocalTimeStamp = True

# 设置中间缓存为内存方式还是磁盘方式

agent1.channels.c1.type = memory

agent1.channels.c1.keep-alive = 120

agent1.channels.c1.capacity = 500000

agent1.channels.c1.transactionCapacity = 600

# 将source 、sink 和channel关联起来

agent1.sources.s1.channels = c1

agent1.sinks.k1.channel = c1

配置好之后启动

cd /usr/local/flume160/

bin/flume-ng agent -c conf -n agent1 -f conf/flume-conf.properties -Dflume.root.logger=INFO,console

查看hdfs 发现我要采集的数据已经都在hdfs中

hadoop dfs -ls /tmp/nginx/15-12-31/17/172.24.150.74

要对hdfs中的数据分析,可以根据数据的格式制作hive表格,然后进行分析就可以了,非常方便吧!

2 收集数据到多个数据源

完成了领导的任务,继续研究下flume的其他强大功能,测试了一下上面提到的数据同时推送到其他节点的功能,使用的方法就是指定多个channel和sink,这里以收集到其他节点存储为文件格式为例,需要做以下修改

# agent1 的channel和sink改为2个

agent1.channels = c1 c2

agent1.sinks = k1 k2

# 第二个sink改为数据收集节点的ip和端口

agent1.sinks.k2.type = avro

agent1.sinks.k2.hostname=ReceiveIP

agent1.sinks.k2.port = 8888

agent1.sinks.k2.channel = c2

# 第二个channel配置和第一个channel一样

agent1.channels.c2.type = memory

agent1.channels.c2.keep-alive = 120

agent1.channels.c2.capacity = 500000

agent1.channels.c2.transactionCapacity = 600

# 将第二个sink连接起来

agent1.sources.s1.channels = c2

agent1.sinks.k2.channel = c2

第二个节点参考上面来配置一套flume,这里使用了FILE_ROLL的sink类型,保存数据到文件

# agent2的基本定义

agent2.sources = s1

agent2.sinks = k1

agent2.channels = c1

# 利用avro RPC 接收 agent1 传过来的数据

agent2.sources.s1.type = avro

agent2.sources.s1.bind = ReceiveIP

agent2.sources.s1.port = 8888

agent2.sources.s1.thread = 5

agent2.sources.s1.channels = c1

# 使用文件接收,这里可以定义文件名称等,省略

agent2.sinks.k1.type = FILE_ROLL

agent2.sinks.k1.sink.directory = /tmp/flume-fileout

agent2.sinks.k1.channel = c1

# 和第一节点一样

agent2.channels.c1.type = memory

agent2.channels.c1.keep-alive = 120

agent2.channels.c1.capacity = 500000

agent2.channels.c1.transactionCapacity = 600

# 连接source和sink

agent2.sources.s1.channels = c1

agent2.sinks.k1.channel = c1

第二节点启动起来

bin/flume-ng agent -c conf -n agent2 -f conf/flume-conf.properties -Dflume.root.logger=INFO,console

之后再次查看,发现数据在hdfs和第二个节点的/tmp/flume-fileout目录都保存了一份数据。这里只是简化功能,可以根据这种方法将采集的数据放入到实时计算,如spark streaming 、storm 等工具中,更能发挥它的价值。

原文发布于微信公众号 - 奇点(qddata)

原文发表时间:2016-01-07

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏JavaWeb

使用Spring profile 多环境配置管理

3648
来自专栏后端技术探索

新浪微博平台服务部署及Web框架

平台作为整个微博架构的基础功能服务层,对外以Http接口的方式提供服务。接口遵守RESTful规范。接口示例如下:

2082
来自专栏FreeBuf

潜伏7年的Linux内核漏洞CVE-2017-2636曝光,可本地提权

又一个古老的Linux内核漏洞被曝光!这次的漏洞可以追溯到2009年,影响的linux发行版包括Red Hat、Debian、Fedora、OpenSUSE和U...

2918
来自专栏不止是前端

大厂的第一堂课,完整的git流程

大厂有着数量庞大的代码库以及复杂的权限验证体系,囊括着开发、测试、上线的完整流程。因此必然会有一套代码仓库的管理流程,而不再是个人的代码随意开发、随意提交。这也...

2544
来自专栏JetpropelledSnake

Python Web学习笔记之并发和并行的区别和实现

你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。 你吃饭吃到一半,电话来了,你停了下来接了电话,接完后继续吃饭,这说明你支...

2277
来自专栏Java帮帮-微信公众号-技术文章全总结

【大牛经验】搜狗商业平台Java技术实践

搜狗商业平台Java技术实践 Java自1995年问世以来,已历经20多年岁月。20年来,IT技术风起云涌,Java始终以其可移植性、跨平台性、生态系统完备性等...

43610
来自专栏喵了个咪的博客空间

phalapi-进阶篇1(Api,Domain,和Model)

#phalapi-进阶篇1(Api,Domain,和Model)# ? ##前言## 先在这里感谢phalapi框架创始人@dogstar,为我们提供了这样一个...

3737
来自专栏Kevin-ZhangCG

前后端分离原理

  前后端分离已成为互联网项目开发的业界标准使用方式,通过Nginx+Tomcat的方式(也可以中间加一个Node.js)有效的进行解耦,并且前后端分离会为以后...

4.4K3
来自专栏程序你好

微服务中使用工作流方式Sagas事务来保证数据完整

3115
来自专栏lulianqi

使用浏览器访问或调试微信公众号(跳过微信认证)

因为大部分公众号web应用实际登录都是使用用户微信认证登录,下文主要是提供一种方法使在PC端使用任意浏览器绕过微信认证完成登录,后面就可以在浏览器中使用或调试w...

3.7K3

扫码关注云+社区

领取腾讯云代金券