专栏首页SmartSiFlink DataStream 如何实现双流Join

Flink DataStream 如何实现双流Join

在离线 Hive 中,我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢?Flink DataStream API 为我们提供了3个算子来实现双流 join,分别是:

  • join
  • coGroup
  • intervalJoin

下面我们分别详细看一下这3个算子是如何实现双流 Join 的。

1. Join

Join 算子提供的语义为 “Window join”,即按照指定字段和(滚动/滑动/会话)窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。

Join 可以支持处理时间和事件时间两种时间特征。

Join 通用用法如下:

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

Join 语义类似与离线 Hive 的 InnnerJoin (内连接),这意味着如果一个流中的元素在另一个流中没有相对应的元素,则不会输出该元素。

下面我们看一下 Join 算子在不同类型窗口上的具体表现。

1.1 滚动窗口Join

当在滚动窗口上进行 Join 时,所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。

如上图所示,我们定义了一个大小为 2 秒的滚动窗口,最终产生 [0,1],[2,3],… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是,在滚动窗口 [6,7] 中,由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素,因此该窗口不会输出任何内容。

下面我们一起看一下如何实现上图所示的滚动窗口 Join:

// 绿色流
DataStream<Tuple3<String, String, String>> greenStream = greenSource.map(new MapFunction<String, Tuple3<String, String, String>>() {
    @Override
    public Tuple3<String, String, String> map(String str) throws Exception {
        String[] params = str.split(",");
        String key = params[0];
        String value = params[1];
        String eventTime = params[2];
        LOG.info("[绿色流] Key: {}, Value: {}, EventTime: {}", key, value, eventTime);
        return new Tuple3<>(key, value, eventTime);
    }
}).assignTimestampsAndWatermarks(
        WatermarkStrategy.<Tuple3<String, String, String>>forBoundedOutOfOrderness(Duration.ofMillis(100))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, String>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, String> element, long recordTimestamp) {
                        Long timeStamp = 0L;
                        try {
                            timeStamp = DateUtil.date2TimeStamp(element.f2, "yyyy-MM-dd HH:mm:ss");
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                        return timeStamp;
                    }
                })
);

// 橘色流
DataStream<Tuple3<String, String, String>> orangeStream = orangeSource.map(new MapFunction<String, Tuple3<String, String, String>>() {
    @Override
    public Tuple3<String, String, String> map(String str) throws Exception {
        String[] params = str.split(",");
        String key = params[0];
        String value = params[1];
        String eventTime = params[2];
        LOG.info("[橘色流] Key: {}, Value: {}, EventTime: {}", key, value, eventTime);
        return new Tuple3<>(key, value, eventTime);
    }
}).assignTimestampsAndWatermarks(
        WatermarkStrategy.<Tuple3<String, String, String>>forBoundedOutOfOrderness(Duration.ofMillis(100))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, String>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, String> element, long recordTimestamp) {
                        Long timeStamp = 0L;
                        try {
                            timeStamp = DateUtil.date2TimeStamp(element.f2, "yyyy-MM-dd HH:mm:ss");
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                        return timeStamp;
                    }
                })
);

// 双流合并
DataStream<String> result = orangeStream.join(greenStream)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(2)))
    .apply(new JoinFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String>() {
        @Override
        public String join(Tuple3<String, String, String> first, Tuple3<String, String, String> second) throws Exception {
            LOG.info("[合并流] Key: {}, Value: {}, EventTime: {}",
                    first.f0, first.f1 + "," + second.f1, first.f2 + "," + second.f2
            );
            return first.f1 + "," + second.f1;
        }
    });

完整代码请查阅:TumblingWindowJoinExample

如上代码所示为绿色流和橘色流指定 BoundedOutOfOrdernessWatermarks Watermark 策略,设置100毫秒的最大可容忍的延迟时间,同时也会为流分配事件时间戳。假设输入流为 格式,两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,9,2021-03-26 12:09:09

橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,9,2021-03-26 12:09:09

Join 效果如下所示:

1.2 滑动窗口Join

当在滑动窗口上进行 Join 时,所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。

