Flume篇---Flume安装配置与相关使用

一.前述

Copy过来一段介绍Apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务,或者数集中机制。flume具有高可用,分布式,配置工具,其设计的原理也是基于将数据流,如日志数据从各种网站服务器上汇集起来存储到HDFS,HBase等集中存储器中。官网:http://flume.apache.org/FlumeUserGuide.html

二.架构

1.基本架构

介绍:

Source:(相当于一个来源)

   从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channal,Flume提供多种数据接收的方式,比如Avro,Thrift,twitter1%等

Channel:(相当于一个中转)

 channal是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着一共桥梁的作用,channal是一个完整的事务,这一点保证了数据在收发的时候的一致性. 并且它可以和任意数量的source和sink链接. 支持的类型有: JDBC channel , File System channel , Memort channel等.

sink:(相当于最后的写出)

  sink将数据存储到集中存储器比如Hbase和HDFS,它从channals消费数据(events)并将其传递给目标地. 目标地可能是另一个sink,也可能HDFS,HBase.

2.延伸架构

  2.1利用AVRO中转

2.2一般多个来源时可以配置这样

ps:

Avro([ævrə])是Hadoop的一个子项目,由Hadoop的创始人Doug Cutting(也是Lucene,Nutch等项目的创始人)牵头开发。Avro是一个数据序列化系统,设计用于支持大批量数据交换的应用。它的主要特点有:支持二进制序列化方式,可以便捷,快速地处理大量数据;动态语言友好,Avro提供的机制使动态语言可以方便地处理Avro数据。

三。具体实施

3.1 安装 1、上传 2、解压 3、修改conf/flume-env.sh  文件中的JDK目录  注意:JAVA_OPTS 配置  如果我们传输文件过大 报内存溢出时 需要修改这个配置项 4、验证安装是否成功  ./flume-ng version 5、配置环境变量     export FLUME_HOME=/home/apache-flume-1.6.0-bin 3.2 Source、Channel、Sink有哪些类型     Flume Source     Source类型                   | 说明     Avro Source                 | 支持Avro协议(实际上是Avro RPC),内置支持     Thrift Source               | 支持Thrift协议,内置支持  Exec Source              | 基于Unix的command在标准输出上生产数据     JMS Source                   | 从JMS系统(消息、主题)中读取数据     Spooling Directory Source | 监控指定目录内数据变更     Twitter 1% firehose Source|    通过API持续下载Twitter数据,试验性质     Netcat Source               | 监控某个端口,将流经端口的每一个文本行数据作为Event输入     Sequence Generator Source | 序列生成器数据源,生产序列数据     Syslog Sources               | 读取syslog数据,产生Event,支持UDP和TCP两种协议     HTTP Source                 | 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式     Legacy Sources               | 兼容老的Flume OG中Source(0.9.x版本)     Flume Channel     Channel类型       说明  Memory Channel                | Event数据存储在内存中     JDBC Channel                  | Event数据存储在持久化存储中,当前Flume Channel内置支持Derby     File Channel                  | Event数据存储在磁盘文件中     Spillable Memory Channel   | Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件     Pseudo Transaction Channel | 测试用途     Custom Channel                | 自定义Channel实现     Flume Sink     Sink类型     说明    HDFS Sink             | 数据写入HDFS     Logger Sink           | 数据写入日志文件     Avro Sink             | 数据被转换成Avro Event,然后发送到配置的RPC端口上     Thrift Sink           | 数据被转换成Thrift Event,然后发送到配置的RPC端口上     IRC Sink              | 数据在IRC上进行回放     File Roll Sink         | 存储数据到本地文件系统     Null Sink             | 丢弃到所有数据     HBase Sink             | 数据写入HBase数据库     Morphline Solr Sink | 数据发送到Solr搜索服务器(集群)     ElasticSearch Sink     | 数据发送到Elastic Search搜索服务器(集群)     Kite Dataset Sink     | 写数据到Kite Dataset,试验性质的     Custom Sink           | 自定义Sink实现 案例1、 A simple example     http://flume.apache.org/FlumeUserGuide.html#a-simple-example     配置文件     ############################################################     # Name the components on this agent     a1.sources = r1     a1.sinks = k1     a1.channels = c1     # Describe/configure the source     a1.sources.r1.type = netcat     a1.sources.r1.bind = localhost     a1.sources.r1.port = 44444     # Describe the sink     a1.sinks.k1.type = logger     # Use a channel which buffers events in memory     a1.channels.c1.type = memory     a1.channels.c1.capacity = 1000     a1.channels.c1.transactionCapacity = 100     # Bind the source and sink to the channel     a1.sources.r1.channels = c1     a1.sinks.k1.channel = c1     ############################################################ 启动flume flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console 指定配置目录

