(5d8e3f411b5a4ccb):[2017-09-25T13:53:08.088Z..2017-09-25T13:53:08.719Z)窗口的java.lang.IllegalStateException: TimestampCombiner从2017-09-25T13:53:08.725Z移动到更早的时间2017-09-25T13:53:08.718Z。
预期的原因是什么?
WindowFn代码很简单:
public class BQTablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {
/**
*
*/
private static final long serialVersionUID = 1L;
private IntervalWindow assignWindow(AssignContext context) {
TableRow tableRow = (TableRow) context.element();
String timestamp = tableRow.get(BQConstants.LOG_TIME).toString();
String currentTime = DateUtil.getFormatedDate(new Date());
DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS)
.withZoneUTC();
Instant start_point = Instant.parse(timestamp, formatter);
Instant end_point = Instant.parse(currentTime, formatter);
return new IntervalWindow(start_point, end_point);
};
@Override
public Coder<IntervalWindow> windowCoder() {
return IntervalWindow.getCoder();
}
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
return Arrays.asList(assignWindow(c));
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return false;
}
@Override
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
throw new IllegalArgumentException(
"Attempted to get side input window for GlobalWindow from non-global WindowFn");
}}
发布于 2017-09-26 17:07:25
GroupByKey的默认行为是使用时间戳输出可迭代性,时间戳是窗口中允许的最大时间戳。对于您的窗口,这是时间戳13:53:08.718Z。
元素具有时间戳13:53:08.725Z,它不会落在从13:53:08.088Z到13:53:08.719Z的窗口中。
您可以共享您的WindowFn以及任何调整时间戳的ParDo吗?
更新:谢谢你分享你的WindowFn。有几件事会给你带来麻烦。
1.指定窗口的开始时间不是基于元素的时间戳.
提取元素的一列,并根据context.element().get(BQConstants.LOG_TIME)的值分配窗口(忽略转换和解析)。从您的错误消息来看,这似乎不是context.timestamp()的实际值,它是元素的事件时间戳。
相反,您应该编写WindowFn以使用context.timestamp()。您可以根据数据是否有界,以不同的方式确保时间戳是您想要的:
WithTimestamps通过提取时间戳字段来分配时间戳。PubsubIO从您可以指定的属性读取时间戳。2.指定窗口的结束时间基于系统日期。
有几个问题:
这里的目标是什么?您是否只是为了提取动态目的地的端点而设置它?如果是这样的话,我建议将数据划分为何时发生,而不是何时处理。
https://stackoverflow.com/questions/46418568
复制相似问题