专栏首页码匠的流水账聊聊flink的TimeCharacteristic

聊聊flink的TimeCharacteristic

本文主要研究一下flink的TimeCharacteristic

TimeCharacteristic

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimeCharacteristic.java

/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {

    /**
     * Processing time for operators means that the operator uses the system clock of the machine
     * to determine the current time of the data stream. Processing-time windows trigger based
     * on wall-clock time and include whatever elements happen to have arrived at the operator at
     * that point in time.
     *
     * <p>Using processing time for window operations results in general in quite non-deterministic
     * results, because the contents of the windows depends on the speed in which elements arrive.
     * It is, however, the cheapest method of forming windows and the method that introduces the
     * least latency.
     */
    ProcessingTime,

    /**
     * Ingestion time means that the time of each individual element in the stream is determined
     * when the element enters the Flink streaming data flow. Operations like windows group the
     * elements based on that time, meaning that processing speed within the streaming dataflow
     * does not affect windowing, but only the speed at which sources receive elements.
     *
     * <p>Ingestion time is often a good compromise between processing time and event time.
     * It does not need and special manual form of watermark generation, and events are typically
     * not too much out-or-order when they arrive at operators; in fact, out-of-orderness can
     * only be introduced by streaming shuffles or split/join/union operations. The fact that
     * elements are not very much out-of-order means that the latency increase is moderate,
     * compared to event
     * time.
     */
    IngestionTime,

    /**
     * Event time means that the time of each individual element in the stream (also called event)
     * is determined by the event's individual custom timestamp. These timestamps either exist in
     * the elements from before they entered the Flink streaming dataflow, or are user-assigned at
     * the sources. The big implication of this is that it allows for elements to arrive in the
     * sources and in all operators out of order, meaning that elements with earlier timestamps may
     * arrive after elements with later timestamps.
     *
     * <p>Operators that window or order data with respect to event time must buffer data until they
     * can be sure that all timestamps for a certain time interval have been received. This is
     * handled by the so called "time watermarks".
     *
     * <p>Operations based on event time are very predictable - the result of windowing operations
     * is typically identical no matter when the window is executed and how fast the streams
     * operate. At the same time, the buffering and tracking of event time is also costlier than
     * operating with processing time, and typically also introduces more latency. The amount of
     * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
     * time span between the arrival of early and late elements is. With respect to the
     * "time watermarks", this means that the cost typically depends on how early or late the
     * watermarks can be generated for their timestamp.
     *
     * <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the
     * event's original time, rather than the time assigned at the data source. Practically, that
     * means that event time has generally more meaning, but also that it takes longer to determine
     * that all elements for a certain time have arrived.
     */
    EventTime
}
  • ProcessingTime是以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间
  • IngestionTime是以数据进入flink streaming data flow的时间为准
  • EventTime是以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段

区别

各个时间的区别如上图

实例

    public static void main(String[] args) throws Exception {

        final int popThreshold = 20; // threshold for popular places

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        // create a Kafka consumer
        FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
                "cleansedRides",
                new TaxiRideSchema(),
                kafkaProps);
        // assign a timestamp extractor to the consumer
        consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

        // create a TaxiRide data stream
        DataStream<TaxiRide> rides = env.addSource(consumer);

        // find popular places
        DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
                // match ride to grid cell and event type (start or end)
                .map(new GridCellMatcher())
                // partition by cell id and event type
                .keyBy(0, 1)
                // build sliding window
                .timeWindow(Time.minutes(15), Time.minutes(5))
                // count ride events in window
                .apply(new RideCounter())
                // filter by popularity threshold
                .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
                // map grid cell to coordinates
                .map(new GridToCoordinates());

        popularPlaces.print();

        // execute the transformation pipeline
        env.execute("Popular Places from Kafka");
    }

    /**
     * Assigns timestamps to TaxiRide records.
     * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
     */
    public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

        public TaxiRideTSExtractor() {
            super(Time.seconds(MAX_EVENT_DELAY));
        }

        @Override
        public long extractTimestamp(TaxiRide ride) {
            if (ride.isStart) {
                return ride.startTime.getMillis();
            }
            else {
                return ride.endTime.getMillis();
            }
        }
    }
  • 这里消费kafka的时候setStreamTimeCharacteristic为TimeCharacteristic.EventTime,同时assignTimestampsAndWatermarks指定为TaxiRideTSExtractor,它继承了BoundedOutOfOrdernessTimestampExtractor,这里的extractTimestamp根据ride的start与否返回ride.startTime.getMillis()或者ride.endTime.getMillis(),来自定义了eventTime

小结

  • flink的TimeCharacteristic枚举定义了三类值,分别是ProcessingTime、IngestionTime、EventTime
  • ProcessingTime是以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间;IngestionTime是以数据进入flink streaming data flow的时间为准;EventTime是以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段
  • 指定为EventTime的source需要自己定义event time以及emit watermark,或者在source之外通过assignTimestampsAndWatermarks在程序手工指定

doc

  • Event Time

本文分享自微信公众号 - 码匠的流水账(geek_luandun)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-12-18

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊flink的TimeCharacteristic

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/Time...

    codecraft
  • 聊聊flink的RichParallelSourceFunction

    flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/func...

    codecraft
  • 聊聊storm的WindowedBoltExecutor

    storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor....

    codecraft
  • 聊聊flink的TimeCharacteristic

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/Time...

    codecraft
  • 预料意料之外的事情:开发自主系统设计原则,以应对意料之外的事件和条件(AI)

    在开发自主系统时,工程师和其他利益相关者会努力使系统为所有可预见的事件和条件做好准备。但是,这些系统仍然会遇到在设计时没有考虑到的事件和条件。出于安全、成本或伦...

    李欣颖6837176
  • CodeForces 157B Trace

    B. Trace time limit per test 2 seconds memory limit per test 256 megabytes...

    ShenduCC
  • Windows Server AppFabric Beta 2 已经发布

    Windows Server AppFabric Beta 2是一个包含完全功能的AppFabric版本(This build represents our “...

    张善友
  • “理性无限制反驳的演绎联合支持”技术报告(CS AI)

    在 ASPIC 风格的结构化论证中,一个论点可以通过攻击另一个论点的结论来反驳。有人提出了两种形式化的反驳方式。在有限制的反驳中,被攻击的结论必须是在一个可被击...

    刘持诚
  • The Perfect Server -

    chkconfig --levels 235 mysqld on /etc/init.d/mysqld start

    py3study
  • js的bind函数那些你可能没想过的点

    加上上一篇文章中说的情况,我们可以总结到,当我们对一个bind返回的函数进行操作时,实际上这种操作是对目标函数的操作,也就是调用bind的函数。下面我们看一下e...

    theanarkh

扫码关注云+社区

领取腾讯云代金券