flume-ng agent -n a1 -f op5 -Dflume.root.logger=INFO,console 不用指定配置目录,将上诉source,channel,sink的文件起名为a1,同时指定这个文件在哪 安装telnet yum install telnet 退出 ctrl+]  quit Memory Chanel 配置   capacity:默认该通道中最大的可以存储的event数量是100,   trasactionCapacity:每次最大可以source中拿到或者送到sink中的event数量也是100   keep-alive:event添加到通道中或者移出的允许时间   byte**:即event的字节量的限制,只包括eventbody 案例2、两个flume做集群(第一个agent的sink作为第二个agent的source)     node01服务器中,配置文件     ############################################################     # Name the components on this agent     a1.sources = r1     a1.sinks = k1     a1.channels = c1     # Describe/configure the source     a1.sources.r1.type = netcat     a1.sources.r1.bind = node1     a1.sources.r1.port = 44444     # Describe the sink     # a1.sinks.k1.type = logger     a1.sinks.k1.type = avro     a1.sinks.k1.hostname = node2     a1.sinks.k1.port = 60000     # Use a channel which buffers events in memory     a1.channels.c1.type = memory     a1.channels.c1.capacity = 1000     a1.channels.c1.transactionCapacity = 100     # Bind the source and sink to the channel     a1.sources.r1.channels = c1     a1.sinks.k1.channel = c1     ############################################################     node02服务器中,安装Flume(步骤略)     配置文件     ############################################################     # Name the components on this agent     a1.sources = r1     a1.sinks = k1     a1.channels = c1     # Describe/configure the source     a1.sources.r1.type = avro     a1.sources.r1.bind = node2     a1.sources.r1.port = 60000     # Describe the sink     a1.sinks.k1.type = logger     # Use a channel which buffers events in memory     a1.channels.c1.type = memory     a1.channels.c1.capacity = 1000     a1.channels.c1.transactionCapacity = 100     # Bind the source and sink to the channel     a1.sources.r1.channels = c1     a1.sinks.k1.channel = c1     ############################################################     先启动node02的Flume     flume-ng agent  -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console     再启动node01的Flume     flume-ng agent  -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console     打开telnet 测试  node02控制台输出结果 案例3、Exec Source(监听一个文件)         http://flume.apache.org/FlumeUserGuide.html#exec-source     配置文件     ############################################################     a1.sources = r1     a1.sinks = k1     a1.channels = c1     # Describe/configure the source     a1.sources.r1.type = exec     a1.sources.r1.command = tail -F /home/flume.exec.log     # Describe the sink     a1.sinks.k1.type = logger     # Use a channel which buffers events in memory     a1.channels.c1.type = memory     a1.channels.c1.capacity = 1000     a1.channels.c1.transactionCapacity = 100     # Bind the source and sink to the channel     a1.sources.r1.channels = c1     a1.sinks.k1.channel = c1     ############################################################     启动Flume     flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console     创建空文件演示 touch flume.exec.log     循环添加数据     for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done 案例4、Spooling Directory Source(监听一个目录)         http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source     配置文件     ############################################################     a1.sources = r1     a1.sinks = k1     a1.channels = c1     # Describe/configure the source     a1.sources.r1.type = spooldir     a1.sources.r1.spoolDir = /home/logs     a1.sources.r1.fileHeader = true     # Describe the sink     a1.sinks.k1.type = logger     # Use a channel which buffers events in memory     a1.channels.c1.type = memory     a1.channels.c1.capacity = 1000     a1.channels.c1.transactionCapacity = 100     # Bind the source and sink to the channel     a1.sources.r1.channels = c1     a1.sinks.k1.channel = c1     ############################################################     启动Flume     flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console     拷贝文件演示     mkdir logs     cp flume.exec.log logs/ 案例5、hdfs sink         http://flume.apache.org/FlumeUserGuide.html#hdfs-sink         配置文件     ############################################################     a1.sources = r1     a1.sinks = k1     a1.channels = c1     # Describe/configure the source     a1.sources.r1.type = spooldir     a1.sources.r1.spoolDir = /home/logs     a1.sources.r1.fileHeader = true     # Describe the sink     ***只修改上一个spool sink的配置代码块 a1.sinks.k1.type = logger     a1.sinks.k1.type=hdfs  a1.sinks.k1.hdfs.path=hdfs://sxt/flume/%Y-%m-%d/%H%M     ##每隔60s或者文件大小超过10M的时候产生新文件     # hdfs有多少条消息时新建文件,0不基于消息个数     a1.sinks.k1.hdfs.rollCount=0  # hdfs创建多长时间新建文件,0不基于时间     a1.sinks.k1.hdfs.rollInterval=60     # hdfs多大时新建文件,0不基于文件大小     a1.sinks.k1.hdfs.rollSize=10240     # 当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件     a1.sinks.k1.hdfs.idleTimeout=3     a1.sinks.k1.hdfs.fileType=DataStream

   #时间参数一定要带上 true     a1.sinks.k1.hdfs.useLocalTimeStamp=true     ## 每五分钟生成一个目录:     # 是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,后面再介绍。如果启用,则会影响除了%t的其他所有时间表达式     a1.sinks.k1.hdfs.round=true     # 时间上进行“舍弃”的值;     a1.sinks.k1.hdfs.roundValue=5     # 时间上进行”舍弃”的单位,包含:second,minute,hour     a1.sinks.k1.hdfs.roundUnit=minute     # Use a channel which buffers events in memory     a1.channels.c1.type = memory     a1.channels.c1.capacity = 1000     a1.channels.c1.transactionCapacity = 100     # Bind the source and sink to the channel  a1.sources.r1.channels = c1     a1.sinks.k1.channel = c1(将source,channel,sink关联)     ############################################################     创建HDFS目录     hadoop fs -mkdir /flume     启动Flume     flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console     查看hdfs文件     hadoop fs -ls /flume/...     hadoop fs -get /flume/...

