前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据- Flume经典案例

大数据- Flume经典案例

作者头像
cwl_java
发布2019-12-30 17:53:05
8470
发布2019-12-30 17:53:05
举报
文章被收录于专栏:cwl_Javacwl_Java

5. Flume 案例一

1. 案例场景

A、B两台日志服务机器实时生产日志主要类型为access.log、nginx.log、web.log 现在要求:

把A、B 机器中的access.log、nginx.log、web.log 采集汇总到C机器上然后统一收集到hdfs中。 但是在hdfs中要求的目录为:

代码语言:javascript
复制
/source/logs/access/20180101/** 
/source/logs/nginx/20180101/** 
/source/logs/web/20180101/**

2. 场景分析

在这里插入图片描述
在这里插入图片描述

3. 数据流程处理分析

在这里插入图片描述
在这里插入图片描述

4、实现

服务器A对应的IP为 192.168.174.100 服务器B对应的IP为 192.168.174.110 服务器C对应的IP为 192.168.174.120

采集端配置文件开发 node01与node02服务器开发flume的配置文件

代码语言:javascript
复制
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf 
vim exec_source_avro_sink.conf
代码语言:javascript
复制
# Name the components on this agent 
<p class="mume-header " id="name-the-components-on-this-agent-2"></p> 
a1.sources = r1 r2 r3 
a1.sinks = k1 a1.channels = c1 
# Describe/configure the source 
<p class="mume-header " id="describeconfigure-the-source-2"></p> 
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -F /export/servers/taillogs/access.log 
a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type = static 
## static拦截器的功能就是往采集到的数据的header中插入自己定
## 义的key-value对 
<p class="mume-header " id="static拦截器的功能就是往采集到的数据的header中插入自己定-义的key-va a1.sources.r1.interceptors.i1.key = type 
a1.sources.r1.interceptors.i1.value = access 
a1.sources.r2.type = exec 
a1.sources.r2.command = tail -F /export/servers/taillogs/nginx.log 
a1.sources.r2.interceptors = i2 
a1.sources.r2.interceptors.i2.type = static 
a1.sources.r2.interceptors.i2.key = type 
a1.sources.r2.interceptors.i2.value = nginx 
a1.sources.r3.type = exec 
a1.sources.r3.command = tail -F /export/servers/taillogs/web.log 
a1.sources.r3.interceptors = i3 
a1.sources.r3.interceptors.i3.type = static 
a1.sources.r3.interceptors.i3.key = type 
a1.sources.r3.interceptors.i3.value = web # Describe the sink 
<p class="mume-header " id="describe-the-sink-2"></p> 
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = node03 
a1.sinks.k1.port = 41414 # Use a channel which buffers events in memory 
<p class="mume-header " id="use-a-channel-which-buffers-events-in-memory-2"></p> 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 20000 
a1.channels.c1.transactionCapacity = 10000 # Bind the source and sink to the channel 
<p class="mume-header " id="bind-the-source-and-sink-to-the-channel-2"></p> 
a1.sources.r1.channels = c1 
a1.sources.r2.channels = c1 
a1.sources.r3.channels = c1 
a1.sinks.k1.channel = c1

服务端配置文件开发 在node03上面开发flume配置文件

代码语言:javascript
复制
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf 
vim avro_source_hdfs_sink.conf
代码语言:javascript
复制
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 
# 定义source 
<p class="mume-header " id="定义source"></p> 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 192.168.174.120 
a1.sources.r1.port =41414 
# 添加时间拦截器 
<p class="mume-header " id="添加时间拦截器"></p> 
a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$ 
# 定义channels 
<p class="mume-header " id="定义channels"></p> 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 20000 
a1.channels.c1.transactionCapacity = 10000 
# 定义sink 
<p class="mume-header " id="定义sink"></p> 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path=hdfs://192.168.174.100:8020/source/logs/%{type}/%Y%m%d 
a1.sinks.k1.hdfs.filePrefix =events 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
# 时间类型 
<p class="mume-header " id="时间类型"></p> 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
# 生成的文件不按条数生成 
<p class="mume-header " id="生成的文件不按条数生成"></p> 
a1.sinks.k1.hdfs.rollCount = 0 
# 生成的文件按时间生成 
<p class="mume-header " id="生成的文件按时间生成"></p> 
a1.sinks.k1.hdfs.rollInterval = 30 
# 生成的文件按大小生成 
<p class="mume-header " id="生成的文件按大小生成"></p> 
a1.sinks.k1.hdfs.rollSize = 10485760 
# 批量写入hdfs的个数 
<p class="mume-header " id="批量写入hdfs的个数"></p> 
a1.sinks.k1.hdfs.batchSize = 10000 
# flume操作hdfs的线程数(包括新建,写入等) 
<p class="mume-header " id="flume操作hdfs的线程数包括新建写入等"></p> 
a1.sinks.k1.hdfs.threadsPoolSize=10 
# 操作hdfs超时时间
<p class="mume-header " id="操作hdfs超时时间"></p> 
a1.sinks.k1.hdfs.callTimeout=30000 
# 组装source、channel、sink 
<p class="mume-header " id="组装source-channel-sink"></p> 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

采集端文件生成脚本 在node01与node02上面开发shell脚本,模拟数据生成

代码语言:javascript
复制
cd /export/servers/shells 
vim server.sh # !/bin/bash 

<p class="mume-header " id="binbash"></p> 
while true 
	dodate >> /export/servers/taillogs/access.log; 
	date >> /export/servers/taillogs/web.log; 
	date >> /export/servers/taillogs/nginx.log; s
	leep 0.5; 
done

顺序启动服务 node03启动flume实现数据收集

代码语言:javascript
复制
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin bin/flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf -name a1

node01与node02启动flume实现数据监控

代码语言:javascript
复制
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin bin/flume-ng agent -c conf -f conf/exec_source_avro_sink.conf -name a1

node01与node02启动生成文件脚本

代码语言:javascript
复制
cd /export/servers/shells 
sh server.sh

5、项目实现截图

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 5. Flume 案例一
    • 1. 案例场景
      • 2. 场景分析
        • 3. 数据流程处理分析
          • 4、实现
            • 5、项目实现截图
            相关产品与服务
            大数据
            全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档