如上图所示,我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是,一个元素可能会落在不同的窗口中,因此会在不同窗口中发生关联,例如,绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素,则不会输出该元素。

下面我们一起看一下如何实现上图所示的滑动窗口 Join:

DataStream<String> result = orangeStream.join(greenStream)
      .where(tuple -> tuple.f0)
      .equalTo(tuple -> tuple.f0)
      .window(SlidingEventTimeWindows.of(Time.seconds(2) /* size */, Time.seconds(1) /* slide */))
      .apply(new JoinFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String>() {
          @Override
          public String join(Tuple3<String, String, String> first, Tuple3<String, String, String> second) throws Exception {
              LOG.info("[合并流] Key: {}, Value: {}, EventTime: {}",
                      first.f0, first.f1 + "," + second.f1, first.f2 + "," + second.f2
              );
              return first.f1 + "," + second.f1;
          }
      });

完整代码请查阅:SlidingWindowJoinExample

假设输入流为 格式,两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,9,2021-03-26 12:09:09

橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,9,2021-03-26 12:09:09

Join 效果如下所示:

1.3 会话窗口Join

当在会话窗口上进行 Join 时,所有有相同 Key 并且位于同一会话窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。

如上图所示,我们定义了一个会话窗口,其中每个会话之间的间隔至少为1秒。上图中一共有三个会话,在前两个会话中,两个流中的元素两两组合传递给 JoinFunction。在第三个会话中,绿色流中没有元素,因此元素 8 和 9 不会发生Join。

下面我们一起看一下如何实现上图所示的滑动窗口 Join:

DataStream<String> result = orangeStream.join(greenStream)
        .where(tuple -> tuple.f0)
        .equalTo(tuple -> tuple.f0)
        .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
        .apply(new JoinFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String>() {
            @Override
            public String join(Tuple3<String, String, String> first, Tuple3<String, String, String> second) throws Exception {
                LOG.info("[合并流] Key: {}, Value: {}, EventTime: {}",
                        first.f0, first.f1 + "," + second.f1, first.f2 + "," + second.f2
                );
                return first.f1 + "," + second.f1;
            }
        });

完整代码请查阅:SessionWindowJoinExample

假设输入流为 格式,两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,11,2021-03-26 12:09:11

橘色流:
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,5,2021-03-26 12:09:05
key,6,2021-03-26 12:09:06
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11

Join 效果如下所示:

2. CoGroup

CoGroup 算子是将两条数据流按照 Key 进行分组,然后将相同 Key 的数据进行处理。要实现 CoGroup 功能需要为两个输入流分别指定 KeySelector 和 WindowAssigner。它的调用方式类似于 Join 算子,但是 CoGroupFunction 比 JoinFunction 更加灵活,可以按照用户指定的逻辑匹配左流或者右流的数据,基于此我们可以实现内连接(InnerJoin)、左连接(LeftJoin)以及右连接(RightJoin)。

目前,这些分组中的数据是在内存中保存的,因此需要确保保存的数据量不能太大,否则,JVM 可能会崩溃。

CoGroup 通用用法如下:

stream.coGroup(otherStream)
		.where(<KeySelector>)
		.equalTo(<KeySelector>)
		.window(<WindowAssigner>)
		.apply(<CoGroupFunction>);

下面我们看一下如何使用 CoGroup 算子实现内连接(InnerJoin)、左连接(LeftJoin)以及右连接(RightJoin)。

2.1 InnerJoin

下面我们看一下如何使用 CoGroup 实现内连接:

如上图所示,我们定义了一个大小为 2 秒的滚动窗口。InnerJoin 只有在两个流对应窗口中都存在元素时,才会输出。

我们以滚动窗口为例来实现 InnerJoin

// Join流
CoGroupedStreams coGroupStream = greenStream.coGroup(orangeStream);
DataStream<String> result = coGroupStream
        // 绿色流
        .where(new KeySelector<Tuple3<String, String, String>, String>() {
            @Override
            public String getKey(Tuple3<String, String, String> tuple3) throws Exception {
                return tuple3.f0;
            }
        })
        // 橘色流
        .equalTo(new KeySelector<Tuple3<String, String, String>, String>() {
            @Override
            public String getKey(Tuple3<String, String, String> tuple3) throws Exception {
                return tuple3.f0;
            }
        })
        // 滚动窗口
        .window(TumblingEventTimeWindows.of(Time.seconds(2)))
        .apply(new InnerJoinFunction());

