前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink-1.9流计算开发:十六、intervalJoin函数

Flink-1.9流计算开发:十六、intervalJoin函数

作者头像
cosmozhu
修改2020-06-15 09:31:35
8220
修改2020-06-15 09:31:35
举报
文章被收录于专栏:cosmozhu技术篇cosmozhu技术篇

1+

flink intervalJoin,Flink是下一代大数据计算平台,可处理流计算和批量计算。《Flink-1.9流计算开发:十六、intervalJoin函数》cosmozhu写的本系列文章的第十六篇。通过简单的DEMO来演示flink intervalJoin函数执行的效果 。如果您还不了解join如何使用,请跳到我上一篇文章Flink-1.9流计算开发:十五、join函数

需求

  1. 当前有一个CNY->USD的汇率流,数据流速为1/s;
  2. 当前有一个订单流,数据流速为1/10s;
  3. 实时计算订单汇率金额。

解决方案

代码语言:javascript
复制
public class StreamTest {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTest.class);
    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //CNY -> USD 汇率流
        SingleOutputStreamOperator<ExchangeRateInfo> usdToCny = env.addSource(new ExchangeRateDataSource(CurrencyType.CNY, CurrencyType.USD, 7, 6),"USD-CNY")
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ExchangeRateInfo>(Time.milliseconds(100)) {
                    @Override
                    public long extractTimestamp(ExchangeRateInfo element) {
                        return element.getTimeStamp().getTime();
                    }
                });
        //订单流
        SingleOutputStreamOperator<OrderInfo> orderDs = env.addSource(new OrderDataSource())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderInfo>(Time.milliseconds(100)) {
                    @Override
                    public long extractTimestamp(OrderInfo element) {
                        return element.getTimeStamp().getTime();
                    }
                });

        KeyedStream<ExchangeRateInfo, CurrencyType> usdToCnyKeyedStream = usdToCny.keyBy((KeySelector<ExchangeRateInfo, CurrencyType>) (ExchangeRateInfo value) -> {return value.getFrom();});
        KeyedStream<OrderInfo, CurrencyType> orderDsKeyedStream = orderDs.keyBy((KeySelector<OrderInfo, CurrencyType>) (OrderInfo order) -> {return order.getCurrencyType();});

        //订单流inner join 汇率流
        usdToCnyKeyedStream.intervalJoin(orderDsKeyedStream)
        .between(Time.milliseconds(-500), Time.milliseconds(500))
        .upperBoundExclusive()
        .lowerBoundExclusive()
        .process(new ProcessJoinFunction<ExchangeRateInfo, OrderInfo, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void processElement(ExchangeRateInfo arg0, OrderInfo arg1,
                    ProcessJoinFunction<ExchangeRateInfo, OrderInfo, String>.Context arg2, Collector<String> arg3)
                    throws Exception {
                arg3.collect(arg1.getTotalAmt().divide(arg0.getCoefficient(),2,BigDecimal.ROUND_HALF_UP).toPlainString());
            }

        })
        .print();

        env.execute("Flink Streaming Java API Skeleton");
    }
}

执行效果

