首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何改变apache光束中的事件时间?

如何改变apache光束中的事件时间?
EN

Stack Overflow用户
提问于 2019-09-19 15:10:22
回答 1查看 2.4K关注 0票数 0

我通过Pub/sub摄取数据(json文件),所以默认情况下我的事件时间是主题中发布的时间。我想强迫事件发生时间并改变它。我在数据中添加了一个日期时间字段。

我希望根据json文件的新时间戳字段进行聚合和组合。

Ps:该字段名为“时间戳”,它是一个字符串。这就是为什么我把它转换成一个日期时间,然后在数据流中转换一个时间戳。

代码语言:javascript
复制
def get_timestamp(data):
    my_date = (data['timestamp']) # date : 2010-09-18......string
    times = datetime.fromisoformat(my_date) #type: datetime.datetime
    return beam.window.TimestampedValue(data, datetime.timestamp(times))

稍后,在进行窗口化之前,我将调用管道中的函数。

我从公共部门收到我的数据:

代码语言:javascript
复制
lines = p | 'receive_data' >> beam.io.ReadFromPubSub(
        subscription=known_args.in_topic).with_input_types(str) 
        | 'decode' >> beam.Map(lambda x: x.decode('utf-8')) 
        | 'jsonload' >> beam.Map(lambda x: json.loads(x))

然后做我的处理:

代码语言:javascript
复制
 (lines |'timestamp' >> beam.Map(get_timestamp)
           | 'print timestamp' >> beam.ParDo(PrintFn2())
           | 'window' >> beam.WindowInto(
            window.FixedWindows(10),
            trigger=trigger.AfterWatermark(),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
        )
        | 'CountGlobally' >> beam.CombineGlobally(
                beam.combiners.CountCombineFn()
            ).without_defaults() 
    )
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-09-20 01:57:49

从PubSub读取时,设置元素EvenTime的最佳方法是使用

Java withTimestampAttribute

Python 属性

这将设置元素时间戳,并确保水印信号具有良好的数据。

如果这不是一个选项,您可以按照DoFn中的向PCollection添加时间戳更改元素的时间戳。然而,这种方法将不允许设置时间戳<当时的水印。这就是为什么withTimestampAttribute方法是解决此模式的最佳方法的原因。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58014098

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档