我通过Pub/sub摄取数据(json文件),所以默认情况下我的事件时间是主题中发布的时间。我想强迫事件发生时间并改变它。我在数据中添加了一个日期时间字段。
我希望根据json文件的新时间戳字段进行聚合和组合。
Ps:该字段名为“时间戳”,它是一个字符串。这就是为什么我把它转换成一个日期时间,然后在数据流中转换一个时间戳。
def get_timestamp(data):
my_date = (data['timestamp']) # date : 2010-09-18......string
times = datetime.fromisoformat(my_date) #type: datetime.datetime
return beam.window.TimestampedValue(data, datetime.timestamp(times))稍后,在进行窗口化之前,我将调用管道中的函数。
我从公共部门收到我的数据:
lines = p | 'receive_data' >> beam.io.ReadFromPubSub(
subscription=known_args.in_topic).with_input_types(str)
| 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
| 'jsonload' >> beam.Map(lambda x: json.loads(x))然后做我的处理:
(lines |'timestamp' >> beam.Map(get_timestamp)
| 'print timestamp' >> beam.ParDo(PrintFn2())
| 'window' >> beam.WindowInto(
window.FixedWindows(10),
trigger=trigger.AfterWatermark(),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING
)
| 'CountGlobally' >> beam.CombineGlobally(
beam.combiners.CountCombineFn()
).without_defaults()
)发布于 2019-09-20 01:57:49
从PubSub读取时,设置元素EvenTime的最佳方法是使用
Python 属性
这将设置元素时间戳,并确保水印信号具有良好的数据。
如果这不是一个选项,您可以按照DoFn中的向PCollection添加时间戳更改元素的时间戳。然而,这种方法将不允许设置时间戳<当时的水印。这就是为什么withTimestampAttribute方法是解决此模式的最佳方法的原因。
https://stackoverflow.com/questions/58014098
复制相似问题