前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Flink异步之矛-锋利的Async I/O

Flink异步之矛-锋利的Async I/O

作者头像
大数据真好玩
发布于 2020-02-11 09:51:24
发布于 2020-02-11 09:51:24
1.3K00
代码可运行
举报
文章被收录于专栏:暴走大数据暴走大数据
运行总次数:0
代码可运行

维表JOIN-绕不过去的业务场景

在Flink 流处理过程中,经常需要和外部系统进行交互,用维度表补全事实表中的字段。

例如:在电商场景中,需要一个商品的skuid去关联商品的一些属性,例如商品所属行业、商品的生产厂家、生产厂家的一些情况;在物流场景中,知道包裹id,需要去关联包裹的行业属性、发货信息、收货信息等等。

默认情况下,在Flink的MapFunction中,单个并行只能用同步方式去交互: 将请求发送到外部存储,IO阻塞,等待请求返回,然后继续发送下一个请求。这种同步交互的方式往往在网络等待上就耗费了大量时间。为了提高处理效率,可以增加MapFunction的并行度,但增加并行度就意味着更多的资源,并不是一种非常好的解决方式。

Async I/O异步非阻塞请求

Flink 在1.2中引入了Async I/O,在异步模式下,将IO操作异步化,单个并行可以连续发送多个请求,哪个请求先返回就先处理,从而在连续的请求间不需要阻塞式等待,大大提高了流处理效率。

Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,解决与外部系统交互时网络延迟成为了系统瓶颈的问题。

图中棕色的长条表示等待时间,可以发现网络等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式可以并发地处理多个请求和回复。也就是说,你可以连续地向数据库发送用户a、b、c等的请求,与此同时,哪个请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待,如上图右边所示。这也正是 Async I/O 的实现原理。

详细的原理可以参考文末给出的第一个链接,来自阿里巴巴云邪的分享。

一个简单的例子如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public classAsyncIOFunctionTest{
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");

        DataStreamSource<String> ds = env.addSource(new FlinkKafkaConsumer010<String>("order", new SimpleStringSchema(), p));
        ds.print();

        SingleOutputStreamOperator<Order> order = ds
                .map(new MapFunction<String, Order>() {
                    @Override
                    public Order map(String value) throws Exception {
                        return new Gson().fromJson(value, Order.class);
                    }
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {
                    @Override
                    public long extractAscendingTimestamp(Order element) {
                        try {
                            return element.getOrderTime();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return 0;
                    }
                })
                .keyBy(new KeySelector<Order, String>() {
                    @Override
                    public String getKey(Order value) throws Exception {
                        return value.getUserId();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.minutes(10)))
                .maxBy("orderTime");

        SingleOutputStreamOperator<Tuple7<String, String, Integer, String, String, String, Long>> operator = AsyncDataStream
                .unorderedWait(order, new RichAsyncFunction<Order, Tuple7<String, String, Integer, String, String, String, Long>>() {

                    private Connection connection;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        Class.forName("com.mysql.jdbc.Driver");
                        connection = DriverManager.getConnection("url", "user", "pwd");
                        connection.setAutoCommit(false);
                    }

                    @Override
                    public void asyncInvoke(Order input, ResultFuture<Tuple7<String, String, Integer, String, String, String, Long>> resultFuture) throws Exception {
                        List<Tuple7<String, String, Integer, String, String, String, Long>> list = new ArrayList<>();
                        // 在 asyncInvoke 方法中异步查询数据库
                        String userId = input.getUserId();
                        Statement statement = connection.createStatement();
                        ResultSet resultSet = statement.executeQuery("select name,age,sex from user where userid=" + userId);
                        if (resultSet != null && resultSet.next()) {
                            String name = resultSet.getString("name");
                            int age = resultSet.getInt("age");
                            String sex = resultSet.getString("sex");
                            Tuple7<String, String, Integer, String, String, String, Long> res = Tuple7.of(userId, name, age, sex, input.getOrderId(), input.getPrice(), input.getOrderTime());
                            list.add(res);
                        }

                        // 将数据搜集
                        resultFuture.complete(list);
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        if (connection != null) {
                            connection.close();
                        }
                    }
                }, 5000, TimeUnit.MILLISECONDS,100);

        operator.print();


        env.execute("AsyncIOFunctionTest");
    }
}

上述代码中,原始订单流来自Kafka,去关联维度表将订单的用户信息取出来。从上面示例中可看到,我们在open()中创建连接对象,在close()方法中关闭连接,在RichAsyncFunction的asyncInvoke()方法中,直接查询数据库操作,并将数据返回出去。这样一个简单异步请求就完成了。

Async I/O的原理和基本用法

简单的来说,使用 Async I/O 对应到 Flink 的 API 就是 RichAsyncFunction 这个抽象类,继层这个抽象类实现里面的open(初始化),asyncInvoke(数据异步调用),close(停止的一些操作)方法,最主要的是实现asyncInvoke 里面的方法。

我们先来看一个使用Async I/O的模板方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
classAsyncDatabaseRequestextendsRichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    publicvoidopen(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, post, credentials);
    }

    @Override
    publicvoidclose() throws Exception {
        client.close();
    }

    @Override
    publicvoidasyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