http://flume.apache.org/ 安装 1、上传 2、解压 3、修改conf/flume-env.sh  文件中的JDK目录  注意:JAVA_OPTS 配置  如果我们传输文件过大 报内存溢出时 需要修改这个配置项 4、验证安装是否成功  ./flume-ng version 5、配置环境变量     export FLUME_HOME=/home/apache-flume-1.6.0-bin Source、Channel、Sink有哪些类型     Flume Source     Source类型                   | 说明     Avro Source                 | 支持Avro协议(实际上是Avro RPC),内置支持     Thrift Source               | 支持Thrift协议,内置支持     Exec Source                 | 基于Unix的command在标准输出上生产数据     JMS Source                   | 从JMS系统(消息、主题)中读取数据     Spooling Directory Source | 监控指定目录内数据变更     Twitter 1% firehose Source|    通过API持续下载Twitter数据,试验性质     Netcat Source               | 监控某个端口,将流经端口的每一个文本行数据作为Event输入     Sequence Generator Source | 序列生成器数据源,生产序列数据     Syslog Sources               | 读取syslog数据,产生Event,支持UDP和TCP两种协议     HTTP Source                 | 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式     Legacy Sources               | 兼容老的Flume OG中Source(0.9.x版本)     Flume Channel     Channel类型       说明     Memory Channel                | Event数据存储在内存中     JDBC Channel                  | Event数据存储在持久化存储中,当前Flume Channel内置支持Derby     File Channel                  | Event数据存储在磁盘文件中     Spillable Memory Channel   | Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件     Pseudo Transaction Channel | 测试用途     Custom Channel                | 自定义Channel实现     Flume Sink     Sink类型     说明     HDFS Sink             | 数据写入HDFS     Logger Sink           | 数据写入日志文件     Avro Sink             | 数据被转换成Avro Event,然后发送到配置的RPC端口上     Thrift Sink           | 数据被转换成Thrift Event,然后发送到配置的RPC端口上     IRC Sink              | 数据在IRC上进行回放     File Roll Sink         | 存储数据到本地文件系统     Null Sink             | 丢弃到所有数据     HBase Sink             | 数据写入HBase数据库     Morphline Solr Sink | 数据发送到Solr搜索服务器(集群)     ElasticSearch Sink     | 数据发送到Elastic Search搜索服务器(集群)     Kite Dataset Sink     | 写数据到Kite Dataset,试验性质的     Custom Sink           | 自定义Sink实现 案例1、 A simple example     http://flume.apache.org/FlumeUserGuide.html#a-simple-example     配置文件     ############################################################     # Name the components on this agent     a1.sources = r1     a1.sinks = k1     a1.channels = c1     # Describe/configure the source     a1.sources.r1.type = netcat     a1.sources.r1.bind = localhost     a1.sources.r1.port = 44444     # Describe the sink     a1.sinks.k1.type = logger     # Use a channel which buffers events in memory     a1.channels.c1.type = memory     a1.channels.c1.capacity = 1000     a1.channels.c1.transactionCapacity = 100     # Bind the source and sink to the channel     a1.sources.r1.channels = c1     a1.sinks.k1.channel = c1     ############################################################ 启动flume flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console 安装telnet yum install telnet 退出 ctrl+]  quit Memory Chanel 配置   capacity:默认该通道中最大的可以存储的event数量是100,   trasactionCapacity:每次最大可以source中拿到或者送到sink中的event数量也是100   keep-alive:event添加到通道中或者移出的允许时间   byte**:即event的字节量的限制,只包括eventbody 案例2、两个flume做集群     node01服务器中,配置文件     ############################################################     # Name the components on this agent     a1.sources = r1     a1.sinks = k1     a1.channels = c1     # Describe/configure the source     a1.sources.r1.type = netcat     a1.sources.r1.bind = node1     a1.sources.r1.port = 44444     # Describe the sink     # a1.sinks.k1.type = logger     a1.sinks.k1.type = avro     a1.sinks.k1.hostname = node2     a1.sinks.k1.port = 60000     # Use a channel which buffers events in memory     a1.channels.c1.type = memory     a1.channels.c1.capacity = 1000     a1.channels.c1.transactionCapacity = 100     # Bind the source and sink to the channel     a1.sources.r1.channels = c1     a1.sinks.k1.channel = c1     ############################################################     node02服务器中,安装Flume(步骤略)     配置文件     ############################################################     # Name the components on this agent     a1.sources = r1     a1.sinks = k1     a1.channels = c1     # Describe/configure the source     a1.sources.r1.type = avro     a1.sources.r1.bind = node2     a1.sources.r1.port = 60000     # Describe the sink     a1.sinks.k1.type = logger     # Use a channel which buffers events in memory     a1.channels.c1.type = memory     a1.channels.c1.capacity = 1000     a1.channels.c1.transactionCapacity = 100     # Bind the source and sink to the channel     a1.sources.r1.channels = c1     a1.sinks.k1.channel = c1     ############################################################     先启动node02的Flume     flume-ng agent  -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console     再启动node01的Flume     flume-ng agent  -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console     打开telnet 测试  node02控制台输出结果 案例3、Exec Source         http://flume.apache.org/FlumeUserGuide.html#exec-source     配置文件     ############################################################     a1.sources = r1     a1.sinks = k1     a1.channels = c1     # Describe/configure the source     a1.sources.r1.type = exec     a1.sources.r1.command = tail -F /home/flume.exec.log     # Describe the sink     a1.sinks.k1.type = logger     # Use a channel which buffers events in memory     a1.channels.c1.type = memory     a1.channels.c1.capacity = 1000     a1.channels.c1.transactionCapacity = 100     # Bind the source and sink to the channel     a1.sources.r1.channels = c1     a1.sinks.k1.channel = c1     ############################################################     启动Flume     flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console     创建空文件演示 touch flume.exec.log     循环添加数据     for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done 案例4、Spooling Directory Source         http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source     配置文件     ############################################################     a1.sources = r1     a1.sinks = k1     a1.channels = c1     # Describe/configure the source     a1.sources.r1.type = spooldir     a1.sources.r1.spoolDir = /home/logs     a1.sources.r1.fileHeader = true     # Describe the sink     a1.sinks.k1.type = logger     # Use a channel which buffers events in memory     a1.channels.c1.type = memory     a1.channels.c1.capacity = 1000     a1.channels.c1.transactionCapacity = 100     # Bind the source and sink to the channel     a1.sources.r1.channels = c1     a1.sinks.k1.channel = c1     ############################################################     启动Flume     flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console     拷贝文件演示     mkdir logs     cp flume.exec.log logs/ 案例5、hdfs sink         http://flume.apache.org/FlumeUserGuide.html#hdfs-sink         配置文件     ############################################################     a1.sources = r1     a1.sinks = k1     a1.channels = c1     # Describe/configure the source     a1.sources.r1.type = spooldir     a1.sources.r1.spoolDir = /home/logs     a1.sources.r1.fileHeader = true     # Describe the sink     ***只修改上一个spool sink的配置代码块 a1.sinks.k1.type = logger     a1.sinks.k1.type=hdfs     a1.sinks.k1.hdfs.path=hdfs://sxt/flume/%Y-%m-%d/%H%M     ##每隔60s或者文件大小超过10M的时候产生新文件     # hdfs有多少条消息时新建文件,0不基于消息个数     a1.sinks.k1.hdfs.rollCount=0     # hdfs创建多长时间新建文件,0不基于时间     a1.sinks.k1.hdfs.rollInterval=60     # hdfs多大时新建文件,0不基于文件大小     a1.sinks.k1.hdfs.rollSize=10240     # 当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件     a1.sinks.k1.hdfs.idleTimeout=3     a1.sinks.k1.hdfs.fileType=DataStream     a1.sinks.k1.hdfs.useLocalTimeStamp=true     ## 每五分钟生成一个目录:     # 是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,后面再介绍。如果启用,则会影响除了%t的其他所有时间表达式     a1.sinks.k1.hdfs.round=true     # 时间上进行“舍弃”的值;     a1.sinks.k1.hdfs.roundValue=5     # 时间上进行”舍弃”的单位,包含:second,minute,hour     a1.sinks.k1.hdfs.roundUnit=minute     # Use a channel which buffers events in memory     a1.channels.c1.type = memory     a1.channels.c1.capacity = 1000     a1.channels.c1.transactionCapacity = 100     # Bind the source and sink to the channel     a1.sources.r1.channels = c1     a1.sinks.k1.channel = c1     ############################################################     创建HDFS目录     hadoop fs -mkdir /flume     启动Flume     flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console     查看hdfs文件     hadoop fs -ls /flume/...     hadoop fs -get /flume/... 作业: 1、flume如何收集java请求数据 2、项目当中如何来做? 日志存放/log/目录下 以yyyyMMdd为子目录 分别存放每天的数据

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏吴生的专栏