// 内连接
private static class InnerJoinFunction implements CoGroupFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String> {
    @Override
    public void coGroup(Iterable<Tuple3<String, String, String>> greenIterable, Iterable<Tuple3<String, String, String>> orangeIterable, Collector<String> collector) throws Exception {
        for (Tuple3<String, String, String> greenTuple : greenIterable) {
            for (Tuple3<String, String, String> orangeTuple : orangeIterable) {
                LOG.info("[Join流] Key : {}, Value: {}, EventTime: {}",
                        greenTuple.f0, greenTuple.f1 + ", " + orangeTuple.f1, greenTuple.f2 + ", " + orangeTuple.f2
                );
                collector.collect(greenTuple.f1 + ", " + orangeTuple.f1);
            }
        }
    }
}

完整代码请查阅:CoGroupJoinExample

如上代码所示,我们实现了 CoGroupFunction 接口,重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable),本示例中绿色流为 greenIterable,橘色流为 orangeIterable,如果要实现 InnerJoin ,只需要两个迭代器中的元素两两组合即可。两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11

橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11

Join 效果如下所示:

2.2 LeftJoin

下面我们看一下如何使用 CoGroup 实现左连接:

如上图所示,我们定义了一个大小为 2 秒的滚动窗口。LeftJoin 只要绿色流窗口中有元素时,就会输出。即使在橘色流对应窗口中没有相对应的元素。

我们以滚动窗口为例来实现 LeftJoin

// Join流
CoGroupedStreams coGroupStream = greenStream.coGroup(orangeStream);
DataStream<String> result = coGroupStream
        // 绿色流
        .where(new KeySelector<Tuple3<String, String, String>, String>() {
            @Override
            public String getKey(Tuple3<String, String, String> tuple3) throws Exception {
                return tuple3.f0;
            }
        })
        // 橘色流
        .equalTo(new KeySelector<Tuple3<String, String, String>, String>() {
            @Override
            public String getKey(Tuple3<String, String, String> tuple3) throws Exception {
                return tuple3.f0;
            }
        })
        // 滚动窗口
        .window(TumblingEventTimeWindows.of(Time.seconds(2)))
        .apply(new LeftJoinFunction());

// 左连接
private static class LeftJoinFunction implements CoGroupFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String> {
    @Override
    public void coGroup(Iterable<Tuple3<String, String, String>> greenIterable, Iterable<Tuple3<String, String, String>> orangeIterable, Collector<String> collector) throws Exception {
        for (Tuple3<String, String, String> greenTuple : greenIterable) {
            boolean noElements = true;
            for (Tuple3<String, String, String> orangeTuple : orangeIterable) {
                noElements = false;
                LOG.info("[Join流] Key : {}, Value: {}, EventTime: {}",
                        greenTuple.f0, greenTuple.f1 + ", " + orangeTuple.f1, greenTuple.f2 + ", " + orangeTuple.f2
                );
                collector.collect(greenTuple.f1 + ", " + orangeTuple.f1);
            }
            if (noElements){
                LOG.info("[Join流] Key : {}, Value: {}, EventTime: {}",
                        greenTuple.f0, greenTuple.f1 + ", null", greenTuple.f2 + ", null"
                );
                collector.collect(greenTuple.f1 + ", null");
            }
        }
    }
}

完整代码请查阅:CoGroupJoinExample

如上代码所示,我们实现了 CoGroupFunction 接口,重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable),本示例中绿色流为 greenIterable,橘色流为 orangeIterable,如果要实现 LeftJoin ,需要保证 orangeIterable 中没有元素,greenIterable 中的元素也能输出。因此我们定义了一个 noElements 变量来判断 orangeIterable 是否有元素,如果 orangeIterable 中没有元素,单独输出 greenIterable 中的元素即可。Join 效果如下所示:

