前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Hadoop数据分析平台实战——150Flume介绍离线数据分析平台实战——150Flume介绍

Hadoop数据分析平台实战——150Flume介绍离线数据分析平台实战——150Flume介绍

作者头像
Albert陈凯
发布2018-04-08 11:28:30
5300
发布2018-04-08 11:28:30
举报
文章被收录于专栏:Albert陈凯Albert陈凯

离线数据分析平台实战——150Flume介绍

Nginx介绍

Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件(IMAP/POP3)代理服务器。 其特点是占有内存少,并发能力强,事实上nginx的并发能力确实在同类型的网页服务器中表现较好。 一般情况下,我们会将nginx服务器作为一个静态资源的访问容器。

Nginx安装步骤

Nginx安装步骤如下:(使用yum命令安装)

  1. 使用root用户登录。
  2. 查看nginx信息,命令:yum info nginx.
  3. 如果查看nginx信息提示nginx找不到,那么可以通过修改rpm源来进行后续步骤,执行命令:rpm -ivh http://nginx.org/packages/centos/6/noarch/RPMS/nginx-release-centos-6-0.el6.ngx.noarch.rpm
  4. 在查看nginx信息。
  5. 安装,命令: yum install nginx。在安装过程中直接输入y。
  6. 启动nginx,命令:service nginx start
  7. 访问http://192.168.0.120查看nginx的web页面。

image.png

image.png

image.png

Flume介绍

Flume是Apache基金会组织的一个提供的高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统, Flume支持在日志系统中定制各类数据发送方,用于收集数据; 同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

当前Flume有两个版本, Flume0.9x版本之前的统称为Flume-og, Flume1.X版本被统称为Flume-ng。

参考文档:http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6/FlumeUserGuide.html

Flume-og和Flume-ng的区别

主要区别如下:

  1. Flume-og中采用master结构,为了保证数据的一致性,引入zookeeper进行管理。Flume-ng中取消了集中master机制和zookeeper管理机制,变成了一个纯粹的传输工具。
  2. Flume-ng中采用不同的线程进行数据的读写操作;在Flume-og中,读数据和写数据是由同一个线程操作的,如果写出比较慢的话,可能会阻塞flume的接收数据的能力。

Flume结构

Flume中以Agent为基本单位,一个agent可以包括source、channel、sink,三种组件都可以有多个。 其中source组件主要功能是接收外部数据,并将数据传递到channel中; sink组件主要功能是发送flume接收到的数据到目的地; channel的主要作用就是数据传输和保存的一个作用。 Flume主要分为三类结构:单agent结构、多agent链式结构和多路复用agent结构。

单agent结构

image.png

多agent链式结构

image.png

多路复用agent结构

image.png

Source介绍

Source的主要作用是接收客户端发送的数据,并将数据发送到channel中,source和channel之间的关系是多对多关系,不过一般情况下使用一个source对应多个channel。 通过名称区分不同的source。 Flume常用source有:Avro Source、Thrift Source、Exec Source、Kafka Source、Netcat Source等。 设置格式如下:

代码语言:javascript
复制
<agent-name>.sources.<source_name>.type=指定类型
<agent-name>.sources.<source_name>.channels=channels
.... 其他对应source类型需要的参数

Channel介绍

Channel的主要作用是提供一个数据传输通道,提供数据传输和数据存储(可选)等功能。 source将数据放到channel中,sink从channel中拿数据。 通过不同的名称来区分channel。 Flume常用channel有:Memory Channel、JDBC Channel、Kafka Channel、File Channel等。设置格式如下:

代码语言:javascript
复制
<agent-name>.channels.<channel_name>.type=指定类型
.... 其他对应channel类型需要的参数

Sink介绍

Sink的主要作用是定义数据写出方式,一般情况下sink从channel中获取数据,然后将数据写出到file、hdfs或者网络上。 channel和sink之间的关系是一对多的关系。 通过不同的名称来区分sink。 Flume常用sink有:Hdfs Sink、Hive Sink、File Sink、HBase Sink、Avro Sink、Thrift Sink、Logger Sink等。 设置格式如下:

代码语言:javascript
复制
<agent-name>.sinks.<sink_name1>.type=指定类型
<agent-name>.sinks.<sink_name1>.channel=<channe_name>
.... 其他对应sink类型需要的参数

Flume安装

安装步骤如下:

  1. 下载flume:wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6.tar.gz
  2. 解压flume。
  3. 修改conf/flume-env.sh文件,如果没有就新建一个。
  4. 添加flume的bin目录到环境变量中去。
  5. 验证是否安装成功, flume-ng version

Flume案例1

使用netcat source监听客户端的请求,使用memory channel作为数据的传输通道,使用logger sink打印监听到的信息。 步骤:

  1. 在conf文件夹中建立test1.conf,里面是agent的配置。
  2. 启动flume-ng agent --conf ./conf/ --conf-file ./conf/test.conf --name a1 -Dflume.root.logger=INFO,console。
  3. 使用telenet命令连接机器,命令:telnet 192.168.100.120 4444
  4. 输入信息查看是否成功。