假设我们一个场景是需要进行异步请求其他数据库,那么要实现一个通过异步I/O来操作数据库还需要三个步骤:   

1、实现用来分发请求的AsyncFunction   

2、获取操作结果的callback,并将它提交到AsyncCollector中   

3、将异步I/O操作转换成DataStream

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
其中的两个重要的参数:Timeouttimeout 定义了异步操作过了多长时间后会被丢弃,这个参数是防止了死的或者失败的请求 Capacity 这个参数定义了可以同时处理多少个异步请求。虽然异步I/O方法会带来更好的吞吐量,但是算子仍然会成为流应用的瓶颈。超过限制的并发请求数量会产生背压。

几个需要注意的点:

  • 使用Async I/O,需要外部存储有支持异步请求的客户端。
  • 使用Async I/O,继承RichAsyncFunction(接口AsyncFunction的抽象类),重写或实现open(建立连接)、close(关闭连接)、asyncInvoke(异步调用)3个方法即可。
  • 使用Async I/O, 最好结合缓存一起使用,可减少请求外部存储的次数,提高效率。
  • Async I/O 提供了Timeout参数来控制请求最长等待时间。默认,异步I/O请求超时时,会引发异常并重启或停止作业。如果要处理超时,可以重写AsyncFunction#timeout方法。
  • Async I/O 提供了Capacity参数控制请求并发数,一旦Capacity被耗尽,会触发反压机制来抑制上游数据的摄入。
  • Async I/O 输出提供乱序和顺序两种模式。 乱序, 用AsyncDataStream.unorderedWait(...) API,每个并行的输出顺序和输入顺序可能不一致。 顺序, 用AsyncDataStream.orderedWait(...) API,每个并行的输出顺序和输入顺序一致。为保证顺序,需要在输出的Buffer中排序,该方式效率会低一些。

Flink 1.9 中的优化

由于新合入的 Blink 相关功能,使得 Flink 1.9 实现维表功能很简单。如果你要使用该功能,那就需要自己引入 Blink 的 Planner。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

然后我们只要自定义实现 LookupableTableSource 接口,同时实现里面的方法就可以进行,下面来分析一下 LookupableTableSource的代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interfaceLookupableTableSource<T> extendsTableSource<T> {
     TableFunction<T> getLookupFunction(String[] lookupKeys);
     AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);
     booleanisAsyncEnabled();
}

这三个方法分别是:

  • isAsyncEnabled 方法主要表示该表是否支持异步访问外部数据源获取数据,当返回 true 时,那么在注册到 TableEnvironment 后,使用时会返回异步函数进行调用,当返回 false 时,则使同步访问函数。
  • getLookupFunction 方法返回一个同步访问外部数据系统的函数,什么意思呢,就是你通过 Key 去查询外部数据库,需要等到返回数据后才继续处理数据,这会对系统处理的吞吐率有影响。
  • getAsyncLookupFunction 方法则是返回一个异步的函数,异步访问外部数据系统,获取数据,这能极大的提升系统吞吐率。

我们抛开同步访问函数不管。对于getAsyncLookupFunction会返回异步访问外部数据源的函数,如果你想使用异步函数,前提是 LookupableTableSource 的 isAsyncEnabled 方法返回 true 才能使用。使用异步函数访问外部数据系统,一般是外部系统有异步访问客户端,如果没有的话,可以自己使用线程池异步访问外部系统。例如:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public classMyAsyncLookupFunctionextendsAsyncTableFunction<Row> {
    private transient RedisAsyncCommands<String, String> async;
    @Override
    publicvoidopen(FunctionContext context) throws Exception {
        RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
        StatefulRedisConnection<String, String> connection = redisClient.connect();
        async = connection.async();
    }
    publicvoideval(CompletableFuture<Collection<Row>> future, Object... params) {
        redisFuture.thenAccept(new Consumer<String>() {
            @Override
            publicvoidaccept(String value) {
                future.complete(Collections.singletonList(Row.of(key, value)));
            }
        });
    }
}

