在环境变量中增加如下命令,可以使用 bd
快速切换到 /data/tools/bigdata
cd /etc/profile.d/
vi bd.sh
内容如下
alias bd='cd /data/tools/bigdata'
配置生效
source /etc/profile
下载地址
https://dlcdn.apache.org/flume/1.9.0/
上传至虚拟机,并解压
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /data/tools/bigdata
环境变量
创建配置文件
vi /etc/profile.d/flume.sh
内容设置为
# FLUME
export FLUME_HOME=/data/tools/bigdata/apache-flume-1.9.0-bin
export PATH=$PATH:$FLUME_HOME/bin
配置生效
source /etc/profile
查看是否生效
echo $FLUME_HOME
查看flume版本
flume-ng version
监控一个目录,将数据打印出来
配置文件
vi $FLUME_HOME/conf/spoolingtest.conf
内容如下
# a1表示agent的名字 可以自定义
# 给sources(在一个agent里可以定义多个source)取个名字
a1.sources = r1
# 给channel个名字
a1.channels = c1
# 给channel个名字
a1.sinks = k1
# 对source进行配置
# agent的名字.sources.source的名字.参数 = 参数值
# source的类型 spoolDir(监控一个目录下的文件的变化)
a1.sources.r1.type = spooldir
# 监听哪一个目录
a1.sources.r1.spoolDir = /root/data
# 是否在event的headers中保存文件的绝对路径
a1.sources.r1.fileHeader = true
# 给拦截器取个名字 i1
a1.sources.r1.interceptors = i1
# 使用timestamp拦截器,将处理数据的时间保存到event的headers中
a1.sources.r1.interceptors.i1.type = timestamp
# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置sink为logger
# 直接打印到控制台
a1.sinks.k1.type = logger
# 将source、channel、sink组装成agent
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
新建/root/data目录
mkdir /root/data
启动agent
flume-ng agent -n a1 -f $FLUME_HOME/conf/spoolingtest.conf -Dflume.root.logger=DEBUG,console
在/root/data/目录下新建文件,输入内容,观察flume进程打印的日志
随意在test.txt中加入一些内容
vi /root/data/test.txt
我们会发现它做了如下操作
test.txt.COMPLETED
创建配置文件
vi $FLUME_HOME/conf/spoolingToHDFS.conf
配置文件
# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = spooldir
a.sources.r1.spoolDir = /root/data
a.sources.r1.fileHeader = true
a.sources.r1.interceptors = i1
a.sources.r1.interceptors.i1.type = timestamp
#指定sink的类型
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir1
# 指定文件名前缀
a.sinks.k1.hdfs.filePrefix = student
# 指定达到多少数据量写一次文件 单位:bytes
a.sinks.k1.hdfs.rollSize = 102400
# 指定多少条写一次文件
a.sinks.k1.hdfs.rollCount = 1000
# 指定文件类型为 流 来什么输出什么
a.sinks.k1.hdfs.fileType = DataStream
# 指定文件输出格式 为text
a.sinks.k1.hdfs.writeFormat = text
# 指定文件名后缀
a.sinks.k1.hdfs.fileSuffix = .txt
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
在 /root/data/目录下准备数据
vi /root/data/study.txt
内容如下
good good study
day day up
启动agent
如果HDFS没有启动要先启动HDFS
flume-ng agent -n a -f $FLUME_HOME/conf/spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console
启动HDFS
$HADOOP_HOME/sbin/start-dfs.sh
停止HDFS
$HADOOP_HOME/sbin/stop-dfs.sh
可以从网址中查看文件保存情况(192.168.160.130是我的服务器IP)
http://192.168.160.130:50070/explorer.html#/
vi $FLUME_HOME/conf/hbaseLogToHDFS.conf
配置文件
# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = exec
a.sources.r1.command = tail -f /data/tools/bigdata/hbase-2.4.11/logs/hbase-root-master-master.log
#指定sink的类型
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir2
# 指定文件名前缀
a.sinks.k1.hdfs.filePrefix = hbaselog
# 指定达到多少数据量写一次文件 单位:bytes
a.sinks.k1.hdfs.rollSize = 102400
# 指定多少条写一次文件
a.sinks.k1.hdfs.rollCount = 1000
# 指定文件类型为 流 来什么输出什么
a.sinks.k1.hdfs.fileType = DataStream
# 指定文件输出格式 为text
a.sinks.k1.hdfs.writeFormat = text
# 指定文件名后缀
a.sinks.k1.hdfs.fileSuffix = .txt
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
启动
flume-ng agent -n a -f $FLUME_HOME/conf/hbaseLogToHDFS.conf -Dflume.root.logger=DEBUG,console
创建配置文件
vi $FLUME_HOME/conf/hbaselogToHBase.conf
配置文件
# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = exec
a.sources.r1.command = cat /data/tools/bigdata/hbase-2.4.11/logs/hbase-root-master-master.log
#指定sink的类型
a.sinks.k1.type = hbase
a.sinks.k1.table = t_log
a.sinks.k1.columnFamily = cf1
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 100000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
在hbase中创建log表
create 't_log','cf1'
启动
flume-ng agent -n a -f $FLUME_HOME/conf/hbaselogToHBase.conf -Dflume.root.logger=DEBUG,console
创建配置文件
vi $FLUME_HOME/conf/netcatLogger.conf
配置文件
# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = netcat
a.sources.r1.bind = 0.0.0.0
a.sources.r1.port = 8888
#指定sink的类型
a.sinks.k1.type = logger
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
启动
先启动agent
flume-ng agent -n a -f $FLUME_HOME/conf/netcatLogger.conf -Dflume.root.logger=DEBUG,console
新打开一个控制台
安装telnet
yum install telnet
再启动telnet
telnet localhost 8888
创建配置文件
vi $FLUME_HOME/conf/httpToLogger.conf
配置文件
# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1
# 给channel组件命名为c1
a.channels = c1
#指定spooldir的属性
a.sources.r1.type = http
a.sources.r1.port = 6666
#指定sink的类型
a.sinks.k1.type = logger
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100
# 组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
先启动agent
flume-ng agent -n a -f $FLUME_HOME/conf/httpToLogger.conf -Dflume.root.logger=DEBUG,console
再使用curl发起一个http请求
新打开一个控制台
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://localhost:6666