前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >(七)Flume搭建、组成与测试

(七)Flume搭建、组成与测试

作者头像
wolf
发布2020-09-23 11:59:11
8320
发布2020-09-23 11:59:11
举报
文章被收录于专栏:大数据分享大数据分享

集群规划:

服务器hadoop002 服务器hadoop003 服务器hadoop004

Flume(采集日志) Flume Flume

安装部署

1)将apache-flume-1.7.0-bin.tar.gz上传到linux的/opt/software目录下

2)解压apache-flume-1.7.0-bin.tar.gz到/opt/module/目录下

tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/

3)修改apache-flume-1.7.0-bin的名称为flume

mv apache-flume-1.7.0-bin flume

4)将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件

mv flume-env.sh.template flume-env.sh

vi flume-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_144

第4章Flume监控之Ganglia

4.1 Ganglia的安装与部署

1) 安装httpd服务与php

sudo yum -y install httpd php

2) 安装其他依赖

sudo yum -y install rrdtool perl-rrdtool rrdtool-devel

sudo yum -y install apr-devel

3) 安装ganglia

sudo rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm

sudo yum -y install ganglia-gmetad

sudo yum -y install ganglia-web

sudo yum install -y ganglia-gmond

4) 修改配置文件/etc/httpd/conf.d/ganglia.conf

sudo vim /etc/httpd/conf.d/ganglia.conf

修改为红颜色的配置:

# Ganglia monitoring system php web frontend

Alias /ganglia /usr/share/ganglia

<Location /ganglia>

Order deny,allow

Deny from all

Allow from all

# Allow from 127.0.0.1

# Allow from ::1

# Allow from .example.com

</Location>

5) 修改配置文件/etc/ganglia/gmetad.conf

sudo vim /etc/ganglia/gmetad.conf

修改为:

data_source "hadoop002" 192.168.xx.102

6) 修改配置文件/etc/ganglia/gmond.conf

sudo vim /etc/ganglia/gmond.conf

修改为:

cluster {

name = "hadoop002"

owner = "unspecified"

latlong = "unspecified"

url = "unspecified"

}

udp_send_channel {

#bind_hostname = yes # Highly recommended, soon to be default.

# This option tells gmond to use a source address

# that resolves to the machine's hostname. Without

# this, the metrics may appear to come from any

# interface and the DNS names associated with

# those IPs will be used to create the RRDs.

# mcast_join = 239.2.11.71

host = 192.168.xx.102

port = 8649

ttl = 1

}

udp_recv_channel {

# mcast_join = 239.2.11.71

port = 8649

bind = 192.168.xx.102

retry_bind = true

# Size of the UDP buffer. If you are handling lots of metrics you really

# should bump it up to e.g. 10MB or even higher.

# buffer = 10485760

}

7) 修改配置文件/etc/selinux/config

sudo vim /etc/selinux/config

修改为:

# This file controls the state of SELinux on the system.

# SELINUX= can take one of these three values:

# enforcing - SELinux security policy is enforced.

# permissive - SELinux prints warnings instead of enforcing.

# disabled - No SELinux policy is loaded.

SELINUX=disabled

# SELINUXTYPE= can take one of these two values:

# targeted - Targeted processes are protected,

# mls - Multi Level Security protection.

SELINUXTYPE=targeted

尖叫提示:selinux本次生效关闭必须重启,如果此时不想重启,可以临时生效之:

sudo setenforce 0

5) 启动ganglia

sudo service httpd start

sudo service gmetad start

sudo service gmond start

6) 打开网页浏览ganglia页面

http://192.168.xx.102/ganglia

尖叫提示:如果完成以上操作依然出现权限不足错误,请修改/var/lib/ganglia目录的权限:

sudo chmod -R 777 /var/lib/ganglia

Flume组件

1)Source

(1)Taildir Source相比Exec Source、Spooling Directory Source的优势

TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。

Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。

Spooling Directory Source监控目录,不支持断点续传。

(2)batchSize大小如何设置?

答:Event 1K左右时,500-1000合适(默认为100)

2)Channel

采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。

注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

日志采集Flume配置

官网配置案例:

1)Flume配置分析

Flume直接读log日志的数据,log日志的格式是app-yyyy-mm-dd.log。

2)Flume的具体配置如下:

(1)在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件

vim file-flume-kafka.conf

在文件配置如下内容

a1.sources=r1

a1.channels=c1 c2

# configure source

a1.sources.r1.type = TAILDIR

a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /tmp/logs/app.+

a1.sources.r1.fileHeader = true

a1.sources.r1.channels = c1 c2

#interceptor

a1.sources.r1.interceptors = i1 i2

a1.sources.r1.interceptors.i1.type = com.xxxx.flume.interceptor.LogETLInterceptor$Builder

a1.sources.r1.interceptors.i2.type = com.xxxx.flume.interceptor.LogTypeInterceptor$Builder

a1.sources.r1.selector.type = multiplexing

a1.sources.r1.selector.header = topic

a1.sources.r1.selector.mapping.topic_start = c1

a1.sources.r1.selector.mapping.topic_event = c2

# configure channel

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = hadoop002:9092,hadoop003:9092,hadoop004:9092

a1.channels.c1.kafka.topic = topic_start

a1.channels.c1.parseAsFlumeEvent = false

a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c2.kafka.bootstrap.servers = hadoop002:9092,hadoop003:9092,hadoop004:9092