test1.conf

代码语言:javascript
复制
a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4444
a1.sources.r1.channels=c1

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1

Flume案例2

Nginx作为日志服务器,通过exec source监听nginx的日志文件,使用memory channel作为数据传输通道,使用hdfs sink将数据存储到hdfs上。

项目用到的配置文件

nginx.conf
代码语言:javascript
复制
user  nginx;
worker_processes  1;
error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;

events {
    worker_connections  1024;
}

http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';


    log_format  log_format   '$remote_addr^A$msec^A$http_host^A$request_uri';

    sendfile        on;
    keepalive_timeout  65;
    #include /etc/nginx/conf.d/*.conf;

server {
    listen       80;
    server_name  hh 0.0.0.0;

    location ~ .*(BfImg)\.(gif)$ {
      default_type image/gif;
      access_log /home/hadoop/access.log log_format;
      root /etc/nginx/www/source;  
   }
}
}
test2.conf
代码语言:javascript
复制
agent.sources = r1
agent.sinks = k1
agent.channels = c1

## common
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

## sources config
agent.sources.r1.type  = exec
agent.sources.r1.command = tail -F /home/hadoop/access.log

## channels config
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 1000
agent.channels.c1.byteCapacityBufferPercentage = 20
agent.channels.c1.byteCapacity = 1000000
agent.channels.c1.keep-alive = 60

#sinks config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.path = hdfs://hh:8020/logs/%m/%d
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = BF-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.minBlockReplicas=1
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollSize=132692539
agent.sinks.k1.hdfs.idleTimeout=10
agent.sinks.k1.hdfs.batchSize = 1
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.round = true
agent.sinks.k1.hdfs.roundValue = 2
agent.sinks.k1.hdfs.roundUnit = minute
agent.sinks.k1.hdfs.useLocalTimeStamp = true

复杂的配置方法

代码语言:javascript
复制
agent.sources = r1
agent.sinks = k1 k2
agent.channels = c1

## common
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
agent.sinks.k2.channel = c1

## sources config
agent.sources.r1.type  = exec
agent.sources.r1.command = tail -F /home/hadoop/access.log
agent.sources.r1.selector.type= replicating

## channels config
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000000
agent.channels.c1.transactionCapacity = 10000000
agent.channels.c1.byteCapacityBufferPercentage = 60
agent.channels.c1.byteCapacity = 12800000000
agent.channels.c1.keep-alive = 60

#sinks config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.path = hdfs://server30-244:8020/logs/%Y/%m/%d
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = BF-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.minBlockReplicas=1
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollSize=132692539
agent.sinks.k1.hdfs.idleTimeout=600
agent.sinks.k1.hdfs.batchSize = 100
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.round = true
agent.sinks.k1.hdfs.roundValue = 2
agent.sinks.k1.hdfs.roundUnit = minute
agent.sinks.k1.hdfs.useLocalTimeStamp = true

agent.sinks.k2.type = hdfs
agent.sinks.k2.channel = c1
agent.sinks.k2.hdfs.path = hdfs://server30-99:8020/agentLog/%Y/%m/%d
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = ub-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k2.hdfs.minBlockReplicas=1
agent.sinks.k2.hdfs.rollInterval=3600
agent.sinks.k2.hdfs.rollSize=132692539
agent.sinks.k2.hdfs.idleTimeout=600
agent.sinks.k2.hdfs.batchSize = 100
agent.sinks.k2.hdfs.rollCount=0
agent.sinks.k2.hdfs.round = true
agent.sinks.k2.hdfs.roundValue = 2
agent.sinks.k2.hdfs.roundUnit = minute
agent.sinks.k2.hdfs.useLocalTimeStamp = true


#####sink groups
agent.sinkgroups=g1
agent.sinkgroups.g1.sinks=k1 k2
agent.sinkgroups.g1.processor.type= failover
agent.sinkgroups.g1.processor.priority.k1=10
agent.sinkgroups.g1.processor.priority.k2=5
agent.sinkgroups.g1.processor.maxpenalty=10000
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.08.31 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 离线数据分析平台实战——150Flume介绍
    • Nginx介绍
      • Nginx安装步骤
        • Flume介绍
          • Flume-og和Flume-ng的区别
            • Flume结构
              • 单agent结构
                • 多agent链式结构
                  • 多路复用agent结构
                    • Source介绍
                      • Channel介绍
                        • Sink介绍
                          • Flume安装
                            • Flume案例1
                              • test1.conf
                                • Flume案例2
                              • 项目用到的配置文件
                                • 复杂的配置方法
                                相关产品与服务
                                数据保险箱
                                数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
                                领券
                                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档