使用 maven 生成一个支持端到端自动测试的 RESTful 服务项目脚手架

和传统后端页面生成技术相较, RESTful 数据服务专注与数据逻辑, 而将数据呈现完全交给前端应用. 这样做可以让后端开发更加单纯, 而且更容易测试. 本文将...

3615
来自专栏老码农专栏

使用 maven 生成一个支持端到端自动测试的 RESTful 服务项目脚手架

1974
来自专栏Java工程师日常干货

写出我的第一个框架:迷你版Spring MVC前期准备关于自定义注解编写核心控制器:DispatcherServletMake it run!

OK,到这里,一个迷你版的Spring MVC就开发完成了,以前,都是用Spring MVC,从没有想过可以自己开发一个出来,这是我的第一次,我的能量超乎我的想...

1094
来自专栏大闲人柴毛毛

Spring速查手册——Bean装配

Spring提供三种Bean的装配方式,分别是: 1. 自动装配Bean 2. 在Java中装配Bean 3. 在XML中装配Bean 1. 自动...

3618
来自专栏芋道源码1024

网关 Spring-Cloud-Gateway 源码解析 —— 网关初始化

7553
来自专栏猿天地

Spring Boot系列之配置读取

子曰:温故而知新,可以为师矣。周日还在学习的就真的是爱学习的人,周日大放送,这周的精彩文章推荐阅读:

2662
来自专栏腾讯云Elasticsearch Service

Elasticsearch Rest Client实战

Elasticsearch官方推荐使用Java REST客户端连接集群并进行数据操作。

5654
来自专栏Java学习网

java开发中spring和springboot常用注解总结,开发人员必学

这个注解可以用于类和方法上,用于类上,表示父路径,如类上是demo,方法上是/demo1,那么访问路径就是demo/demo1

1255
来自专栏代码拾遗

Spring Boot 2.0 教程 - 深入SpringAplication

可以通过SpringApplication.run() 方法轻松的启动一个Spring应用,例如

1104
来自专栏程序猿DD

Spring Cloud实战小贴士:Feign的继承特性(伪RPC模式)

通过之前发布的《Spring Cloud构建微服务架构:服务消费者(Feign)》,我们已经学会如何使用Spring MVC的注解来绑定服务接口。我们几乎完全可...

2718

扫码关注云+社区