前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flume 高级 —— source 自定义

Flume 高级 —— source 自定义

作者头像
solve
发布2019-10-30 13:21:02
8150
发布2019-10-30 13:21:02
举报
文章被收录于专栏:大数据技术栈大数据技术栈

前言

前面我们已经说过了flume的简单入门,这篇文章继续深入,来熟悉下source,并通过自定义 source 来了解其工作原理,接下来的一系列文章都会以flume的各个小组件慢慢深入,欢迎和我一起学习

source 到 channel 过程

source-to-channel.png

  1. 上图大致描述了 source 收集到数据推送到 channel 的基本过程,可以发现中间多出了一个 channel processor 的组件
  2. source 收集到的数据会经过拦截器链进行过滤,然后通过channel selector 发送到对应的 channel,从中我们可以想到,如果你要对数据进行一些特别的处理,可以自定义拦截器来进行数据清洗,当然不建议太过复杂的处理,否则你的 flume 将会和蜗牛一样慢,你还可以通过 selector 来控制数据储存的 channel。

source 是如何产生数据的

source 分为两大类:PollableSource 和 EventDrivenSource,不过笔者倒是没怎么弄清楚,这两大类区分的目的何在?如果你有什么想法,欢迎留言指教。

  • PollableSource
代码语言:javascript
复制
public interface PollableSource extends Source {

  public Status process() throws EventDeliveryException;
  public static enum Status {READY, BACKOFF}
}
  1. 当一个agent 启动之后,就会不断循环调用 process 以获取数据
  2. process 返回 READY,表示数据产生正常,如果是 BACKOFF 则表示异常,当产生异常时候,agent 会等待一段时间再来调用 process,异常次数越多,间隔时间越长,最长不超过 5s。
  3. 自带一个线程,工作都是在自己的独立线程之内的
  • EventDrivenSource
代码语言:javascript
复制
public interface EventDrivenSource extends Source
  1. 简单的一个标记接口,区分 PollableSource
  • 运行流程
  1. 当一个agent启动时候,会开始执行 application 的 main() 方法
  2. 进程启动之后,会通过 AbstractConfigurationProvider$getConfiguration解析配置文件中的各个组件和属性
  3. 针对 source 会生成 sourceRunner 通过 supervisor 来运行和管理其生命周期。
  4. source 的生命周期 start 方法正式开始执行,这样也就到了我们将要自定义代码的实现执行了。

这里只是大概说了一下流程,具体详情还是需要自己看源码的,我们的目的就是梳理一下整个流程,知道自己一个大概就好了,深究反而落的下乘,同时也是为了接下来自定义 source 打个基础,知道我们自己写的东西是怎么运行的。

自定义source

  1. 创建一个类,继承自 AbstractSource 并实现 Configurable 和( EventDrivenSource 或者PollableSource )
  2. 实现相关方法,以下是简单的一个生成序列的source
代码语言:javascript
复制
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.inveno.flume;

import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.ImmutableMap;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SequenceSource extends AbstractSource implements Configurable ,EventDrivenSource {

    private static final Logger logger = LoggerFactory
            .getLogger(SequenceSource.class);

    private long seq;
    private int batchSize = 10;
    private List<Event> batchArrayList = new ArrayList<>();


    @Override
    public void configure(Context context) {
        //自定义配置属性
        batchSize = context.getInteger("batchSize", 1);
        //打印自定义属性
        ImmutableMap<String, String> map = context.getParameters();
        for (String s : map.keySet()) {
            logger.warn(s + "==============configure=============================" + map.get(s));
        }
    }

    private void process(){
        try {
            batchArrayList.add(EventBuilder.withBody(String.valueOf(seq++).getBytes()));
            if(batchArrayList.size()>=batchSize){
                getChannelProcessor().processEventBatch(batchArrayList);
                batchArrayList.clear();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    @Override
    public void start() {
        super.start();
        //开启一个线程来生产数据,当然你也可以整个线程池
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()){
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        //这里有个java知识点 ,InterruptedException捕获后,
                        // 这个标记点会被重置 ,需要再次 interrupt才能正确退出
                        Thread.currentThread().interrupt();
                    }
                    process();
                }
            }
        }).start();
        logger.debug("==========================start");
    }

    @Override
    public void stop() {
        super.stop();
        logger.info("==========================stop", getName());
    }

}
  1. 我们在 configure方法里面获取配置属性-batchsize。
  2. 我们上面说过,source 最后开始会被调用start 方法,我们在start 方法里面开启一个线程,实现循环产生消息,并隔 batchsize 个消息就推送到 channel 里面。
  3. 这样一个简单的生产 source 就完成了
  4. 如果想实现 PollableSource 类型的 source ,只是不需要自己开启线程罢了,其余都差不多,就是这么简单。

上面我们自定义了一个 source,事件是交给 flume 自带的 ChannelProcessor 自己处理的,下一节,我们来说说 ChannelProcessor 相关细节

写在忘记后

搞了半天忘记写部署自定义代码了。。。抱歉!!!

  1. 首先将代码打 jar 包
  2. 放到 FLUME_HOME 目录的 lib 文件夹下
  3. 以下是配置文件
  • example.conf
代码语言:javascript
复制
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.inveno.flume.SequenceSource 
a1.sources.r1.batchSize = 5

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. 启动脚本:bin/flume-ng agent --conf conf --conf-file ./my-conf/example.conf --name a1 -Dflume.root.logger=INFO,console
  2. 大功告成。。。
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.09.30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • source 到 channel 过程
  • source 是如何产生数据的
  • 自定义source
  • 写在忘记后
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档