前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flume拦截器实现按照事件时间接入HDFS

Flume拦截器实现按照事件时间接入HDFS

作者头像
Eights
发布2020-09-10 15:14:51
1.3K0
发布2020-09-10 15:14:51
举报
文章被收录于专栏:Eights做数据Eights做数据

本篇文章大概6457字,阅读时间大约17分钟

Flume作为一个数据接入组件,广泛应用于Hadoop生态中。在业务时间混乱的情况下,按照机器数据在HDFS上分区会降低ETL的效率。采用Flume自定义拦截器可以实现按照事件时间Sink到HDFS目录,以应对数据的事件时间混乱问题

1

文档编写目的

  • Flume自定义拦截器的开发和测试,应对日志事件时间混乱问题

集群环境

  • CDH5.16.2

2

组件介绍

Flume是一个分布式、高可靠、高可用的海量日志采集、聚合、传输系统

  • Agent是一个JVM进程,控制Event从source到sink。
  • Source数据源,负责数据接收
  • Channel位于Source和Sink之间的buffer。Channel是线程安全的,可以同步处理多个source的写操作和多个sink的读操作
    • Memory Channel基于内存,效率高,但在agent挂掉,重启等可能会有数据丢失
    • File Channel基于磁盘,效率较低,不会丢数据
  • Sink不断轮询Channel的事件且批量拉取,并将这些Event写入外部系统。Sink具有事务,在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出,sink就会进行事务提交。事务提交后,Channel从buffer中移除这批Event
  • Event是Flume定义的一个数据流传输的最小单位
Flume拦截器
  • Flume支持使用拦截器在运行时对event进行修改或丢弃
  • Flume支持链式的拦截器执行方式,在配置文件里面配置多个拦截器,拦截器的执行顺序取决于它们配置的顺序,Event按照顺序经过每一个拦截器

3

Flume自定义拦截器实战

业务场景

在物联网的场景中,存在网络信号不佳,这时设备不会把数据传输到云平台上,而是放置在本地存储中,等待下一个开机,网络信号良好的情况下,将数据上传,造成了事件时间和平台接收时间存在跨天的情况,甚至由于设备本地时钟混乱,获取不到正确的事件时间,产生无效数据。

设备的数据上传后会进入kafka中,采用Flume拉取kafka的数据sink到HDFS接入Hive外部表进行离线分析,这里就需要使用Flume自定义拦截器按照事件时间将kafka中的数据sink到按天分区的不同的HDFS目录

实战

这里使用样例数据代替真实数据,样例数据如下:

代码语言:javascript
复制
2020-08-20 11:56:02.557 [main] INFO com.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1595312507640},"attr":{"area":"石嘴山","uid":"2F10092A99995","app_v":"1.1.4","event_type":"common","device_id":"1FB872-9A10099995","os_type":"0.87","channel":"XO","language":"chinese","brand":"Huawei-0"}}
2020-08-20 11:56:02.557 [main] INFO com.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"0","error_code":"0"},"time":1595312539940},"attr":{"area":"九江","uid":"2F10092A99996","app_v":"1.1.5","event_type":"common","device_id":"1FB872-9A10099996","os_type":"9.0","channel":"PU","language":"chinese","brand":"xiaomi-9"}}

自定义Flume拦截器主要就是需要实现flume的Interceptor接口,核心方法是重写intercept方法

代码语言:javascript
复制
public interface Interceptor {
  /**
   * Any initialization / startup needed by the Interceptor.
   */
  public void initialize();

  /**
   * Interception of a single {@link Event}.
   * @param event Event to be intercepted
   * @return Original or modified event, or {@code null} if the Event
   * is to be dropped (i.e. filtered out).
   */
  public Event intercept(Event event);

  /**
   * Interception of a batch of {@linkplain Event events}.
   * @param events Input list of events
   * @return Output list of events. The size of output list MUST NOT BE GREATER
   * than the size of the input list (i.e. transformation and removal ONLY).
   * Also, this method MUST NOT return {@code null}. If all events are dropped,
   * then an empty List is returned.
   */
  public List<Event> intercept(List<Event> events);

  /**
   * Perform any closing / shutdown needed by the Interceptor.
   */
  public void close();

  /** Builder implementations MUST have a no-arg constructor */
  public interface Builder extends Configurable {
    public Interceptor build();
  }
}

根据事件时间分区的原理就是,将设备中的事件时间解析出来,作为一个属性put到event的header中,然后在Flume的HDFS Sink配置中指定header中put的属性,代码实现如下:

代码语言:javascript
复制
/**
 * 物联网的部分数据会保存在边缘设备上,直到下次开机进行上传,因此在用flume进行数据搜集的时候会存在补发的问题
 * 落分区应该按照事件时间而不是flume主机的时间
 * 事件时间拦截器则是为了应对以上场景
 * @author Eights
 */
public class EventTimeInterceptor implements Interceptor {

    private static FastDateFormat dateFormat = FastDateFormat.getInstance("yyyy-MM-dd");

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        //获取header
        Map<String, String> headers = event.getHeaders();

        //获取body
        String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);

        String[] bodyArr = eventBody.split("\\s+");

        try {
            String jsonStr = bodyArr[6];

            //数据为空,返回null
            if (Strings.isNullOrEmpty(jsonStr)) {
                return null;
            }

            long ts = Long.parseLong(JSON.parseObject(jsonStr).getJSONObject("app_active").getString("time"));
            //打上事件日期
            String eventDate = dateFormat.format(ts);
            //header中添加event date
            headers.put("eventDate", eventDate);
            event.setHeaders(headers);
        } catch (Exception e) {
            //脏数据,需要sink到一个目录进行核查
            headers.put("eventDate", "unknow");
            event.setHeaders(headers);
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {

        return list.stream().map(this::intercept)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new EventTimeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
代码语言:javascript
复制
# pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.eights</groupId>
    <artifactId>flume-ng-interceptors</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <compiler.version>1.8</compiler.version>
        <flume.version>1.9.0</flume.version>
        <fastjson.version>1.2.73</fastjson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>${flume.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>${compiler.version}</source>
                    <target>${compiler.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  • 代码开发完成后,打包放在flume的lib目录下
  • CDH集群放在/opt/cloudera/parcels/CDH/lib/flume-ng/lib,注意每个agent节点都需要配置

4

功能测试

  • 将机器上的日志,通过flume sink到hdfs目录上,观察是否根据事件时间生成目录,Flume配置如下
代码语言:javascript
复制
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile =/u01/sample_data/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /u01/sample_data/middlelog/.*log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.eights.EventTimeInterceptor$Builder

# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =/ext-data/start-log/dt=%{eventDate}/
a1.sinks.k1.hdfs.filePrefix = startlog
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.batchSize = 1000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 启动flume agent,发现hdfs sink目录按照事件时间正确创建
  • 检查HDFS目录,flume自定义拦截器按照事件时间接入HDFS完成

5

总结

在未使用Flume拦截器的时候,会在数仓层面对昨天入库的数据,先按照事件时间进行重分区在做ETL,采用自定义拦截器的方式,可以直接将事件时间分区操作提前,提升数仓ETL的效率。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Eights做数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 集群环境
    • Flume拦截器
      • 业务场景
        • 实战
        相关产品与服务
        专用宿主机
        专用宿主机(CVM Dedicated Host,CDH)提供用户独享的物理服务器资源,满足您资源独享、资源物理隔离、安全、合规需求。专用宿主机搭载了腾讯云虚拟化系统,购买之后,您可在其上灵活创建、管理多个自定义规格的云服务器实例,自主规划物理资源的使用。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档