我在一个转换器上创建了一个预定的标点符号,并安排它定期运行(使用kafka v2.1.0)。每次我接受一个特定的密钥时,我都会像这样创建一个新的密钥
scheduled = context.schedule (Duration.ofMillis(scheduleTime),
PunctuationType.WALL_CLOCK_TIME,new CustomPunctuator(context, customStateStoreName));
我的问题是,我创建的所有这些标点符号都在不断运行,我找不到一种方法来取消它们。我在互联网上找到了一个可以使用的片段
private Cancellable scheduled;
@Override
public void init(PorcessorContext processContext) {
this.context = processorContext;
scheduled = context.schedule(TimeUnit.SECONDS.toMillis(5), PunctuationType.WALL_CLOCK_TIME,
this::punctuateCancel);
}
private void punctuateCancel(long timestamp) {
scheduled.cancel();
}
但不幸的是,这似乎只取消了最新创建的Punctuator。
我正在编辑我的帖子,只是为了进一步了解我的方法,以及这与wardzinia的评论有何关系。我的方法非常类似,只是使用了一个Map,因为我只需要为每个事件键激活一个标点符号,所以在我的Transformer类中,我启动了
private Map<String,Cancellable> scheduled = new HashMap<>();
在我的transform方法中,我执行了下面的代码
{
final Cancellable cancelSched = scheduled.get(recordKey);
// Every time I get a new event I cancel my previous Punctuator
// and schedule a new one ( context.schedule a few lines later)
if(cancelSched != null)
cancelSched.cancel();
// This is supposed to work like a closure by capturing the currentCancellable which in the next statement
// is moved to the scheduled map. Scheduled map at any point will have the only active Punctuator for a
// specific String as it is constantly renewed
// Note: Previous registered punctuators have already been cancelled as it can be seen by the previous
// statement (cancelSched.cancel();)
Cancellable currentCancellable = context.schedule(Duration.ofMillis(scheduleTime), PunctuationType.WALL_CLOCK_TIME,
new CustomPunctuator(context, recordKey ,()-> scheduled ));
// Update Active Punctuators for a specific key.
scheduled.put(recordKey,currentCancellable);
}
并且我在Punctuator标点方法上使用已注册的回调来取消启动后的最后一个活动Punctuator。它似乎是有效的(虽然不确定),但它感觉非常“老套”,而且不是一种它肯定是可取的解决方案。
那么,如何在触发后取消标点符号。有没有办法解决这个问题?
https://stackoverflow.com/questions/54803124
复制相似问题