flume ExecSource使用与源码阅读笔记

ExecSource是很多人接触flume时第一个使用的source,这里我简单的分析下这个source的使用与实现。

一、FlumeUserGuide

FlumeUserGuide:https://flume.apache.org/FlumeUserGuide.html

这里我简单翻译和总结下:

Exec source在启动时运行Unix命令,并且期望它会不断的在标准输出中产生数据。 (stderr默认会被丢弃).如果进程因为某些原因退出,Exce Source也将退出并且不会再产生数据。

详细配置说明如下: 加粗的是必配项:

注意:ExecSource无法感知数据的丢失,比如channel满的时候数据发送失败。为了更健壮的数据可靠性,推荐:Spooling Directory Source、Taildir Source或者通过flume的SDK直接实现。

如此一个简单的配置,我们就能实时地将/var/log/secure下的日志发送到flume的channel里面。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

简单说下shell和command,shell主要是区分Bash 或 Powershell。command作为一个参数传递给shell执行,这样command就可以使用shell的一些特性,如wildcards, back ticks, pipes, loops, conditionals,如果shell没有指定,一般会使用默认值‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, ‘powershell -Command’。

二、核心流程源码分析

public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable{
    xxx;
}

主要函数/类有:

  • start()
  • stop()
  • configure(Context context)
  • static class ExecRunnable implements Runnable
  • static class StderrReader extends Thread

1. start():

代码比较少,功能比较简单:

  1. 线程池
  2. 构建ExecRunnable线程对象,传入配置文件的参数
  3. 启动计数器

2. stop():

功能同样比较简单

  1. kill runner
  2. shutdown executor
  3. stop sourceCounter

3. configure(Context context)

获取excel里面的那些参数

4. ExecRunnable 主要实现类,核心+关键

static class ExecRunnable implements Runnable

核心部分是一个

do{
   xxx;
}while(restart);

所以stop时需要将restart置为false,防止do重做。

do内容主要包括:

1) shell与command处理,然后执行:

if (shell != null) {
  String[] commandArgs = formulateShellCommand(shell, command);
 process = Runtime.getRuntime().exec(commandArgs);
}  else {
  String[] commandArgs = command.split("\\s+");
 process = new ProcessBuilder(commandArgs).start();
}

2)启动reader

reader = new BufferedReader(
 new InputStreamReader(process.getInputStream(), charset));

3)启动stderrReader

StderrReader stderrReader = new StderrReader(new BufferedReader(
 new InputStreamReader(process.getErrorStream(), charset)), logStderr);
xxxx;

4)定时任务,每batchTimeout时间批量将eventList flush

future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
 @Override
 public void run() {
 try {
 synchronized (eventList) {
 if (!eventList.isEmpty() && timeout()) {
            flushEventBatch(eventList);
          }
        }
      } catch (Exception e) {
 logger.error("Exception occurred when processing event batch", e);
 if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
      }
    }
},
batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);

5)读取和发送数据

while ((line = reader.readLine()) != null) {
 sourceCounter.incrementEventReceivedCount();
 synchronized (eventList) {
    eventList.add(EventBuilder.withBody(line.getBytes(charset)));
 if (eventList.size() >= bufferCount || timeout()) {
      flushEventBatch(eventList);
    }
  }
}
synchronized (eventList) {
 if (!eventList.isEmpty()) {
    flushEventBatch(eventList);
  }
}

这个通用的小技巧,几乎所有的批处理代码里面都有这类写法:

1. 没到最后一行,达到bufferCount或者到timeout一批处理列表里面的数据

2. 读完最后一行(字节流没数据了),执行批处理

5. StderrReader处理stderr,这里不再赘述

static class StderrReader extends Thread

三、其他代码片段

1.批量flush Event

private void flushEventBatch(List<Event> eventList) {
 channelProcessor.processEventBatch(eventList);
 sourceCounter.addToEventAcceptedCount(eventList.size());
  eventList.clear();
 lastPushToChannel = systemClock.currentTimeMillis();
}

2.将shell和Command结合在一起,组成一个数组

private static String[] formulateShellCommand(String shell, String command) {
  String[] shellArgs = shell.split("\\s+");
  String[] result = new String[shellArgs.length + 1];
  System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
  result[shellArgs.length] = command;
 return result;
}

3.如果是中断导致的,则需要中断线程

catch (Exception e) {
 logger.error("Failed while running command: " + command, e);
 if (e instanceof InterruptedException) {
    Thread.currentThread().interrupt();
  }
} 

4.超时时间判断

private boolean timeout() {
 return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
}

四、阅读收获

  • 学习开发一个EventDrivenSource
  • 部分代码适用于命令执行模块
  • ExecSource确实无法感知数据丢失
  • 了解ExecSource的使用

“了解ExecSource的使用” 放在最后是因为ExecSource确实不太适合用在生产环境

参考文章:https://blog.csdn.net/qianshangding0708/article/details/49736019

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏李蔚蓬的专栏

跨程序共享数据——Content Provider 之 创建自己的内容提供器

本模块共有四篇文章,参考郭神的《第一行代码》,对Content Provider的学习做一个详细的笔记,大家可以一起交流一下:

1265
来自专栏芋道源码1024

注册中心 Eureka 源码解析 —— EndPoint 与 解析器

目前有多种 Eureka-Server 访问地址的配置方式,本文只分享 Eureka 1.x 的配置,不包含 Eureka 1.x 对 Eureka 2.x 的...

1220
来自专栏码匠的流水账

java9系列(八)Multi-Release JAR Files

java9新支持了multi-release jar的功能,包括jar、javac、javap、jdeps等命令都能支持这个特性。所谓multi-release...

1712
来自专栏刘望舒

Android解析ClassLoader(二)Android中的ClassLoader

前言 在上一篇文章我们学习了Java的ClassLoader,很多同学会把Java和Android的ClassLoader搞混,甚至会认为Android中的Cl...

2108
来自专栏积累沉淀

XFire发布Web Services

步骤 1. 把XFire依赖的jar包放入lib文件夹下。 2. 在web.xml中加入如下代码: 3. 编写程序代码 首先编写一个功能的接口类。 pac...

1966
来自专栏开源优测

[接口测试 - http.client篇] 16 基于http.client之POM实战一下

概述 关注公众号回复: http.client_pom_demo 获取本文示例源码 你需要了解以下知识和技术,以便掌握后续的实例代码: http.client常...

3578
来自专栏Java编程技术

使用数据库悲观锁实现不可重入的分布式锁

在同一个jvm进程中时,可以使用JUC提供的一些锁来解决多个线程竞争同一个共享资源时候的线程安全问题,但是当多个不同机器上的不同jvm进程共同竞争同一个共享资源...

751
来自专栏程序猿DD

程序员你为什么这么累【续】:编写简陋的接口调用框架 - 动态代理学习

导读: 程序员你为什么这么累? 我的编码习惯 - 接口定义 我的编码习惯 - Controller规范 我的编码习惯 - 日志建议 我的编码习惯 - 异常处理 ...

4317
来自专栏Java3y

Web开发模式【Mode I 和Mode II的介绍、应用案例】

开发模式的介绍 在Web开发模式中,有两个主要的开发结构,称为模式一(Mode I)和模式二(Mode II). 首先我们来理清一些概念吧: DAO(Data ...

3467
来自专栏aoho求索

Spring Cloud Bus中的事件的订阅与发布(一)

年前最后一篇文章,提前祝大家新年快乐! 下面进入正文。Spring Cloud Bus用轻量级的消息代理将分布式系统的节点连接起来。这可以用来广播状态的该表(比...

44010

扫码关注云+社区

领取腾讯云代金券