前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink1.4 生成时间戳与Watermarks

Flink1.4 生成时间戳与Watermarks

作者头像
smartsi
发布2019-08-07 14:15:07
2.1K0
发布2019-08-07 14:15:07
举报
文章被收录于专栏:SmartSiSmartSi

本节适用于在事件时间上运行的程序。有关事件时间,处理时间和提取时间的介绍,请参阅Flink1.4 事件时间与处理时间

为了处理事件时间,流处理程序需要相应地设置TimeCharacteristic

Java版本:

代码语言:javascript
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Scala版本:

代码语言:javascript
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

1. 分配时间戳

为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每个元素都需要分配事件时间戳。这通常通过访问/提取元素中某个字段的时间戳来完成。时间戳分配与生成watermarks相结合,告诉系统有关事件时间的进度progress。分配时间戳和生成watermarks有两种方法:

  • 直接在数据流源中分配与生成
  • 通过时间戳分配器/watermark生成器:在Flink时间戳分配器中也会定义要发送的watermarks

备注:

代码语言:javascript
复制
时间戳和watermarks都是从Java历元1970-01-01T00:00:00Z以来的毫秒数。
1.1 带有时间戳和watermarks的数据源函数

流数据源还可以直接为它们产生的元素分配时间戳,并且也可以发送watermarks。如果数据源分配了时间戳,那么就不需要时间戳分配器。

备注:

代码语言:javascript
复制
如果继续使用时间戳分配器,将会覆盖数据源提供的时间戳和watermarks。

如果直接向数据源中的元素分配时间戳,数据源必须使用SourceContext上collectWithTimestamp()方法。如果要生成watermarks,数据源必须调用emitWatermark(Watermark)函数。

以下是分配时间戳并生成watermarks的源(non-checkpointed)的简单示例:

Java版本:

代码语言:javascript
复制
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
	while (/* condition */) {
		MyType next = getNext();
		ctx.collectWithTimestamp(next, next.getEventTimestamp());

		if (next.hasWatermarkTime()) {
			ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
		}
	}
}

Scala版本:

代码语言:javascript
复制
override def run(ctx: SourceContext[MyType]): Unit = {
	while (/* condition */) {
		val next: MyType = getNext()
		ctx.collectWithTimestamp(next, next.eventTimestamp)

		if (next.hasWatermarkTime) {
			ctx.emitWatermark(new Watermark(next.getWatermarkTime))
		}
	}
}
1.2 时间戳分配器/Watermark生成器

时间戳分配器接收数据流并产生一个新的数据流,包含带有时间戳的元素和Watermark。如果原始流已经拥有时间戳或watermarks,那么如果使用时间戳分配器将会覆盖它们。

时间戳分配器通常在数据源之后立马指定,但也不是严格遵循这样的原则。例如,一个常见的模式是在时间戳分配器之前需要进行解析(MapFunction)和过滤(FilterFunction)。无论如何,时间戳分配器都需要在第一个基于事件时间的操作(例如第一个窗口操作)之前被指定。但也有特殊情况,当使用Kafka作为流作业的数据源时,Flink允许在数据源(消费者)内部定义时间戳分配器/watermarks生成器。有关如何执行此操作的更多信息,请参见Kafka Connector文档

备注:

代码语言:javascript
复制
本节的其余部分介绍了程序员为了创建自己的时间戳提取器/watermarks生成器而必须实现的主要接口。如果要查看Flink内置的执行器,请参阅[Pre-defined Timestamp Extractors / Watermark Emitters](https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html)

Java版本:

代码语言:javascript
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);

Scala版本:

代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter());

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)
1.2.1 Periodic Watermarks 分配器

AssignerWithPeriodicWatermarks分配时间戳并定期生成Watermarks(可能取决于流元素,或纯粹基于处理时间)。

