前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flume ExecSource使用与源码阅读笔记

flume ExecSource使用与源码阅读笔记

原创
作者头像
皮皮熊
发布2018-09-17 21:55:06
1.1K0
发布2018-09-17 21:55:06
举报

ExecSource是很多人接触flume时第一个使用的source,这里我简单的分析下这个source的使用与实现。

一、FlumeUserGuide

FlumeUserGuide:https://flume.apache.org/FlumeUserGuide.html

这里我简单翻译和总结下:

Exec source在启动时运行Unix命令,并且期望它会不断的在标准输出中产生数据。 (stderr默认会被丢弃).如果进程因为某些原因退出,Exce Source也将退出并且不会再产生数据。

详细配置说明如下: 加粗的是必配项:

注意:ExecSource无法感知数据的丢失,比如channel满的时候数据发送失败。为了更健壮的数据可靠性,推荐:Spooling Directory Source、Taildir Source或者通过flume的SDK直接实现。

如此一个简单的配置,我们就能实时地将/var/log/secure下的日志发送到flume的channel里面。

代码语言:javascript
复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

简单说下shell和command,shell主要是区分Bash 或 Powershell。command作为一个参数传递给shell执行,这样command就可以使用shell的一些特性,如wildcards, back ticks, pipes, loops, conditionals,如果shell没有指定,一般会使用默认值‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, ‘powershell -Command’。

二、核心流程源码分析

代码语言:java
复制
public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable{
    xxx;
}

主要函数/类有:

  • start()
  • stop()
  • configure(Context context)
  • static class ExecRunnable implements Runnable
  • static class StderrReader extends Thread

1. start():

代码比较少,功能比较简单:

  1. 线程池
  2. 构建ExecRunnable线程对象,传入配置文件的参数
  3. 启动计数器

2. stop():

功能同样比较简单

  1. kill runner
  2. shutdown executor
  3. stop sourceCounter

3. configure(Context context)

获取excel里面的那些参数

4. ExecRunnable 主要实现类,核心+关键

static class ExecRunnable implements Runnable

核心部分是一个

代码语言:java
复制
do{
   xxx;
}while(restart);

所以stop时需要将restart置为false,防止do重做。

do内容主要包括:

1) shell与command处理,然后执行:

代码语言:java
复制
if (shell != null) {
  String[] commandArgs = formulateShellCommand(shell, command);
 process = Runtime.getRuntime().exec(commandArgs);
}  else {
  String[] commandArgs = command.split("\\s+");
 process = new ProcessBuilder(commandArgs).start();
}

2)启动reader

代码语言:java
复制
reader = new BufferedReader(
 new InputStreamReader(process.getInputStream(), charset));

3)启动stderrReader

代码语言:java
复制
StderrReader stderrReader = new StderrReader(new BufferedReader(
 new InputStreamReader(process.getErrorStream(), charset)), logStderr);
xxxx;

4)定时任务,每batchTimeout时间批量将eventList flush

代码语言:javascript
复制
future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
 @Override
 public void run() {
 try {
 synchronized (eventList) {
 if (!eventList.isEmpty() && timeout()) {
            flushEventBatch(eventList);
          }
        }
      } catch (Exception e) {
 logger.error("Exception occurred when processing event batch", e);
 if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
      }
    }
},
batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);

5)读取和发送数据

代码语言:java
复制
while ((line = reader.readLine()) != null) {
 sourceCounter.incrementEventReceivedCount();
 synchronized (eventList) {
    eventList.add(EventBuilder.withBody(line.getBytes(charset)));
 if (eventList.size() >= bufferCount || timeout()) {
      flushEventBatch(eventList);
    }
  }
}
synchronized (eventList) {
 if (!eventList.isEmpty()) {
    flushEventBatch(eventList);
  }
}

这个通用的小技巧,几乎所有的批处理代码里面都有这类写法:

1. 没到最后一行,达到bufferCount或者到timeout一批处理列表里面的数据

2. 读完最后一行(字节流没数据了),执行批处理

5. StderrReader处理stderr,这里不再赘述

static class StderrReader extends Thread

三、其他代码片段

1.批量flush Event

代码语言:java
复制
private void flushEventBatch(List<Event> eventList) {
 channelProcessor.processEventBatch(eventList);
 sourceCounter.addToEventAcceptedCount(eventList.size());
  eventList.clear();
 lastPushToChannel = systemClock.currentTimeMillis();
}

2.将shell和Command结合在一起,组成一个数组

代码语言:java
复制
private static String[] formulateShellCommand(String shell, String command) {
  String[] shellArgs = shell.split("\\s+");
  String[] result = new String[shellArgs.length + 1];
  System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
  result[shellArgs.length] = command;
 return result;
}

3.如果是中断导致的,则需要中断线程

代码语言:javascript
复制
catch (Exception e) {
 logger.error("Failed while running command: " + command, e);
 if (e instanceof InterruptedException) {
    Thread.currentThread().interrupt();
  }
} 

4.超时时间判断

代码语言:java
复制
private boolean timeout() {
 return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
}

四、阅读收获

  • 学习开发一个EventDrivenSource
  • 部分代码适用于命令执行模块
  • ExecSource确实无法感知数据丢失
  • 了解ExecSource的使用

“了解ExecSource的使用” 放在最后是因为ExecSource确实不太适合用在生产环境

参考文章:https://blog.csdn.net/qianshangding0708/article/details/49736019

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、FlumeUserGuide
    • 二、核心流程源码分析
      • 1. start():
        • 2. stop():
          • 3. configure(Context context)
            • 4. ExecRunnable 主要实现类,核心+关键
              • 1) shell与command处理,然后执行:
              • 2)启动reader
              • 3)启动stderrReader
              • 4)定时任务,每batchTimeout时间批量将eventList flush
              • 5)读取和发送数据
            • 5. StderrReader处理stderr,这里不再赘述
            • 三、其他代码片段
              • 1.批量flush Event
                • 2.将shell和Command结合在一起,组成一个数组
                • 3.如果是中断导致的,则需要中断线程
                • 4.超时时间判断
            • 四、阅读收获
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档