首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往
您找到你想要的搜索结果了吗?
是的
没有找到

Flink通过异步IO实现redis维表join

使用flink做实时数仓的公司越来越多了,浪尖这边也是很早就开发了一个flink 全sql平台来实现实时数仓的功能。说到实时数仓,两个表的概念大家一定会知道的:事实表和维表。...当flink 事实表需要 使用维表来进行染色的时候,就需要flink 与维表进行join,这是需要注意与外部系统的通信延迟不会影响流应用程序的整体工作。...; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.AsyncDataStream...; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010...AsyncIOSideTableJoinRedis.class.getCanonicalName()); } private static class SampleAsyncFunction extends RichAsyncFunction

3.4K40

2021年大数据Flink(四十六):扩展阅读 异步IO

; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction...return category;     } } /**  * 方式一:Java-vertx中提供的异步client实现异步IO  */ class ASyncIOFunction1 extends RichAsyncFunction...Collections.singleton(input));     } } /**  * 方式二:同步调用+线程池模拟异步IO  */ class ASyncIOFunction2 extends RichAsyncFunction...; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.slf4j.Logger; import...效率比连接池要高  * 1)在java版本中可以直接使用  * 2)如果在scala版本中使用的话,需要scala的版本是2.12+  */ class AsyncRedisByVertx extends RichAsyncFunction

1.3K20

flink维表关联系列之Hbase维表关联:LRU策略

