前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flume采集App端埋点行为数据至Hdfs

Flume采集App端埋点行为数据至Hdfs

原创
作者头像
码农GT038527
发布2024-08-07 12:04:11
1540
发布2024-08-07 12:04:11
举报
文章被收录于专栏:脚本

采集背景

此文章来自尚硅谷电商数仓6.0

我们在采集日志服务器的日志数据时,先将数据通过Flumel中转到Kafka中(方便后续实时处理),再通过Flume将数据采集至Hdfs。再将数据从Kafka采集到hdfs中。此时会出现零点漂移问题。(第一天接近24点的数据从Kafka流过被flume采集时header里面的时间戳时间【记录的是当前时间不是业务时间】会因延迟导致变成第二天的时间)而我们在HDFSSink的时间路径又是来自于header的时间戳,因此我们构造一个拦截器来处理这种情况。从而将数据准确采集到Hdfs中的日期目录。

Flume采集器1

file_to_kafka.conf

此采集器将日志服务器的埋点行为数据采集至kafka中

由于KafkaChannel可以将数据直接采集到Kafka中,所以我们不再使用sink来处理

代码语言:shell
复制
vim file_to_kafka.conf

#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1

采集器1启动脚本

代码语言:shell
复制
# 创建脚本
vim f1.sh

#!/bin/bash

echo " --------启动 hadoop102 采集flume-------"
nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &

# 增加权限
chmod 777 ./f1.sh

Flume采集器2

kafka_to_hdfs_log.conf

此采集器将kafka数据采集至Hdfs中,我们增加一个拦截器来确保数据的准确性

代码语言:shell
复制
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
# 拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.example.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

采集器2启动脚本

代码语言:shell
复制
# 创建脚本
vim f2.sh

#!/bin/bash

echo " --------启动 hadoop102 日志数据flume-------"
nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &

# 增加权限
chmod 777 ./f2.sh

Flume拦截器

日志数据的数据格式如下:

代码语言:json
复制
{
  "common": {
    "ar": "12",
    "ba": "realme",
    "ch": "wandoujia",
    "is_new": "1",
    "md": "realme Neo2",
    "mid": "mid_411",
    "os": "Android 13.0",
    "sid": "4f34596c-ca8f-434c-a8d5-356b944eb0d6",
    "vc": "v2.1.134"
  },
  "start": {
    "entry": "icon",
    "loading_time": 12974,
    "open_ad_id": 16,
    "open_ad_ms": 5415,
    "open_ad_skip_ms": 0
  },
  "ts": 1654620592548
}

pom文件

若maven加载不了,可在项目根目录下强制更新缓存中的依赖项 :mvn clean install -U

代码语言:xml
复制
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

TimestampInterceptor

采集器原理:

由于零点漂移问题,我们设置一个拦截器,对每个Event进行拦截,此时封装的数据来自kafka,Kafka的数据来自日志服务器,我们需要的数据是body的ts,用于Flume采集器的路径配置。(/%Y-%m-%d) 所以我们要取到这个数据进行处理,然后加载到header中。

代码语言:java
复制
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
    //1、获取header和body的数据
    Map<String, String> headers = event.getHeaders();
    String log = new String(event.getBody(), StandardCharsets.UTF_8);

    try {
        //2、将body的数据类型转成jsonObject类型(方便获取数据)
        JSONObject jsonObject = JSONObject.parseObject(log);

        //3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)
        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}

@Override
public List<Event> intercept(List<Event> list) {
    Iterator<Event> iterator = list.iterator();
    while (iterator.hasNext()) {
        Event event = iterator.next();
        if (intercept(event) == null) {
            iterator.remove();
        }
    }
    return list;
}

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

启动采集通道

代码语言:shell
复制
# 启动flume采集器
f1.sh
f2.sh

# 启动日志服务器
java -jar /opt/module/applog/gmall-remake-mock-2023-05-15-3.jar

检查结果

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 采集背景
  • Flume采集器1
  • 采集器1启动脚本
  • Flume采集器2
  • 采集器2启动脚本
  • Flume拦截器
  • 启动采集通道
  • 检查结果
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档