一个完整的例子如下:

Main方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Test;

import java.util.Properties;

public classLookUpAsyncTest{

    @Test
    public void test() throws Exception {
        LookUpAsyncTest.main(new String[]{});
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        final ParameterTool params = ParameterTool.fromArgs(args);
        String fileName = params.get("f");
        DataStream<String> source = env.readTextFile("hdfs://172.16.44.28:8020" + fileName, "UTF-8");

        TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG};
        String[] fields = new String[]{"id", "user_click", "time"};
        RowTypeInfo typeInformation = new RowTypeInfo(types, fields);

        DataStream<Row> stream = source.map(new MapFunction<String, Row>() {
            private static final long serialVersionUID = 2349572543469673349L;

            @Override
            public Row map(String s) {
                String[] split = s.split(",");
                Row row = new Row(split.length);
                for (int i = 0; i < split.length; i++) {

                    Object value = split[i];
                    if (types[i].equals(Types.STRING)) {
                        value = split[i];
                    }
                    if (types[i].equals(Types.LONG)) {
                        value = Long.valueOf(split[i]);
                    }
                    row.setField(i, value);
                }
                return row;
            }
        }).returns(typeInformation);

        tableEnv.registerDataStream("user_click_name", stream, String.join(",", typeInformation.getFieldNames()) + ",proctime.proctime");

        RedisAsyncLookupTableSource tableSource = RedisAsyncLookupTableSource.Builder.newBuilder()
                .withFieldNames(new String[]{"id", "name"})
                .withFieldTypes(new TypeInformation[]{Types.STRING, Types.STRING})
                .build();
        tableEnv.registerTableSource("info", tableSource);

        String sql = "select t1.id,t1.user_click,t2.name" +
                " from user_click_name as t1" +
                " join info FOR SYSTEM_TIME AS OF t1.proctime as t2" +
                " on t1.id = t2.id";

        Table table = tableEnv.sqlQuery(sql);

        DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);

        DataStream<String> printStream = result.map(new MapFunction<Row, String>() {
            @Override
            public String map(Row value) throws Exception {
                return value.toString();
            }
        });

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9094");
        FlinkKafkaProducer011<String> kafkaProducer = new FlinkKafkaProducer011<>(
                "user_click_name",
                new SimpleStringSchema(),
                properties);
        printStream.addSink(kafkaProducer);

        tableEnv.execute(Thread.currentThread().getStackTrace()[1].getClassName());
    }
}

RedisAsyncLookupTableSource方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

public classRedisAsyncLookupTableSourceimplementsStreamTableSource<Row>, LookupableTableSource<Row> {

    private final String[] fieldNames;
    private final TypeInformation[] fieldTypes;

    publicRedisAsyncLookupTableSource(String[] fieldNames, TypeInformation[] fieldTypes) {
       this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    //同步方法
    @Override
    public TableFunction<Row> getLookupFunction(String[] strings) {
        return null;
    }

    //异步方法
    @Override
    public AsyncTableFunction<Row> getAsyncLookupFunction(String[] strings) {
        return MyAsyncLookupFunction.Builder.getBuilder()
                .withFieldNames(fieldNames)
                .withFieldTypes(fieldTypes)
                .build();
    }

    //开启异步
    @Override
    publicbooleanisAsyncEnabled() {
        return true;
    }

    @Override
    public DataType getProducedDataType() {
        return TypeConversions.fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, fieldNames));
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
                .fields(fieldNames, TypeConversions.fromLegacyInfoToDataType(fieldTypes))
                .build();
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment environment) {
        throw new UnsupportedOperationException("do not support getDataStream");
    }

    public static final classBuilder{
        private String[] fieldNames;
        private TypeInformation[] fieldTypes;

        privateBuilder() {
        }

        publicstatic Builder newBuilder() {
            return new Builder();
        }

        public Builder withFieldNames(String[] fieldNames) {
            this.fieldNames = fieldNames;
            return this;
        }

        public Builder withFieldTypes(TypeInformation[] fieldTypes) {
            this.fieldTypes = fieldTypes;
            return this;
        }

        public RedisAsyncLookupTableSource build() {
            return new RedisAsyncLookupTableSource(fieldNames, fieldTypes);
        }
    }
}