a1.channels.c2.kafka.topic = topic_event

a1.channels.c2.parseAsFlumeEvent = false

a1.channels.c2.kafka.consumer.group.id = flume-consumer

注意:com.xxxx.flume.interceptor.LogETLInterceptor和com.xxxx.flume.interceptor.LogTypeInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。

配置完毕,同步分发,004节点可以不用

官网拦截器配置:

选择器操作:

官网kafkachannel配置:

Java端代码实现ETL拦截器:建立 LogETLInterceptor类

package com.xxxx.flume.interceptor;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;

import java.util.ArrayList;

import java.util.List;

public class LogETLInterceptor implements Interceptor {

// 不要导错包,实现四个方法

@Override

public void initialize() {

// 初始化

}

@Override

public Eventintercept(Event event) {

// 单event

// 清洗数据--ETL,初级岗位(不参与数据分析) { } =》 { xxx 不完整的大括号,就是脏数据

// {"action":"1","ar":"MX","ba":"Huawei","detail":"325"," 非法数据,必须清洗掉

// 1.得到一个字节数组,获取日志

byte[] body = event.getBody();

// 将字节数组转为string类型,导入lang包下,并定义编码类型

String log =new String(body, Charset.forName("UTF-8"));

// 2.区分类型处理

if (log.contains("start")){

// 验证启动日志的逻辑

if(LogUtils.validateStart(log)){

return event;

}

}else{

// 验证事件日志的逻辑

if(LogUtils.validateEvent(log)){

return event;

}

}

return null;

}

@Override

public Listintercept(List events) {

// 多event处理

ArrayList interceptors =new ArrayList<>();

// 判断取出校验合格的数据返回

for (Event event : events) {

Event intercept1 = intercept(event);

// 校验合格的放入list集合

if(intercept1 !=null){

interceptors.add(intercept1);

}

}

return interceptors;

}

@Override

public void close() {

// 关闭资源

}

// 静态内部类,实现builder

public static class Builderimplements Interceptor.Builder{

@Override

public Interceptorbuild() {

// 实例化一个对象,进行调用

return new LogETLInterceptor();

}

@Override

public void configure(Context context) {

}

}

}

建立LogUtils类:

package com.atguigu.flume.interceptor;

import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {

// 验证启动日志

public static boolean validateStart(String log) {

// {"action":"1","ar":"MX","ba":"Huawei","detail":"325","}

if(log ==null){

return false;

}

// 去除空格,如果不是{开头或者不是}结尾,全部返回false

if(!log.trim().startsWith("{") || !log.trim().endsWith("}")){

return false;

}

// 正常的数据,直接返回

return true;

}

// 验证事件日志

public static boolean validateEvent(String log) {

// 1600432697673|{"cm":{"ln":"-49.4","sv":"V2.9.2","os":"8.0.2","g":"UR801VVP@gmail.com",

// 服务器时间 | 日志内容

if(log ==null){

return false;

}

// 切割

String[] logContents = log.split("\\|");

// 防止数组越界

if(logContents.length !=2){

return false;

}

// 校验服务器时间(长度必须是13位 必须全部是数字)

if(logContents[0].length()!=13 || !NumberUtils.isDigits(log)){

return false;

}

// 校验日志格式

if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){

return false;

}

return true;

}

}

类型区分拦截器:建立 LogTypeInterceptor类

package com.xxxx.flume.interceptor;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

@Override

public void initialize() {

}

@Override

public Eventintercept(Event event) {

// 区分类型start event

// body header

// 获取body

byte[] body = event.getBody();

String log =new String(body, Charset.forName("UTF-8"));

// 获取头信息,拿到的是地址--对象

Map headers = event.getHeaders();

// 业务逻辑判断,区分类型

if (log.contains("start")){

// k,v

headers.put("topic","topic_start");

}else {

headers.put("topic","topic_event");

}

return event;

}

@Override

public Listintercept(List events) {

ArrayList interceptors =new ArrayList<>();

for (Event event : events) {

Event intercept1 = intercept(event);

interceptors.add(intercept1);

}

return interceptors;

}

@Override

public void close() {

}

public static class Builderimplements Interceptor.Builder{

@Override

public Interceptorbuild() {

return new LogETLInterceptor();

}

@Override

public void configure(Context context) {

}

}

}

maven打包上传到flume下的lib中:
配置完毕,启动集群

后台启动flume:bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

集群启动、停止脚本

# !/bin/bash

case $1 in

"start"){

for i in hadoop002 hadoop003

do

echo ------------- $i flume启动---------------

ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/test1 2>&1 &"

done

};;

"stop"){

for i in hadoop002 hadoop003

do

echo ------------- $i flume停止---------------

# 使用唯一值过滤,避免杀死重名的线程,后面的$2需要\转义一下

ssh i "ps -ef | grep file-flume-kafka.conf | grep -v grep | awk '{print \

done

};;

esac

演示效果:

说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。

说明2:/dev/null代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。

说明3:awk 默认分割符号为空格

说明4:xargs 表示取出签名命令运行的结果,作为后面命令的输入参数。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Java端代码实现ETL拦截器:建立 LogETLInterceptor类
  • 建立LogUtils类:
  • 类型区分拦截器:建立 LogTypeInterceptor类
    • maven打包上传到flume下的lib中:
      • 配置完毕,启动集群
        • 集群启动、停止脚本
          • 演示效果:
          相关产品与服务
          数据保险箱
          数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档