首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >触发后取消Kafka Streams上的Punctuator

触发后取消Kafka Streams上的Punctuator
EN

Stack Overflow用户
提问于 2019-02-21 17:07:39
回答 1查看 718关注 0票数 0

我在一个转换器上创建了一个预定的标点符号,并安排它定期运行(使用kafka v2.1.0)。每次我接受一个特定的密钥时,我都会像这样创建一个新的密钥

代码语言:javascript
复制
        scheduled = context.schedule (Duration.ofMillis(scheduleTime),
             PunctuationType.WALL_CLOCK_TIME,new CustomPunctuator(context, customStateStoreName));

我的问题是,我创建的所有这些标点符号都在不断运行,我找不到一种方法来取消它们。我在互联网上找到了一个可以使用的片段

代码语言:javascript
复制
    private Cancellable scheduled;

    @Override
    public void init(PorcessorContext processContext) {
        this.context = processorContext;
        scheduled = context.schedule(TimeUnit.SECONDS.toMillis(5), PunctuationType.WALL_CLOCK_TIME,
                                     this::punctuateCancel);
    }

    private void punctuateCancel(long timestamp) {
        scheduled.cancel();
    }

但不幸的是,这似乎只取消了最新创建的Punctuator。

我正在编辑我的帖子,只是为了进一步了解我的方法,以及这与wardzinia的评论有何关系。我的方法非常类似,只是使用了一个Map,因为我只需要为每个事件键激活一个标点符号,所以在我的Transformer类中,我启动了

代码语言:javascript
复制
   private Map<String,Cancellable> scheduled  = new HashMap<>();

在我的transform方法中,我执行了下面的代码

代码语言:javascript
复制
{
 final Cancellable cancelSched = scheduled.get(recordKey);

 // Every time I get a new event I cancel my previous Punctuator
 // and schedule a new one ( context.schedule a few lines later)

 if(cancelSched != null)
    cancelSched.cancel();

 // This is supposed to work like a closure by capturing the currentCancellable which in the next statement
 // is moved to the scheduled map. Scheduled map at any point will have the only active Punctuator for a
 // specific String as it is constantly renewed
 // Note: Previous registered punctuators have already been cancelled as it can be seen by the previous
 // statement (cancelSched.cancel();)

 Cancellable currentCancellable = context.schedule(Duration.ofMillis(scheduleTime), PunctuationType.WALL_CLOCK_TIME,
                new CustomPunctuator(context, recordKey ,()-> scheduled ));

 // Update Active Punctuators for a specific key.

 scheduled.put(recordKey,currentCancellable);   
}

并且我在Punctuator标点方法上使用已注册的回调来取消启动后的最后一个活动Punctuator。它似乎是有效的(虽然不确定),但它感觉非常“老套”,而且不是一种它肯定是可取的解决方案。

那么,如何在触发后取消标点符号。有没有办法解决这个问题?

EN

回答 1

Stack Overflow用户

发布于 2020-05-22 00:42:49

我也遇到过同样的情况,对于对scala感兴趣的人,我的处理方式是

代码语言:javascript
复制
val punctuation = new myPunctuation()
val scheduled:Cancellable=context.schedule(Duration.ofSeconds(5), PunctuationType.WALL_CLOCK_TIME, punctuation)
punctuation.schedule=scheduled

这个类

代码语言:javascript
复制
class myPunctuation() extends Punctuator{
      var schedule: Cancellable = _
      override def punctuate(timestamp: Long): Unit = {
      println("hello")
      schedule.cancel()
      }

    }

像个魔法师一样工作

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54803124

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档