MyAsyncLookupFunction

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.types.Row;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public classMyAsyncLookupFunctionextendsAsyncTableFunction<Row> {

    private final String[] fieldNames;
    private final TypeInformation[] fieldTypes;

    private transient RedisAsyncCommands<String, String> async;

    publicMyAsyncLookupFunction(String[] fieldNames, TypeInformation[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    @Override
    publicvoidopen(FunctionContext context) {
        //配置redis异步连接
        RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
        StatefulRedisConnection<String, String> connection = redisClient.connect();
        async = connection.async();
    }

    //每一条流数据都会调用此方法进行join
    publicvoideval(CompletableFuture<Collection<Row>> future, Object... paramas) {
        //表名、主键名、主键值、列名
        String[] info = {"userInfo", "userId", paramas[0].toString(), "userName"};
        String key = String.join(":", info);
        RedisFuture<String> redisFuture = async.get(key);

        redisFuture.thenAccept(new Consumer<String>() {
            @Override
            publicvoidaccept(String value) {
                future.complete(Collections.singletonList(Row.of(key, value)));
            }
        });
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    public static final classBuilder{
        private String[] fieldNames;
        private TypeInformation[] fieldTypes;

        private Builder() {
        }

        public static Builder getBuilder() {
            return new Builder();
        }

        public Builder withFieldNames(String[] fieldNames) {
            this.fieldNames = fieldNames;
            return this;
        }

        public Builder withFieldTypes(TypeInformation[] fieldTypes) {
            this.fieldTypes = fieldTypes;
            return this;
        }

        public MyAsyncLookupFunction build() {
            return new MyAsyncLookupFunction(fieldNames, fieldTypes);
        }
    }
}

使用Async十分需要注意的几个点:

1、 外部数据源必须是异步客户端:如果是线程安全的,你可以不加 transient 关键字,初始化一次。否则,你需要加上 transient,不对其进行初始化,而在 open 方法中,为每个 Task 实例初始化一个。

2、eval 方法中多了一个 CompletableFuture,当异步访问完成时,需要调用其方法进行处理。比如上面例子中的:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
redisFuture.thenAccept(new Consumer<String>() {
            @Override
            public void accept(String value) {
                future.complete(Collections.singletonList(Row.of(key, value)));
            }
        });

3、社区虽然提供异步关联维度表的功能,但事实上大数据量下关联外部系统维表仍然会成为系统的瓶颈,所以一般我们会在同步函数和异步函数中加入缓存。综合并发、易用、实时更新和多版本等因素考虑,Hbase可能是最理想的外部维表。

参考文章:

http://wuchong.me/blog/2017/05/17/flink-internals-async-io/#

https://www.jianshu.com/p/d8f99d94b761

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673

https://www.jianshu.com/p/7ce84f978ae0

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-01-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
聊聊flink的Async I/O
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
code4it
2019/01/19
3.5K0
聊聊flink的Async I/O
聊聊flink的JDBCAppendTableSink
flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
code4it
2019/02/02
1.5K0
聊聊flink的JDBCAppendTableSink
聊聊flink的CsvTableSink
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/TableSink.scala
code4it
2019/02/06
1.5K0
聊聊flink的CsvTableSink
聊聊flink的CsvTableSource
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/TableSource.scala
code4it
2019/02/05
1.4K0
聊聊flink的CsvTableSource
聊聊flink的JDBCOutputFormat
flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
code4it
2018/12/04
2.2K0
聊聊flink的JDBCOutputFormat
Flink中DataStream和Table互相转换
前言 Flink 为处理一列转多列的场景提供了两种返回类型 Tuple 和 Row Tuple 只支持1~25个字段,且不能为null,不支持拓展 Row 支持null同时也无限制字段数,但如果需要使用Row,必须重载实现getResultType方法 DataStream=>Table import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInfo
码客说
2023/01/13
1.8K0
2021年大数据Flink(四十六):扩展阅读 异步IO
Apache Flink 1.12 Documentation: Asynchronous I/O for External Data Access
Lansonli
2021/10/11
1.4K0
flink系列(5)-kafka源码分析
最近一直在弄flink sql相关的东西,第一阶段的目标是从解决kafka的消费和写入的问题。不过也有些同学并不是很了解,今天我们来详细分析一下包的继承层次。
yiduwangkai
2019/09/17
7120
flink系列(5)-kafka源码分析
Flink通过异步IO实现redis维表join
使用flink做实时数仓的公司越来越多了,浪尖这边也是很早就开发了一个flink 全sql平台来实现实时数仓的功能。说到实时数仓,两个表的概念大家一定会知道的:事实表和维表。
Spark学习技巧
2019/12/27
3.6K0
Flink通过异步IO实现redis维表join
Flink开发-Mysql数据导入Hive中
Mysql中ResultSet默认会将一次查询的结果存入内存中。如果数据量比较大,就会占用大量的内存。如果内存不够,就会报错。
码客说
2023/03/06
1.9K0
六大方法彻底解决Flink Table & SQL维表Join
随着 Flink Table & SQL的发展,Flink SQL中用于进行维表Join也成为了很多场景的选择。
大数据真好玩
2021/11/16
3.9K0
Flink实战系列之自定义RetractStreamTableSink
Flink Table/SQL 中对于流表TableSink的定义有三类:AppendStreamTable、RetractStreamTableSink 、UpsertStreamTableSink ,这三类主要区别对应不同的流类型,在我看来可以归纳为两种模式:
Flink实战剖析
2022/04/18
4400
Flink实战系列之自定义UpsertStreamTableSink
在Flink实战系列之自定义RetractStreamTableSink中介绍了如何编写自定义RetractStreamTableSink,Flink 中提供了另外一种可Redo模式的UpsertStreamTableSink,与RetractStreamTableSink不同的是:
Flink实战剖析
2022/04/18
5890
Flink重点难点:维表关联理论和Join实战
数据流操作的另一个常见需求是对两条数据流中的事件进行联结(connect)或Join。Flink DataStream API中内置有两个可以根据时间条件对数据流进行Join的算子:基于间隔的Join和基于窗口的Join。本节我们会对它们进行介绍。
王知无-import_bigdata
2021/09/22
4.6K0
零基础学Flink:Data Source & Data Sink
在上一篇讲述CEP的文章里,直接使用了自定义Source和Sink,我翻阅了一下以前的文章,似乎没有对这部分进行一个梳理,那么今天我们来就这上次的代码,来说说 Data Source 和 Data Sink吧。
麒思妙想
2020/07/10
2.4K0
14-Flink-Table-&-SQL实战
Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用Table API分析模式,或者可以在预处理上运行Gelly图算法之前使用SQL查询扫描,过滤和聚合批处理表数据。
王知无-import_bigdata
2019/03/04
1.3K0
14-Flink-Table-&-SQL实战
聊聊flink的Table API及SQL Programs
序 本文主要研究一下flink的Table API及SQL Programs flink-forward-sf-2017-timo-walther-table-sql-api-unified-apis-for-batch-and-stream-processing-8-638.jpg 实例 // for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment StreamExecutionEnvironm
code4it
2019/01/21
2.1K0
聊聊flink的Table API及SQL Programs
使用Calcite解析Sql做维表关联(二)
继上一篇中使用Calcite解析Sql做维表关联(一) 介绍了建表语句解析方式以及使用calcite解析解析流表join维表方法,这一篇将会介绍如何使用代码去实现将sql变为可执行的代码。
Flink实战剖析
2022/04/18
5990
Apache Flink Table API的Catalog
“ Apache Flink的Table API提供了对数据注册为Table的方式, 实现把数据通过SQL的方式进行计算。Table API与SQL API实现了Apache Flink的批流统一的实现方式。Table API与SQL API的核心概念就是TableEnviroment。TableEnviroment对象提供方法注册数据源与数据表信息。那么数据源与数据表的信息则存储在CataLog中。所以,CataLog是TableEnviroment的重要组成部分。”
CainGao
2020/04/14
1.9K0
flink异步io 转
讨论主题:http:  //apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-for-Asynchronous-IO-in-FLINK-tt13497.html
stys35
2019/03/20
1.3K0
flink异步io
                                                                            转
相关推荐
聊聊flink的Async I/O
更多 >
领券
社区富文本编辑器全新改版!诚邀体验~
全新交互,全新视觉,新增快捷键、悬浮工具栏、高亮块等功能并同时优化现有功能,全面提升创作效率和体验
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验