维表关联系列目录: 一、维表服务与Flink异步IO 二、Mysql维表关联:全量加载 三、Hbase维表关联:LRU策略 四、Redis维表关联:实时查询 五、kafka维表关联:广播方式 六、自定义异步查询...在Flink中做维表关联时,如果维表的数据比较大,无法一次性全部加载到内存中,而在业务上也允许一定数据的延时,那么就可以使用LRU策略加载维表数据。...可配置淘汰策略 非常适用于Flink维表关联LRU策略,使用方式: cache = CacheBuilder.newBuilder() .maximumSize(1000...LRU方式读取Hbase 实现思路: 使用Flink 异步IO RichAsyncFunction去异步读取hbase的数据,那么需要hbase 客户端支持异步读取,默认hbase客户端是同步,可使用hbase...dependency> demo版: class HbaseAsyncLRU(zk: String, tableName: String, maxSize: Long, ttl: Long) extends RichAsyncFunction

1.1K21

Flink异步之矛-锋利的Async IO

String, String, String, Long>> operator = AsyncDataStream .unorderedWait(order, new RichAsyncFunction...从上面示例中可看到,我们在open()中创建连接对象,在close()方法中关闭连接,在RichAsyncFunction的asyncInvoke()方法中,直接查询数据库操作,并将数据返回出去。...Async I/O的原理和基本用法 简单的来说,使用 Async I/O 对应到 Flink 的 API 就是 RichAsyncFunction 这个抽象类,继层这个抽象类实现里面的open(初始化)...使用Async I/O,继承RichAsyncFunction(接口AsyncFunction的抽象类),重写或实现open(建立连接)、close(关闭连接)、asyncInvoke(异步调用)3个方法即可...Flink 1.9 中的优化 由于新合入的 Blink 相关功能,使得 Flink 1.9 实现维表功能很简单。如果你要使用该功能,那就需要自己引入 Blink 的 Planner。

1.1K20

Flink异步之矛-锋利的Async IO

String, String, String, Long>> operator = AsyncDataStream .unorderedWait(order, new RichAsyncFunction...从上面示例中可看到,我们在open()中创建连接对象,在close()方法中关闭连接,在RichAsyncFunction的asyncInvoke()方法中,直接查询数据库操作,并将数据返回出去。...Async I/O的原理和基本用法 简单的来说,使用 Async I/O 对应到 Flink 的 API 就是 RichAsyncFunction 这个抽象类,继层这个抽象类实现里面的open(初始化)...使用Async I/O,继承RichAsyncFunction(接口AsyncFunction的抽象类),重写或实现open(建立连接)、close(关闭连接)、asyncInvoke(异步调用)3个方法即可...Flink 1.9 中的优化 由于新合入的 Blink 相关功能,使得 Flink 1.9 实现维表功能很简单。如果你要使用该功能,那就需要自己引入 Blink 的 Planner。

1.2K20

数栈技术分享:用短平快的方式告诉你Flink-SQL的扩展实现

一、我们为什么扩展Flink-SQL? 由于Flink 本身SQL语法并不提供在对接输入源和输出目的的SQL语法。...我们的目的是在使用Flink-SQL的时候只需要关心做什么,而不需要关心怎么做。不需要过多的关心程序的实现,专注于业务逻辑。 接下来,我们一起来看下Flink-SQL的扩展实现吧!...二、扩展了哪些flink相关sql 1、创建源表语句 ​ 2、创建输出表语句 ​ 3、创建自定义函数 ​ 4、维表关联 ​ 三、各个模块是如何翻译到flink的实现 1、如何将创建源表的sql语句转换为...flink的operator Flink中表的都会映射到Table这个类。...这里我们选择阿里贡献给flink社区的算子RichAsyncFunction。该算子使用异步的方式从外部数据源获取数据,大大减少了花费在网络请求上的时间。

2.4K00

数栈技术分享:开源·数栈-扩展FlinkSQL实现流与维表的join

在去年,袋鼠云数栈V3.0版本研发期间,当时最新版本——flink1.6中FlinkSQL,已经将SQL的优势应用到Flink引擎中,但还未支持流与维表的JOIN。...FlinkSQL于2017年7月开始面向阿里巴巴集团开放流计算服务的,虽然是一个非常年轻的产品,但是到双11期间已经支撑了数千个作业,在双11期间,Blink 作业的处理峰值达到了5+亿每秒,而其中仅 Flink...三、FlinkSQL实现流与维表的join分步走 1、用Flink api实现维表的功能 要实现维表功能就要用到 Flink Aysnc I/O 这个功能,是由阿里巴巴贡献给Apache Flink的。...具体介绍可以看这篇文章:http://wuchong.me/blog/2017/05/17/flink-internals-async-io/ 对应到Flink 的api就是RichAsyncFunction...SQL解析的工具就是用Apache calcite,Flink也是用这个框架做SQL解析的。所以所有语法都是可以解析的。

85330

Flink重点难点:维表关联理论和Join实战

在阅读本文之前,你应该阅读过的系列: 《Flink重点难点:时间、窗口和流Join》 《Flink重点难点:网络流控和反压》 Flink官方文档中公开的信息 1 Join 的概念 在阅读之前请一定要先了解...案例你可以参考:《Flink重点难点:时间、窗口和流Join》 1.2 基于窗口的Join 顾名思义,基于窗口的Join需要用到Flink中的窗口机制。...; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import java.sql.DriverManager...orderedResult.print(); unorderedResult.print(); env.execute("joinDemo"); } //定义个类,继承RichAsyncFunction...实现异步查询存储在mysql里的维表 //输入用户名、城市ID,返回 Tuple3 static class JoinDemo3AyncFunction extends RichAsyncFunction

3.8K20

Flink异步IO第一讲

注意:通过增加MapFunction的到一个较大的并行度也是可以改善吞吐量的,但是这就意味着更高的资源开销:更多的MapFunction实例意味着更多的task,线程,flink内部网络连接,数据库的链接...前提 正确的实现flink的异步IO功能,需要所连接的数据库支持异步客户端。幸运的是很多流行的数据库支持这样的客户端。...异步IO API flink异步IO的API支持用户在data stream中使用异步请求客户端。API自身处理与数据流的整合,消息顺序,时间时间,容错等。...the 'AsyncFunction' that sends requests and sets the callback. */ class AsyncDatabaseRequest extends RichAsyncFunction...为了控制结果发送的顺序,flink提供了两种模式: 1). Unordered 结果记录在异步请求结束后立刻发送。流中的数据在经过该异步IO操作后顺序就和以前不一样了。

1.3K40

Flink1.4 用于外部数据访问的异步IO

通过扩展 MapFunction 到一个很高的并发度来提高吞吐量在一定程度上是可行的,但是常常会导致很高的资源成本:有更多的并行 MapFunction 实例意味着更多的任务、线程、Flink内部网络连接...Async I/O API Flink 的异步 I/O API允许用户在数据流中使用异步请求客户端。API处理与数据流的集成,以及处理顺序,事件时间,容错等。...callback with Futures that have the // interface of Java 8's futures (which is the same one followed by Flink's...the 'AsyncFunction' that sends requests and sets the callback. */ class AsyncDatabaseRequest extends RichAsyncFunction...为了控制结果记录发出的顺序,Flink 提供了两种模式: Unordered:异步请求结束后立即输出结果记录。在经过异步I/O算子之后,流中记录的顺序与之前会不一样。

88220
领券