聊聊flink的TimeCharacteristic

本文主要研究一下flink的TimeCharacteristic

TimeCharacteristic

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimeCharacteristic.java

/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {
​
    /**
     * Processing time for operators means that the operator uses the system clock of the machine
     * to determine the current time of the data stream. Processing-time windows trigger based
     * on wall-clock time and include whatever elements happen to have arrived at the operator at
     * that point in time.
     *
     * <p>Using processing time for window operations results in general in quite non-deterministic
     * results, because the contents of the windows depends on the speed in which elements arrive.
     * It is, however, the cheapest method of forming windows and the method that introduces the
     * least latency.
     */
    ProcessingTime,
​
    /**
     * Ingestion time means that the time of each individual element in the stream is determined
     * when the element enters the Flink streaming data flow. Operations like windows group the
     * elements based on that time, meaning that processing speed within the streaming dataflow
     * does not affect windowing, but only the speed at which sources receive elements.
     *
     * <p>Ingestion time is often a good compromise between processing time and event time.
     * It does not need and special manual form of watermark generation, and events are typically
     * not too much out-or-order when they arrive at operators; in fact, out-of-orderness can
     * only be introduced by streaming shuffles or split/join/union operations. The fact that
     * elements are not very much out-of-order means that the latency increase is moderate,
     * compared to event
     * time.
     */
    IngestionTime,
​
    /**
     * Event time means that the time of each individual element in the stream (also called event)
     * is determined by the event's individual custom timestamp. These timestamps either exist in
     * the elements from before they entered the Flink streaming dataflow, or are user-assigned at
     * the sources. The big implication of this is that it allows for elements to arrive in the
     * sources and in all operators out of order, meaning that elements with earlier timestamps may
     * arrive after elements with later timestamps.
     *
     * <p>Operators that window or order data with respect to event time must buffer data until they
     * can be sure that all timestamps for a certain time interval have been received. This is
     * handled by the so called "time watermarks".
     *
     * <p>Operations based on event time are very predictable - the result of windowing operations
     * is typically identical no matter when the window is executed and how fast the streams
     * operate. At the same time, the buffering and tracking of event time is also costlier than
     * operating with processing time, and typically also introduces more latency. The amount of
     * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
     * time span between the arrival of early and late elements is. With respect to the
     * "time watermarks", this means that the cost typically depends on how early or late the
     * watermarks can be generated for their timestamp.
     *
     * <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the
     * event's original time, rather than the time assigned at the data source. Practically, that
     * means that event time has generally more meaning, but also that it takes longer to determine
     * that all elements for a certain time have arrived.
     */
    EventTime
}
  • ProcessingTime是以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间
  • IngestionTime是以数据进入flink streaming data flow的时间为准
  • EventTime是以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段

区别

各个时间的区别如上图

实例

    public static void main(String[] args) throws Exception {
​
        final int popThreshold = 20; // threshold for popular places
​
        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);
​
        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");
​
        // create a Kafka consumer
        FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
                "cleansedRides",
                new TaxiRideSchema(),
                kafkaProps);
        // assign a timestamp extractor to the consumer
        consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());
​
        // create a TaxiRide data stream
        DataStream<TaxiRide> rides = env.addSource(consumer);
​
        // find popular places
        DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
                // match ride to grid cell and event type (start or end)
                .map(new GridCellMatcher())
                // partition by cell id and event type
                .keyBy(0, 1)
                // build sliding window
                .timeWindow(Time.minutes(15), Time.minutes(5))
                // count ride events in window
                .apply(new RideCounter())
                // filter by popularity threshold
                .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
                // map grid cell to coordinates
                .map(new GridToCoordinates());
​
        popularPlaces.print();
​
        // execute the transformation pipeline
        env.execute("Popular Places from Kafka");
    }
​
    /**
     * Assigns timestamps to TaxiRide records.
     * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
     */
    public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {
​
        public TaxiRideTSExtractor() {
            super(Time.seconds(MAX_EVENT_DELAY));
        }
​
        @Override
        public long extractTimestamp(TaxiRide ride) {
            if (ride.isStart) {
                return ride.startTime.getMillis();
            }
            else {
                return ride.endTime.getMillis();
            }
        }
    }
  • 这里消费kafka的时候setStreamTimeCharacteristic为TimeCharacteristic.EventTime,同时assignTimestampsAndWatermarks指定为TaxiRideTSExtractor,它继承了BoundedOutOfOrdernessTimestampExtractor,这里的extractTimestamp根据ride的start与否返回ride.startTime.getMillis()或者ride.endTime.getMillis(),来自定义了eventTime

小结

  • flink的TimeCharacteristic枚举定义了三类值,分别是ProcessingTime、IngestionTime、EventTime
  • ProcessingTime是以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间;IngestionTime是以数据进入flink streaming data flow的时间为准;EventTime是以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段
  • 指定为EventTime的source需要自己定义event time以及emit watermark,或者在source之外通过assignTimestampsAndWatermarks在程序手工指定

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏jeremy的技术点滴

javacv编码mp4视频

2.5K70
来自专栏斑斓

响应式编程的实践

作者 | 张逸 特别说明:本文包含大量代码片段,若要获得更好阅读观感,请点击文末“阅读原文”或访问我的博客。 响应式编程在前端开发以及Android开发中有颇多...

39480
来自专栏京东技术

京东资深架构师代码评审歪诗

21030
来自专栏烙馅饼喽的技术分享

记一个脚本解释器的开发

最近可以有1个月左右的空闲,可以稍微整理一下这个脚本解释器的开发过程。 一、缘由   2014年左右,我们使用AIR技术,开发了一个3D战争类型的手游。那时候手...

45270
来自专栏Java技术栈

6 道 BATJ 必考的 Java 面试题

请对比 Exception 和 Error,另外,运行时异常与一般异常有什么区别?

11510
来自专栏Golang语言社区

Go 语言的演化历程

9、hello.c,标准C89 #include <stdio.h> main(void) //译注:与上面hello.c相比,多了个void { pr...

34680
来自专栏熊二哥

.NET工作准备--01前言

01应聘须知(已过时) -1.了解软件开发大环境。 -2.准备简历:不宜超过一页,永远准备中文,模板。 -3.渠道:3大网站,中华英才,前程无忧(51job最...

22480
来自专栏小二的折腾日记

LeetCode-32-Longest-Valid-Parentheses

表示这是一道没有看懂题目的题,看到题目的难度是hard,但是自己的想法很简答,以为直接一个栈就可以了。。 too young啊

12330
来自专栏我是攻城师

Java时间处理神器之Joda-Time

49250
来自专栏Java3y

给女朋友讲解什么是代理模式

27850

扫码关注云+社区

领取腾讯云代金券