首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >在数据流流道中运行管道时获取IllegalStateException

在数据流流道中运行管道时获取IllegalStateException
EN

Stack Overflow用户
提问于 2017-09-26 05:38:01
回答 1查看 839关注 0票数 1

(5d8e3f411b5a4ccb):[2017-09-25T13:53:08.088Z..2017-09-25T13:53:08.719Z)窗口的java.lang.IllegalStateException: TimestampCombiner从2017-09-25T13:53:08.725Z移动到更早的时间2017-09-25T13:53:08.718Z。

预期的原因是什么?

WindowFn代码很简单:

代码语言:javascript
复制
public class BQTablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {

/**
 * 
 */
private static final long serialVersionUID = 1L;

private IntervalWindow assignWindow(AssignContext context) {
    TableRow tableRow = (TableRow) context.element();
    String timestamp = tableRow.get(BQConstants.LOG_TIME).toString();
    String currentTime = DateUtil.getFormatedDate(new Date());
    DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS)
            .withZoneUTC();
    Instant start_point = Instant.parse(timestamp, formatter);
    Instant end_point = Instant.parse(currentTime, formatter);

    return new IntervalWindow(start_point, end_point);
};

@Override
public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
}

@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
    return Arrays.asList(assignWindow(c));
}

@Override
public boolean isCompatible(WindowFn<?, ?> other) {
    return false;
}

@Override
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
    throw new IllegalArgumentException(
            "Attempted to get side input window for GlobalWindow from non-global WindowFn");
}

}

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-09-26 17:07:25

GroupByKey的默认行为是使用时间戳输出可迭代性,时间戳是窗口中允许的最大时间戳。对于您的窗口,这是时间戳13:53:08.718Z

元素具有时间戳13:53:08.725Z,它不会落在从13:53:08.088Z13:53:08.719Z的窗口中。

您可以共享您的WindowFn以及任何调整时间戳的ParDo吗?

更新:谢谢你分享你的WindowFn。有几件事会给你带来麻烦。

1.指定窗口的开始时间不是基于元素的时间戳.

提取元素的一列,并根据context.element().get(BQConstants.LOG_TIME)的值分配窗口(忽略转换和解析)。从您的错误消息来看,这似乎不是context.timestamp()的实际值,它是元素的事件时间戳。

相反,您应该编写WindowFn以使用context.timestamp()。您可以根据数据是否有界,以不同的方式确保时间戳是您想要的:

  • 如果数据是有界的,则可以使用WithTimestamps通过提取时间戳字段来分配时间戳。
  • 如果您的数据是无界的,则源需要了解更多信息,以便能够管理水印,因此配置取决于源。例如,PubsubIO从您可以指定的属性读取时间戳。

2.指定窗口的结束时间基于系统日期

有几个问题:

  • 结束时间被四舍五入,可能在开始时间之前,从而导致无效窗口。
  • 结束时间是不确定的。Beam中的一般期望是,您将主要根据元素的时间戳(该时间戳必须落在窗口结束之前)和元素本身来确定地分配一个窗口。分配这样的不确定窗口可能会有不可预见的缺点。一个已知的问题是,您的结果是不可复制的,如果您需要修复数据处理错误或对归档的数据进行实验,这可能会带来麻烦。这取决于您的用例,但您可能会考虑一些更未来的证明。

这里的目标是什么?您是否只是为了提取动态目的地的端点而设置它?如果是这样的话,我建议将数据划分为何时发生,而不是何时处理。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46418568

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档