前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink-1.9流计算开发:十五、join函数

Flink-1.9流计算开发:十五、join函数

作者头像
cosmozhu
修改2020-06-15 09:31:53
8150
修改2020-06-15 09:31:53
举报
文章被收录于专栏:cosmozhu技术篇cosmozhu技术篇

flink join,Flink是下一代大数据计算平台,可处理流计算和批量计算。《Flink-1.9流计算开发:十五、join函数》cosmozhu写的本系列文章的第十五篇。通过简单的DEMO来演示join函数执行的效果 。

需求

当前有一个订单流(每秒1个订单),一个人民币-美元汇率流(每10秒发布一个新汇率)。实时计算一个汇率窗口期(10秒)内,订单的外汇金额。

解决方案

代码语言:javascript
复制
public class StreamTest {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTest.class);

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //CNY -> USD 汇率流
        SingleOutputStreamOperator<ExchangeRateInfo> usdToCny = env.addSource(new ExchangeRateDataSource(CurrencyType.CNY, CurrencyType.USD, 7, 6),"USD-CNY")
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ExchangeRateInfo>(Time.milliseconds(100)) {
                    @Override
                    public long extractTimestamp(ExchangeRateInfo element) {
                        return element.getTimeStamp().getTime();
                    }
                });
        //订单流
        SingleOutputStreamOperator<OrderInfo> orderDs = env.addSource(new OrderDataSource())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderInfo>(Time.milliseconds(100)) {
                    @Override
                    public long extractTimestamp(OrderInfo element) {
                        return element.getTimeStamp().getTime();
                    }
                });

        //订单流inner join 汇率流
        orderDs.join(usdToCny)
        .where(new KeySelector<OrderInfo, CurrencyType>() {
            private static final long serialVersionUID = 1L;
            @Override
            public CurrencyType getKey(OrderInfo value) throws Exception {
                return value.getCurrencyType();
            }
        })
        .equalTo(new KeySelector<ExchangeRateInfo, CurrencyType>() {
            private static final long serialVersionUID = 1L;
            @Override
            public CurrencyType getKey(ExchangeRateInfo value) throws Exception {
                return value.getFrom();
            }
        })
        //转换20s窗口期内的订单金额为美元
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        //将订单金额人民币转换为美元
        .apply(new JoinFunction<OrderInfo, ExchangeRateInfo, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public String join(OrderInfo first, ExchangeRateInfo second) throws Exception {
                return "$"+first.getTotalAmt().divide(second.getCoefficient(), 2,BigDecimal.ROUND_HALF_UP).toPlainString();
            }
        })
        .print();

        env.execute("Flink Streaming Java API Skeleton");
    }
}

执行效果

代码语言:javascript
复制
3> $125.05
3> $625.79
3> $95.79
3> $587.56
3> $387.35
3> $278.63
3> $40.45
3> $578.36
3> $15.48
3> $236.96

小结

Flink的join和传统数据库的join有相似之处。上面代码如果用SQL语句可以表示为:

代码语言:javascript
复制
select * from OrderData a,ExchangeRate b where a.CurrencyType = b.From;

本例在窗口期(10秒)中对金额做转换的流程如下图所示:

代码地址

代码语言:javascript
复制
https://github.com/chaoxxx/learn-flink-stream-api/blob/master/src/main/java/fun/cosmozhu/session15/main/StreamTest.java

作者:cosmozhu --90后的老父亲,专注于保护地球的程序员

个人网站:https://www.cosmozhu.fun

欢迎转载,转载时请注明出处。

相关文章

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 需求
  • 解决方案
  • 执行效果
  • 小结
  • 代码地址
    • 相关文章
    相关产品与服务
    对象存储
    对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档