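Earlier we covered a simple introduction to Flume. This article goes a step further: we get familiar with sources and write a custom source to see how they work. The articles that follow will dig into Flume's components one at a time; you're welcome to learn along with me.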
[Figure: source-to-channel]
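Sources fall into two categories: `PollableSource` and `EventDrivenSource`. To be honest, I haven't quite worked out what the purpose of this distinction is; if you have any ideas, please leave a comment.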
```java
public interface PollableSource extends Source {
    public Status process() throws EventDeliveryException;

    public static enum Status { READY, BACKOFF }
}
```
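`process` returns READY when data was produced normally and BACKOFF when something went wrong. After a BACKOFF the agent waits a while before calling `process` again: the more consecutive BACKOFFs, the longer the wait, capped at 5 s.

The other category, `EventDrivenSource`, is a marker interface with no methods of its own:

```java
public interface EventDrivenSource extends Source {
}
```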
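The BACKOFF timing for `process` described above can be made concrete with a small standalone sketch. It assumes a 1000 ms increment per consecutive BACKOFF and a 5000 ms cap. That is my understanding of the defaults in Flume's `PollableSourceRunner`, and it matches the 5 s ceiling, but treat the exact numbers as an assumption. `BackoffSketch` and `backoffSleepMs` are hypothetical names, not Flume API.

```java
// Sketch of the PollableSource backoff rule: the wait grows linearly with the number of
// consecutive BACKOFF results and is capped. The constants are ASSUMED defaults, not read from Flume.
public class BackoffSketch {
    static final long BACKOFF_INCREMENT_MS = 1000; // assumed step per consecutive BACKOFF
    static final long MAX_BACKOFF_MS = 5000;       // assumed cap (the 5 s ceiling)

    /** Wait before the next process() call after `consecutiveBackoffs` BACKOFFs in a row. */
    static long backoffSleepMs(int consecutiveBackoffs) {
        return Math.min(consecutiveBackoffs * BACKOFF_INCREMENT_MS, MAX_BACKOFF_MS);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 7; n++) {
            System.out.println("BACKOFF #" + n + " -> wait " + backoffSleepMs(n) + " ms");
        }
    }
}
```

Running `main` prints waits of 1000, 2000, 3000, 4000 ms and then 5000 ms from the fifth BACKOFF onward. A READY result resets the consecutive count, so the next BACKOFF starts again from the smallest wait.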
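Now a quick look at how a source gets run. The agent starts from its `main()` method. `AbstractConfigurationProvider$getConfiguration` parses the components and properties defined in the configuration file. The resulting components are handed to a `supervisor`, which runs them and manages their lifecycle. Finally each component's `start` method is called and execution begins in earnest, which is where the code we are about to write comes into play.

This is only a rough outline; for the details you will need to read the source code yourself. The goal here is just to sort out the overall flow and get a rough picture of it, since digging too deep at this stage would miss the point. It also lays the groundwork for the custom source that follows, so that we know how our own code actually gets run.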
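The supervisor step in the flow above can be modeled in a few lines of plain Java. This is a simplified sketch, not Flume's implementation. `Lifecycle`, `NamedComponent` and `MiniSupervisor` are hypothetical names, and the start order (channel, then sink, then source) is an assumption. It is chosen so that a source never writes into a channel that isn't running yet. Flume's real supervisor does considerably more, such as monitoring component state, and none of that is modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of "a supervisor runs the parsed components and manages their lifecycle".
// All names are hypothetical; this is not Flume's API.
public class LifecycleSketch {
    interface Lifecycle {
        void start();
        void stop();
    }

    // A component that just records when it is started and stopped.
    static class NamedComponent implements Lifecycle {
        private final String name;
        private final List<String> log;

        NamedComponent(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override public void start() { log.add("start " + name); }
        @Override public void stop() { log.add("stop " + name); }
    }

    static class MiniSupervisor {
        private final List<Lifecycle> supervised = new ArrayList<>();

        // Register a component and start it right away.
        void supervise(Lifecycle component) {
            supervised.add(component);
            component.start();
        }

        // Stop in reverse registration order, so the source stops producing
        // before the channel it writes to is stopped.
        void stopAll() {
            for (int i = supervised.size() - 1; i >= 0; i--) {
                supervised.get(i).stop();
            }
            supervised.clear();
        }
    }

    public static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSupervisor supervisor = new MiniSupervisor();
        supervisor.supervise(new NamedComponent("channel c1", log));
        supervisor.supervise(new NamedComponent("sink k1", log));
        supervisor.supervise(new NamedComponent("source r1", log));
        supervisor.stopAll();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Our custom source only has to fill in `start()` and `stop()`. Deciding when those are called is the framework's job.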
To write a custom source, extend `AbstractSource` and implement `Configurable` together with either `EventDrivenSource` or `PollableSource`. The example below is an event-driven source that produces an increasing sequence number every 500 ms and sends the numbers to the channel in batches:

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.inveno.flume;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.ImmutableMap;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SequenceSource extends AbstractSource implements Configurable, EventDrivenSource {

    private static final Logger logger = LoggerFactory.getLogger(SequenceSource.class);

    private long seq;
    private int batchSize = 10;
    private final List<Event> batchArrayList = new ArrayList<>();
    // Producer thread: started in start(), interrupted in stop()
    private Thread producer;

    @Override
    public void configure(Context context) {
        // Read the custom property; falls back to 1 if batchSize is not configured
        batchSize = context.getInteger("batchSize", 1);
        // Log every property passed to this source
        ImmutableMap<String, String> map = context.getParameters();
        for (String s : map.keySet()) {
            logger.warn(s + "==============configure=============================" + map.get(s));
        }
    }

    private void process() {
        try {
            batchArrayList.add(EventBuilder.withBody(String.valueOf(seq++).getBytes(StandardCharsets.UTF_8)));
            if (batchArrayList.size() >= batchSize) {
                // Hand the batch to the ChannelProcessor, which writes it to the channel(s)
                getChannelProcessor().processEventBatch(batchArrayList);
                batchArrayList.clear();
            }
        } catch (Exception e) {
            // The batch is not cleared on failure, so it is retried together with the next event
            logger.error("Failed to deliver batch", e);
        }
    }

    @Override
    public void start() {
        super.start();
        // Start one thread to produce data (a thread pool would work as well)
        producer = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        // Java detail: catching InterruptedException clears the interrupt flag,
                        // so interrupt again to let the while condition see it and exit
                        Thread.currentThread().interrupt();
                        continue;
                    }
                    process();
                }
            }
        }, "sequence-source-producer");
        producer.start();
        logger.debug("==========================start");
    }

    @Override
    public void stop() {
        // Stop the producer thread; otherwise it keeps running after the source is stopped
        if (producer != null) {
            producer.interrupt();
        }
        super.stop();
        logger.info("==========================stop {}", getName());
    }
}
```
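The comment in `start()` about `InterruptedException` is easy to check on its own, without Flume. In this standalone demo, `InterruptFlagDemo` and `check` are illustrative names. The demo interrupts the current thread to stand in for `stop()`, lets `Thread.sleep` throw, and reads the interrupt flag before and after re-interrupting.

```java
// Shows that catching InterruptedException clears the interrupt flag,
// and that calling interrupt() again restores it.
public class InterruptFlagDemo {
    /** Returns {flag right after the catch, flag after re-interrupting}. */
    public static boolean[] check() {
        boolean afterCatch = false;
        boolean afterReInterrupt = false;
        Thread.currentThread().interrupt();            // stand-in for stop() interrupting the producer
        try {
            Thread.sleep(500);                         // flag already set: sleep throws immediately
        } catch (InterruptedException e) {
            afterCatch = Thread.currentThread().isInterrupted();        // false: flag was cleared
            Thread.currentThread().interrupt();                         // restore the flag
            afterReInterrupt = Thread.currentThread().isInterrupted();  // true
        }
        Thread.interrupted(); // clear the flag again so the caller's thread is left clean
        return new boolean[] {afterCatch, afterReInterrupt};
    }

    public static void main(String[] args) {
        boolean[] r = check();
        System.out.println("after catch: " + r[0] + ", after re-interrupt: " + r[1]);
    }
}
```

`main` prints `after catch: false, after re-interrupt: true`. That is why the producer loop has to call `interrupt()` again: otherwise `isInterrupted()` in the `while` condition would never become true.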
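The `configure` method reads the custom property `batchSize`. A `PollableSource` is written in much the same way, except that you don't have to start a thread yourself; that's all there is to it. In the source above, events are handed off to Flume's built-in `ChannelProcessor`. In the next article we will look at the details of `ChannelProcessor`.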
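The pollable variant, where the source starts no thread of its own, can be illustrated with a stdlib-only model of a runner that owns the loop and keeps calling `process()`. Every name here (`MiniPollableSource`, `MiniPollingRunner`, `SequencePollableSource`) is hypothetical, and the runner skips the actual backoff sleep to keep the sketch fast. It only illustrates the division of labor, not Flume's real `PollableSourceRunner`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model: the framework's runner owns the loop; a pollable source only does
// one unit of work per process() call. Hypothetical names, not Flume's classes.
public class PollingRunnerSketch {
    enum Status { READY, BACKOFF }

    interface MiniPollableSource {
        Status process();
    }

    static class MiniPollingRunner {
        final List<Status> results = new ArrayList<>();

        // Call process() a fixed number of times (a real runner loops until it is stopped).
        void run(MiniPollableSource source, int iterations) {
            for (int i = 0; i < iterations; i++) {
                Status status = source.process();
                results.add(status);
                // On BACKOFF a real runner would sleep before polling again; omitted here.
            }
        }
    }

    // Pollable counterpart of SequenceSource: no thread, one sequence number per call.
    static class SequencePollableSource implements MiniPollableSource {
        final List<Long> emitted = new ArrayList<>();
        private final int limit;
        private long seq = 0;

        SequencePollableSource(int limit) {
            this.limit = limit;
        }

        @Override
        public Status process() {
            if (seq >= limit) {
                return Status.BACKOFF; // nothing left to produce: ask the runner to back off
            }
            emitted.add(seq++);
            return Status.READY;
        }
    }

    public static void main(String[] args) {
        SequencePollableSource source = new SequencePollableSource(3);
        MiniPollingRunner runner = new MiniPollingRunner();
        runner.run(source, 6);
        System.out.println("emitted=" + source.emitted + " results=" + runner.results);
    }
}
```

Here `main` prints `emitted=[0, 1, 2] results=[READY, READY, READY, BACKOFF, BACKOFF, BACKOFF]`. The source's logic is the same as before; only the ownership of the loop has moved to the runner.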
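After all that, I forgot to explain how to deploy the custom code... sorry! Package `SequenceSource` into a jar, put it on the agent's classpath (for example Flume's `lib` directory), and reference the class by its fully qualified name as the source `type`: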
```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.inveno.flume.SequenceSource
a1.sources.r1.batchSize = 5

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
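To see roughly which properties `SequenceSource.configure` receives from this file, the keys under `a1.sources.r1.` can be grouped with `java.util.Properties`. This is a simplified model, not Flume's configuration parser. `ConfigSketch`, `componentProperties` and `batchSize` are hypothetical helpers, and the config string is an abbreviated copy of the one above (the channel lines are left out). I expect Flume's real `Context` for `r1` to contain the same prefix-stripped keys, but treat that as an assumption.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Simplified model (not Flume's parser): every key under "a1.sources.r1." becomes
// a property of source r1 with the prefix stripped.
public class ConfigSketch {
    static final String CONFIG = String.join("\n",
        "a1.sources = r1",
        "a1.sinks = k1",
        "a1.channels = c1",
        "a1.sources.r1.type = com.inveno.flume.SequenceSource",
        "a1.sources.r1.batchSize = 5",
        "a1.sources.r1.channels = c1",
        "a1.sinks.k1.type = logger",
        "a1.sinks.k1.channel = c1");

    // Hypothetical helper: one component's properties, prefix removed, sorted by key.
    static Map<String, String> componentProperties(Properties all, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), all.getProperty(key));
            }
        }
        return result;
    }

    // Mirrors context.getInteger("batchSize", 1) in configure(): default to 1 when absent.
    static int batchSize(Map<String, String> props) {
        String value = props.get("batchSize");
        return value == null ? 1 : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) throws IOException {
        Properties all = new Properties();
        all.load(new StringReader(CONFIG));
        Map<String, String> r1 = componentProperties(all, "a1.sources.r1.");
        System.out.println(r1);
        System.out.println("batchSize=" + batchSize(r1));
    }
}
```

`main` prints `{batchSize=5, channels=c1, type=com.inveno.flume.SequenceSource}` and then `batchSize=5`. If the `batchSize` line were removed, the same lookup would fall back to 1, just like the default passed to `getInteger` in `configure`.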