我想连接一些风暴螺栓到一个TriggerSpout,它发射元组,假设每3-4个小时或在一天中的特定时间。当连接的螺栓接收到这个TriggerSpout的元组时,应该开始对聚合数据(来自另一个kafka喷口)进行进一步的计算。(在本例中,用于输入后续MLBolt的特征提取)
现在,我可以以某种方式使用它的nextTuple()方法来公开这个行为吗?这是应该走的路,还是有人能提出更好的办法来做到这一点呢?对于星系团来说,这是一种全球时钟。
问候‘n’谢谢
发布于 2014-05-16 14:30:58
与使用TriggerSpout不同,您可以在螺栓中添加滴答。在螺栓实现中添加以下内容:
import backtype.storm.Constants;
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 15);
return conf;
}
public static boolean isTickTuple(Tuple tuple) {
String sourceComponent = tuple.getSourceComponent();
String sourceStreamId = tuple.getSourceStreamId();
return sourceComponent.equals(Constants.SYSTEM_COMPONENT_ID)
&& sourceStreamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
}
在execute()方法中,首先检查输入是否为滴答元组,然后继续执行逻辑。
https://stackoverflow.com/questions/23658046
复制相似问题