2.3 RightJoin

下面我们看一下如何使用 CoGroup 实现右连接:

如上图所示,我们定义了一个大小为 2 秒的滚动窗口。LeftJoin 只要橘色流窗口中有元素时,就会输出。即使在绿色流对应窗口中没有相对应的元素。

我们以滚动窗口为例来实现 RightJoin

// Join流
CoGroupedStreams coGroupStream = greenStream.coGroup(orangeStream);
DataStream<String> result = coGroupStream
        // 绿色流
        .where(new KeySelector<Tuple3<String, String, String>, String>() {
            @Override
            public String getKey(Tuple3<String, String, String> tuple3) throws Exception {
                return tuple3.f0;
            }
        })
        // 橘色流
        .equalTo(new KeySelector<Tuple3<String, String, String>, String>() {
            @Override
            public String getKey(Tuple3<String, String, String> tuple3) throws Exception {
                return tuple3.f0;
            }
        })
        // 滚动窗口
        .window(TumblingEventTimeWindows.of(Time.seconds(2)))
        .apply(new RightJoinFunction());

// 右连接
private static class RightJoinFunction implements CoGroupFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String> {
    @Override
    public void coGroup(Iterable<Tuple3<String, String, String>> greenIterable, Iterable<Tuple3<String, String, String>> orangeIterable, Collector<String> collector) throws Exception {
        for (Tuple3<String, String, String> orangeTuple : orangeIterable) {
            boolean noElements = true;
            for (Tuple3<String, String, String> greenTuple : greenIterable) {
                noElements = false;
                LOG.info("[Join流] Key : {}, Value: {}, EventTime: {}",
                        greenTuple.f0, greenTuple.f1 + ", " + orangeTuple.f1, greenTuple.f2 + ", " + orangeTuple.f2
                );
                collector.collect(greenTuple.f1 + ", " + orangeTuple.f1);
            }
            if (noElements) {
                LOG.info("[Join流] Key : {}, Value: {}, EventTime: {}",
                        orangeTuple.f0, "null, " + orangeTuple.f1, "null, " + orangeTuple.f2
                );
                collector.collect("null, " + orangeTuple.f2);
            }
        }
    }
}

完整代码请查阅:CoGroupJoinExample

如上代码所示,我们实现了 CoGroupFunction 接口,重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable),本示例中绿色流为 greenIterable,橘色流为 orangeIterable,如果要实现 RightJoin,实现原理跟 LeftJoin 一样,需要保证 greenIterable 中没有元素,orangeIterable 中的元素也能输出。因此我们定义了一个 noElements 变量来判断 greenIterable 是否有元素,如果 greenIterable 中没有元素,单独输出 orangeIterable 中的元素即可。Join 效果如下所示:

3. Interval Join

Flink 中基于 DataStream 的 Join,只能实现在同一个窗口的两个数据流进行 Join,但是在实际中常常会存在数据乱序或者延时的情况,导致两个流的数据进度不一致,就会出现数据跨窗口的情况,那么数据就无法在同一个窗口内 Join。Flink 基于 KeyedStream 提供的 Interval Join 机制可以对两个 keyedStream 进行 Join, 按照相同的 key 在一个相对数据时间的时间段内进行 Join。按照指定字段以及右流相对左流偏移的时间区间进行关联:

b.timestamp ∈ [a.timestamp + lowerBound, a.timestamp + upperBound]

或者

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

其中a和b分别是上图中绿色流和橘色流中的元素,并且有相同的 key。只需要保证 lowerBound 永远小于等于 upperBound 即可,均可以为正数或者负数。

从上面可以看出绿色流可以晚到 lowerBound(lowerBound为负的话)时间,也可以早到 upperBound(upperBound为正的话)时间。也可以理解为橘色流中的每个元素可以和绿色流指中定区间的元素进行 Join。需要注意的是 Interval Join 当前仅支持事件时间:

public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
			if (timeBehaviour != TimeBehaviour.EventTime) {
				throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
			}
}

下面我们具体看看如何实现一个 Interval Join:

// 绿色流
DataStream<Tuple3<String, String, String>> greenStream = greenSource.map(new MapFunction<String, Tuple3<String, String, String>>() {
    @Override
    public Tuple3<String, String, String> map(String str) throws Exception {
        String[] params = str.split(",");
        String key = params[0];
        String eventTime = params[2];
        String value = params[1];
        LOG.info("[绿色流] Key: {}, Value: {}, EventTime: {}", key, value, eventTime);
        return new Tuple3<>(key, value, eventTime);
    }
}).assignTimestampsAndWatermarks(
        // 需要指定Watermark
        WatermarkStrategy.<Tuple3<String, String, String>>forBoundedOutOfOrderness(Duration.ofMillis(100))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, String>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, String> element, long recordTimestamp) {
                        Long timeStamp = null;
                        try {
                            timeStamp = DateUtil.date2TimeStamp(element.f2, "yyyy-MM-dd HH:mm:ss");
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                        return timeStamp;
                    }
                })
);

// 橘色流
DataStream<Tuple3<String, String, String>> orangeStream = orangeSource.map(new MapFunction<String, Tuple3<String, String, String>>() {
    @Override
    public Tuple3<String, String, String> map(String str) throws Exception {
        String[] params = str.split(",");
        String key = params[0];
        String value = params[1];
        String eventTime = params[2];
        LOG.info("[橘色流] Key: {}, Value: {}, EventTime: {}", key, value, eventTime);
        return new Tuple3<>(key, value, eventTime);
    }
}).assignTimestampsAndWatermarks(
        // 需要指定Watermark
        WatermarkStrategy.<Tuple3<String, String, String>>forBoundedOutOfOrderness(Duration.ofMillis(100))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, String>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, String> element, long recordTimestamp) {
                        Long timeStamp = null;
                        try {
                            timeStamp = DateUtil.date2TimeStamp(element.f2, "yyyy-MM-dd HH:mm:ss");
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                        return timeStamp;
                    }
                })
);

KeySelector<Tuple3<String, String, String>, String> keySelector = new KeySelector<Tuple3<String, String, String>, String>() {
    @Override
    public String getKey(Tuple3<String, String, String> value) throws Exception {
        return value.f0;
    }
};

// 双流合并
DataStream result = orangeStream
    .keyBy(keySelector)
    .intervalJoin(greenStream.keyBy(keySelector))
    .between(Time.seconds(-2), Time.seconds(1))
    .process(new ProcessJoinFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String>() {
        @Override
        public void processElement(Tuple3<String, String, String> left,
                                   Tuple3<String, String, String> right,
                                   Context ctx, Collector<String> out) throws Exception {
            LOG.info("[合并流] Key: {}, Value: {}, EventTime: {}",
                    left.f0, "[" + left.f1 + ", " + right.f1 + "]",
                    "[" + right.f2 + "|" + ctx.getRightTimestamp() + ", " + right.f2 + "|" + ctx.getLeftTimestamp() + "]"
            );
            out.collect(left.f1 + ", " + right.f1);
        }
    });

需要注意的是 Interval Join 当前仅支持事件时间,所以需要为流指定事件时间戳。

完整代码请查阅:IntervalJoinExample

两条流输入元素如下所示:

绿色流:
c,0,2021-03-23 12:09:00
c,1,2021-03-23 12:09:01
c,6,2021-03-23 12:09:06
c,7,2021-03-23 12:09:07

橘色流:
c,0,2021-03-23 12:09:00
c,2,2021-03-23 12:09:02
c,3,2021-03-23 12:09:03
c,4,2021-03-23 12:09:04
c,5,2021-03-23 12:09:05
c,7,2021-03-23 12:09:07

Join 效果如下所示:

参考:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink1.4 数据流类型与转换关系

    Flink 为流处理和批处理分别提供了 DataStream API 和 DataSet API。正是这种高层的抽象和 flunent API 极大地便利了用户...

    smartsi
  • Flink Session Window 六个灵魂拷问

    与翻滚窗口(Tumbling Window)和滑动窗口(Sliding Window)相比,会话窗口(Session Window)不重叠并且没有固定的开始和结...

    kk大数据
  • Flink算子使用方法及实例演示:union和connect

    Flink的Transformation转换主要包括四种:单数据流基本转换、基于Key的分组转换、多数据流转换和数据重分布转换。读者可以使用Flink Scal...

    PP鲁
  • Flink重点难点:维表关联理论和Join实战

    数据流操作的另一个常见需求是对两条数据流中的事件进行联结(connect)或Join。Flink DataStream API中内置有两个可以根据时间条件对数据...

    王知无-import_bigdata
  • Flink学习笔记

    流式计算是大数据计算的痛点,第1代实时计算引擎Storm对Exactly Once 语义和窗口支持较弱,使用的场景有限且无法支持高吞吐计算;Spark Stre...

    数据社
  • Apache Flink 1.12.0 正式发布,流批一体真正统一运行!

    Apache Flink 社区很荣幸地宣布 Flink 1.12.0 版本正式发布!近 300 位贡献者参与了 Flink 1.12.0 的开发,提交了超过 1...

    zhisheng
  • 【Flink】Flink流应用开发

    (1)Flink简介 Flink是一个低延迟、高吞吐、统一的大数据分布式实时计算引擎,使用官网的一句话来介绍Flink就是“Stateful Computat...

    魏晓蕾
  • Flink双流处理:实时对账实现

    更多内容详见:https://github.com/pierre94/flink-notes

    皮皮熊
  • 数栈技术分享:开源·数栈-扩展FlinkSQL实现流与维表的join

    SQL是数据处理中使用最广泛的语言。它允许用户简明扼要地声明他们的业务逻辑。大数据批计算使用SQL很常见,但是支持SQL的实时计算并不多。其实,用SQL开发实时...

    数栈DTinsight
  • 零基础学Flink:Join两个流

    《零基础学Flink》这个系列已经做了不少篇了,接下来几章会更加贴近案例来说明一些功能,今天我们先来说说如何将两个流join起来。这次我们以实时汇率和订单流合并...

    麒思妙想
  • [源码分析] 带你梳理 Flink SQL / Table API内部执行流程

    本文将简述Flink SQL / Table API的内部实现,为大家把 "从SQL语句到具体执行" 这个流程串起来。并且尽量多提供调用栈,这样大家在遇到问题时...

    罗西的思考
  • 全网第一 | Flink学习面试灵魂40问答案!

    Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的A...

    大数据真好玩
  • Flink入门(四)——编程模型

    flink是一款开源的大数据流式处理框架,他可以同时批处理和流处理,具有容错性、高吞吐、低延迟等优势,本文简述flink的编程模型。

    实时计算
  • 硬核!一文学完Flink流计算常用算子(Flink算子大全)

    Flink和Spark类似,也是一种一站式处理的框架;既可以进行批处理(DataSet),也可以进行实时处理(DataStream)。

    五分钟学大数据
  • FlinkSQL演进过程,解析原理及一些优化策略

    flink 1.9之前的版本,对于Table API和SQL的底层实现结构如下图,可以看处流处理和批处理有各自独立的api (流处理DataStream,批处理...

    Spark学习技巧
  • Flink-1.9流计算开发:二、Map函数

    整体来讲一个流处理过程可以划分为三部分DataSource、Transformations、Sinks。DataSource用来产生或者获取数据流,Transf...

    cosmozhu
  • Flink面试通关手册「160题升级版」

    主要是当Flink开启Checkpoint的时候,会往Source端插入一条barrir,然后这个barrir随着数据流向一直流动,当流入到一个算子的时候,这个...

    大数据真好玩
  • Flink重点难点:Flink Table&SQL必知必会(一)

    Flink本身是批流统一的处理框架,所以Table API和SQL,就是批流统一的上层处理API。目前功能尚未完善,处于活跃的开发阶段。

    王知无-import_bigdata
  • Flink-1.9流计算开发:四、filter函数

    在本篇文章中我们接着来说filter函数,此函数主要作用就是根据用户条件,过滤数据流中数据。

    cosmozhu

扫码关注云+社区

领取腾讯云代金券