代码语言:javascript
复制
10:18:27,672 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.20, timeStamp=Tue May 12 10:18:27 GMT+08:00 2020]
10:18:29,212 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.91, timeStamp=Tue May 12 10:18:29 GMT+08:00 2020]
10:18:30,213 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.43, timeStamp=Tue May 12 10:18:30 GMT+08:00 2020]
10:18:31,214 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.92, timeStamp=Tue May 12 10:18:31 GMT+08:00 2020]
10:18:32,214 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.77, timeStamp=Tue May 12 10:18:32 GMT+08:00 2020]
10:18:33,215 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.99, timeStamp=Tue May 12 10:18:33 GMT+08:00 2020]
10:18:34,215 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.90, timeStamp=Tue May 12 10:18:34 GMT+08:00 2020]
10:18:35,216 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.89, timeStamp=Tue May 12 10:18:35 GMT+08:00 2020]
10:18:36,216 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.42, timeStamp=Tue May 12 10:18:36 GMT+08:00 2020]
10:18:36,683 INFO  fun.cosmozhu.session16.datasource.OrderDataSource             - OrderInfo [orderNo=20200512101836994, timeStamp=Tue May 12 10:18:36 GMT+08:00 2020, totalAmt=0, goods=[], currencyType=CNY]
3> 0.00
10:18:37,216 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.48, timeStamp=Tue May 12 10:18:37 GMT+08:00 2020]
10:18:38,216 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.69, timeStamp=Tue May 12 10:18:38 GMT+08:00 2020]
10:18:39,217 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.28, timeStamp=Tue May 12 10:18:39 GMT+08:00 2020]
10:18:40,217 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.09, timeStamp=Tue May 12 10:18:40 GMT+08:00 2020]
10:18:41,217 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.49, timeStamp=Tue May 12 10:18:41 GMT+08:00 2020]
10:18:42,218 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.74, timeStamp=Tue May 12 10:18:42 GMT+08:00 2020]
10:18:43,219 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.68, timeStamp=Tue May 12 10:18:43 GMT+08:00 2020]
10:18:44,220 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.52, timeStamp=Tue May 12 10:18:44 GMT+08:00 2020]
10:18:45,220 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.60, timeStamp=Tue May 12 10:18:45 GMT+08:00 2020]
10:18:46,221 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.40, timeStamp=Tue May 12 10:18:46 GMT+08:00 2020]
10:18:46,781 INFO  fun.cosmozhu.session16.datasource.OrderDataSource             - OrderInfo [orderNo=20200512101846394, timeStamp=Tue May 12 10:18:46 GMT+08:00 2020, totalAmt=3319.07, goods=[Goods [goodsType=watermellon, unitPrice=78.05, currencyType=CNY, num=9], Goods [goodsType=grape, unitPrice=58.54, currencyType=CNY, num=10], Goods [goodsType=grape, unitPrice=28.34, currencyType=CNY, num=0], Goods [goodsType=pitaya, unitPrice=4.82, currencyType=CNY, num=15], Goods [goodsType=grape, unitPrice=99.16, currencyType=CNY, num=9], Goods [goodsType=grape, unitPrice=25.62, currencyType=CNY, num=10], Goods [goodsType=grape, unitPrice=55.31, currencyType=CNY, num=12], Goods [goodsType=apple, unitPrice=9.16, currencyType=CNY, num=16]], currencyType=CNY]
10:18:47,221 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.33, timeStamp=Tue May 12 10:18:47 GMT+08:00 2020]
3> 524.34
10:18:48,221 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.18, timeStamp=Tue May 12 10:18:48 GMT+08:00 2020]
10:18:49,222 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.26, timeStamp=Tue May 12 10:18:49 GMT+08:00 2020]
10:18:50,222 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.41, timeStamp=Tue May 12 10:18:50 GMT+08:00 2020]
10:18:51,223 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.79, timeStamp=Tue May 12 10:18:51 GMT+08:00 2020]
10:18:52,223 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.07, timeStamp=Tue May 12 10:18:52 GMT+08:00 2020]
10:18:53,224 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.86, timeStamp=Tue May 12 10:18:53 GMT+08:00 2020]
10:18:54,224 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.45, timeStamp=Tue May 12 10:18:54 GMT+08:00 2020]
10:18:55,224 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.86, timeStamp=Tue May 12 10:18:55 GMT+08:00 2020]
10:18:56,224 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.58, timeStamp=Tue May 12 10:18:56 GMT+08:00 2020]
10:18:56,786 INFO  fun.cosmozhu.session16.datasource.OrderDataSource             - OrderInfo [orderNo=20200512101856529, timeStamp=Tue May 12 10:18:56 GMT+08:00 2020, totalAmt=4235.92, goods=[Goods [goodsType=pear, unitPrice=48.05, currencyType=CNY, num=18], Goods [goodsType=apple, unitPrice=85.53, currencyType=CNY, num=14], Goods [goodsType=grape, unitPrice=33.07, currencyType=CNY, num=10], Goods [goodsType=pear, unitPrice=19.65, currencyType=CNY, num=16], Goods [goodsType=grape, unitPrice=90.81, currencyType=CNY, num=16], Goods [goodsType=apple, unitPrice=25.18, currencyType=CNY, num=3]], currencyType=CNY]
10:18:57,223 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.83, timeStamp=Tue May 12 10:18:57 GMT+08:00 2020]
3> 620.19
10:18:58,223 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.17, timeStamp=Tue May 12 10:18:58 GMT+08:00 2020]
10:18:59,223 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.72, timeStamp=Tue May 12 10:18:59 GMT+08:00 2020]
10:19:00,223 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.67, timeStamp=Tue May 12 10:19:00 GMT+08:00 2020]
10:19:01,224 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.69, timeStamp=Tue May 12 10:19:01 GMT+08:00 2020]
10:19:02,224 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.23, timeStamp=Tue May 12 10:19:02 GMT+08:00 2020]
10:19:03,224 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.74, timeStamp=Tue May 12 10:19:03 GMT+08:00 2020]
10:19:04,224 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.81, timeStamp=Tue May 12 10:19:04 GMT+08:00 2020]
10:19:05,224 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.44, timeStamp=Tue May 12 10:19:05 GMT+08:00 2020]
10:19:06,224 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.42, timeStamp=Tue May 12 10:19:06 GMT+08:00 2020]
10:19:06,787 INFO  fun.cosmozhu.session16.datasource.OrderDataSource             - OrderInfo [orderNo=20200512101906547, timeStamp=Tue May 12 10:19:06 GMT+08:00 2020, totalAmt=0, goods=[], currencyType=CNY]
10:19:07,223 INFO  fun.cosmozhu.session16.datasource.ExchangeRateDataSource      - ExchangeRateInfo [from=CNY, to=USD, coefficient=6.91, timeStamp=Tue May 12 10:19:07 GMT+08:00 2020]
3> 0.00

小结

intervalJoin(KeyedStream<T1, KEY> otherStream) 此函数是将两个流通过时间间隔关联。

between(Time lowerBound, Time upperBound) 设置关联的时间间隔。在此例中即为{ 订单.时间戳+lowerBound <= 汇率.时间戳 <= 订单.时间戳+upperBound }

upperBoundExclusive 代表不包含上线的边界值

lowerBoundExclusive 代表不包含下线的边界值

代码地址

代码语言:javascript
复制
https://github.com/chaoxxx/learn-flink-stream-api/blob/master/src/main/java/fun/cosmozhu/session16/main/StreamTest.java

作者:cosmozhu --90后的老父亲,专注于保护地球的程序员

个人网站:https://www.cosmozhu.fun

欢迎转载,转载时请注明出处。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 需求
  • 解决方案
  • 执行效果
  • 小结
  • 代码地址
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档