通过ExecutionConfig.setAutoWatermarkInterval()定义Watermarks的时间间隔(每n毫秒)。每次调用分配器的getCurrentWatermark()方法,如果返回的Watermark非null,并且大于先前的Watermark,则会发送(emitted)这个新的Watermarks

以下是带有周期性Watermark的时间戳分配器的两个简单示例:

Java版本:

代码语言:javascript
复制
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {

	private final long maxTimeLag = 5000; // 5 seconds

	@Override
	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
		return element.getCreationTime();
	}

	@Override
	public Watermark getCurrentWatermark() {
		// return the watermark as current time minus the maximum time lag
		return new Watermark(System.currentTimeMillis() - maxTimeLag);
	}
}

Scala版本:

代码语言:javascript
复制
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L; // 3.5 seconds

    var currentMaxTimestamp: Long;

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp;
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxTimeLag = 5000L; // 5 seconds

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current time minus the maximum time lag
        new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
}
1.2.2 Punctuated Watermarks 分配器

每当某个事件表明一个新的Watermarks可能要生成时,需要调用AssignerWithPunctuatedWatermarks方法来生成Watermarks(To generate watermarks whenever a certain event indicates that a new watermark might be generated, use AssignerWithPunctuatedWatermarks)。对于这个类,Flink首先调用extractTimestamp()方法为元素分配时间戳,然后立即调用该元素上的checkAndGetNextWatermark()方法。

把在extractTimestamp()方法中分配的时间戳传递给checkAndGetNextWatermark()方法,并且可以决定是否要生成Watermarks。只要checkAndGetNextWatermark()方法返回非null的Watermark,并且该Watermark比以前最新的Watermark都大,则会发送这个新的Watermark

Java版本:

代码语言:javascript
复制
public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent> {

	@Override
	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
		return element.getCreationTime();
	}

	@Override
	public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
		return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
	}
}

Scala版本:

代码语言:javascript
复制
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

	override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
		element.getCreationTime
	}

	override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
		if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
	}
}

备注:

代码语言:javascript
复制
可以在每个单独的事件上生成Watermark。但是,由于每个Watermark在下游引起一些计算,所以过多的Watermark会降低性能。

2. 每个Kafka分区一个时间戳

当使用Apache Kafka作为数据源时,每个Kafka分区都可能有一个简单的事件时间模式(时间戳按升序递增或有界无序)。然而,当消费Kafka中的流时,多个分区通常并行消费,来自多个分区的事件会交叉在一起,破坏每个分区模式。

在这种情况下,你可以使用FlinkKafka分区感知Watermark的生成(Kafka-partition-aware watermark generation)。使用该特性,在Kafka消费者中,每个Kafka分区都生成watermark,并且每个分区的watermark的合并方式与在数据流shuffle上合并方式相同(the per-partition watermarks are merged in the same way as watermarks are merged on stream shuffles.)。

例如,如果在每个Kafka分区中的事件时间戳严格递增,则使用递增时间戳watermark生成器生成每个分区的watermark,在整体watermark上产生的结果也非常好。

下图显示了如何使用每个Kafka分区生成watermark,以及在这种情况下watermark如何通过流数据流进行传播:

Java版本:

代码语言:javascript
复制
FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {

    @Override
    public long extractAscendingTimestamp(MyType element) {
        return element.eventTimestamp();
    }
});

DataStream<MyType> stream = env.addSource(kafkaSource);

Scala版本:

代码语言:javascript
复制
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
    def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})

val stream: DataStream[MyType] = env.addSource(kafkaSource)

备注:

代码语言:javascript
复制
Flink版本:1.4

原文:https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-01-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 分配时间戳
    • 1.1 带有时间戳和watermarks的数据源函数
      • 1.2 时间戳分配器/Watermark生成器
        • 1.2.1 Periodic Watermarks 分配器
        • 1.2.2 Punctuated Watermarks 分配器
    • 2. 每个Kafka分区一个时间戳
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档