前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink进行Paimon写入源码分析

Flink进行Paimon写入源码分析

原创
作者头像
wenly
修改2023-08-15 10:27:41
1.7K0
修改2023-08-15 10:27:41
举报
文章被收录于专栏:湖仓一体

1. 前言

Paimon的前身是Flink-Table-Store,希望提供流批一体的存储,提供一定的OLAP查询能力(基于列式存储),做到毫秒级别的实时流式读取。Flink-Table-Store希望能够支持Flink SQL的全部概念,能够结合Flink SQL提供DB级别体验,并且支持大规模的更新。Flink-Table-Store希望能够结合Flink,实现完整的流批一体体验(计算+存储),同时拓展Flink-Table-Store的生态,升级为Paimon,来支持更多大数据引擎的查询/写入。如果我们希望深度使用Paimon,并充分利用Paimon的特性,那么了解Flilnk写入Paimon的过程十分重要,本文希望通过源码分析的方式带大家充分了解Flink写入Paimon的完整过程。

2. 源码阅读demo

阅读源码最有效的方式就是跟读最简单的代码,由于Paimon的定位是打造类似Database的使用体验,与SQL结合很紧密,通常可以使用Flink SQL进行写入,但这里为了方便跟读代码,我们采用调api的方式进行写入

代码语言:java
复制
// 构建表
TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().build());
String tableSql = String.format(
    "create table %s (uuid string, name string, age int, ts int, part string, primary key (part,uuid) NOT ENFORCED)  partitioned by (part) with ('connector' = 'paimon', 'bucket' = '2', 'bucket-key' = 'uuid', 'path' = '%s')"
    , tableName
    , filePath
);
tableEnv.executeSql(tableSql);
Catalog catalog = tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get();
String database = catalog.getDefaultDatabase();
ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) catalog.getTable(new ObjectPath(database, tableName));
ObjectIdentifier tablePath = ObjectIdentifier.of(catalogName, database, tableName);
FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(
    tablePath, 
    catalogTable,
    Collections.emptyMap(),
    Configuration.fromMap(catalogTable.getOptions()),
    Thread.currentThread().getContextClassLoader(), 
    false
);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(20000);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 数据源
DataStream<RowData> dataStream = ...
// 将数据源写入Paimon表
((DataStreamSinkProvider) new FlinkTableFactory()
    .createDynamicTableSink(context)
    .getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)))
    .consumeDataStream(n -> Optional.empty(), dataStream);

env.execute("PaimonWriteDemoWithFlink");

3. Flink写入Paimon的完整流程

Paimon表的写入是通过FlinkTableSink实现DynamicTableSink接口来写入数据,核心逻辑位于getSinkRuntimeProvider方法中,后面我们会重点跟读getSinkRuntimeProvider方法中的内容。

这里也顺带讲一下通过Flink SQL进行写入的场景,对于Flink SQL而言,首先会通过calcite进行解析优化后生成JobGraph,再提交集群运行。我们希望了解Flink写入Paimon的完整过程,可以通过sql转化成JobGraph的过程中得到的Transformation列表去了解中间具体进行了哪些操作。下图是一条简单Flink SQL转化后的Transformation序列,其中红框内的Transformation序列是Paimon数据写入的完整过程,本质上是通过执行getSinkRuntimeProvider方法生成的。

图3-1 Flink SQL中Paimon表写入转化成Transformation序列
图3-1 Flink SQL中Paimon表写入转化成Transformation序列

接着,我们对Flink写入Paimon表的流程进行了梳理,整理出完整的类图,如下图所示

图3-2 Flink写入Paimon完整流程类图
图3-2 Flink写入Paimon完整流程类图

其中,核心入口FlinkTableSink的getSinkRuntimeProvider方法逻辑如下所示,

代码语言:java
复制
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    ...
    LogSinkProvider logSinkProvider = null;
    if (logStoreTableFactory != null) {
        // 当配置了log.system,则会生成一个log sink的提供器,目前只有kafka的日志存储
        logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context);
    }

    Options conf = Options.fromMap(table.options());
    // 生成log sink函数,当overwrite模式时,不配置log sink
    final LogSinkFunction logSinkFunction =
            overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
    return new PaimonDataStreamSinkProvider(
            (dataStream) ->
                    // FileStoreTable时FileStore的抽象层,提供InternalRow的读取和写入
                    new FlinkSinkBuilder((FileStoreTable) table)
                            .withInput(
                                    new DataStream<>(dataStream.getExecutionEnvironment(),dataStream.getTransformation()))
                            .withLogSinkFunction(logSinkFunction)
                            .withOverwritePartition(overwrite ? staticPartitions : null)
                                            .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
                            .build());
}

继续跟到FlinkSinkBuilder的build方法,会根据Paimon表的bucket模式生成不同类型的Sink

代码语言:java
复制
public DataStreamSink<?> build() {
    BucketMode bucketMode = table.bucketMode();
    switch (bucketMode) {
        case FIXED:
            // 生成固定bucket个数的Sink,这种方式结构固定,本文着重分析这一路径
            return buildForFixedBucket();
        case DYNAMIC:
            // 生成动态变动Bucket个数的Sink,
            return buildDynamicBucketSink();
        case UNAWARE:
            return buildUnawareBucketSink();
        default:
            throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
    }
}

private DataStreamSink<?> buildForFixedBucket() {
    // 首先根据分区信息、bucket字段进行bucket分组
    DataStream<RowData> partitioned =
            partition(
                    input,
                    new RowDataChannelComputer(table.schema(), logSinkFunction != null),
                    parallelism);
    // FileStoreSink实现将记录写入Paimon,FileStoreSink提供了生成写入算子的方法
    FileStoreSink sink = new FileStoreSink(table, overwritePartition, logSinkFunction);
    return sink.sinkFrom(partitioned);
}

接着继续跟读到FileStoreSink的sinkFrom的方法

代码语言:java
复制
public DataStreamSink<?> sinkFrom(DataStream<T> input) {
    String initialCommitUser = UUID.randomUUID().toString();
    return sinkFrom(input, initialCommitUser);
}

public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
    // 执行真正的写入操作,在这个阶段不会进行提交,相当于两阶段提交的第一阶段,进行数据写入,不会有snapshot生成
    SingleOutputStreamOperator<Committable> written =
                doWrite(input, initialCommitUser, input.getParallelism());
    // 执行提交操作,会生成snapshot,下游可见,如果日志配置
    return doCommit(written, initialCommitUser);
}

public SingleOutputStreamOperator<Committable> doWrite(
    DataStream<T> input, String commitUser, Integer parallelism) {
    ...
    // 是否只是writeOnly,如果是,则会忽略compact和snapshot过期,这个配置需要结合专门的compact任务执行,不然会造成小文件剧增,同时降低数据读的性能
    Boolean writeOnly = table.coreOptions().writeOnly();
    SingleOutputStreamOperator<Committable> written =
        input.transform(
           (writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME) + " -> " + table.name(),
            new CommittableTypeInfo(),
            // 生成写入算子,上游的数据会通过这个算子将数据写入到Paimon表
            createWriteOperator(
                createWriteProvider(env.getCheckpointConfig(), isStreaming), commitUser)
        ).setParallelism(parallelism == null ? input.getParallelism() : parallelism);
    ...
    Options options = Options.fromMap(table.options());
    if (options.get(SINK_USE_MANAGED_MEMORY)) {
        // Flink会创建一个独立的内存分配器用于merge tree的数据写入操作
        // 否则会使用TM的管理内存支持写入操作
        MemorySize memorySize = options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY);
        written.getTransformation()
            .declareManagedMemoryUseCaseAtOperatorScope(
                ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes());
    }
    return written;
}
    
protected DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
    ...
    SingleOutputStreamOperator<?> committed =
        written.transform(
                GLOBAL_COMMITTER_NAME + " -> " + table.name(),
                new CommittableTypeInfo(),
                // 执行提交操作的算子,该算子会生成snapshot,下游可见
                new CommitterOperator<>(
                    streamingCheckpointEnabled,
                    commitUser,
                    createCommitterFactory(streamingCheckpointEnabled),
                    createCommittableStateManager()))
            .setParallelism(1)
            .setMaxParallelism(1);
    return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

至此,我们可以发现,Flink写入Paimon的所有Transformation已经构建完成了,接下来,我们可以跟着写入算子的processElement方法了解数据是如何成为Paimon底层的LSM-tree数据结构的。

4. Flink写入数据

Flink写入Paimon的算子是RowDataStoreWriteOperator,算子是预提交算子,会将数据flush的磁盘,但不会执行commit操作,核心代码如下

代码语言:java
复制
public void processElement(StreamRecord<RowData> element) throws Exception {
    sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;
    
    SinkRecord record;
    try {
        // 将数据写入Paimon的文件系统
        record = write.write(new FlinkRowWrapper(element.getValue()));
    } catch (Exception e) {
        throw new IOException(e);
    }

    if (logSinkFunction != null) {
        // 将数据双写到日志存储层,目前只支持写入到kafka
        SinkRecord logRecord = write.toLogRecord(record);
        logSinkFunction.invoke(logRecord, sinkContext);
    }
}

上述算子中的write生成是通过FlieStoreTable.store().newWrite()的逻辑生成的

FileStoreTable是对Paimon表的文件存储的抽象层,提供底层数据的读写api,Paimon会根据WriteMode和是否包含主键来决定生成哪一种FileStoreTable:

  • 当WriteMode = APPEND_ONLY,则会生成AppendOnlyFileStoreTable,这种表的写入流程很简单,当写入的记录数达到阈值且缓存数据量达到单文件大小时就会进行flush操作;
  • 当WriteMode = CHANGE_LOG 且不包含主键,则会生成ChangelogValueCountFileStoreTable,当从内存池拉不到内存块的时候,会进行flush操作,在flush操作的时候,会记录flush的数据的统计值;
  • 当WriteMode = CHANGE_LOG 且包含主键,则会生成ChangelogWithKeyFileStoreTable,与ChangelogValueCountFileStoreTable的flush时机一致,只是在flush中会根据主键进行merge操作;
  • 当WriteMode = AUTO,则会根据是否有主键生成AppendOnlyFileStoreTable还是ChangelogWithKeyFileStoreTable。

接下来,我们会分析FileStoreTable为ChangelogWithKeyFileStoreTable的情况。根据图3-2的类图会发现,最终会调用到MergeTreeWriter的write()方法。该方法会先将数据缓存在内存缓存中(SortBuffer),该部分缓存数据为排序数据,当缓存满了之后,会将缓存中的数据flush到磁盘。

代码语言:java
复制
public void write(KeyValue kv) throws Exception {
    long sequenceNumber =
            kv.sequenceNumber() == KeyValue.UNKNOWN_SEQUENCE
                    ? newSequenceNumber()
                    : kv.sequenceNumber();
    // 将数据put到内存缓存中
    boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
    if (!success) {
        // 缓存满了,会flush缓存数据到磁盘
        flushWriteBuffer(false, false);
        success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
        if (!success) {
            throw new RuntimeException("Mem table is too small to hold a single element.");
        }
    }
}

flush的代码如下

代码语言:java
复制
private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception {
    if (writeBuffer.size() > 0) {
        if (compactManager.shouldWaitForLatestCompaction()) {
            // 判断是否需要进行等待上一次compact结束
            waitForLatestCompaction = true;
        }
        // 根据changelogProducer的类型生成changelog文件写入器
        final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
            changelogProducer == ChangelogProducer.INPUT ? writerFactory.createRollingChangelogFileWriter(0) : null;
        // 生成flush数据的MergeTree文件写入器,根据文件大小进行rolling
        final RollingFileWriter<KeyValue, DataFileMeta> dataWriter = writerFactory.createRollingMergeTreeFileWriter(0);
        try {
            // 将缓存中的所有数据flush到磁盘
            writeBuffer.forEach(
                keyComparator,
                mergeFunction,
                changelogWriter == null ? null : changelogWriter::write,
                dataWriter::write);
        } finally {
            if (changelogWriter != null) {
                changelogWriter.close();
            }
            dataWriter.close();
        }

        if (changelogWriter != null) {
            // 将changelog新增文件缓存在算子中,供算子在进行checkpoint的时候将所有的flush下发到下游算子(提交算子),下发是在prepareSnapshotPreBarrier()方法中进行的,所以会在下游算子进行checkpoint之前接收所有的flush信息
            newFilesChangelog.addAll(changelogWriter.result());
        }

        for (DataFileMeta fileMeta : dataWriter.result()) {
            // 将新增的文件缓存在算子中,供算子在进行checkpoint的时候将所有的提交下发到下游算子(提交算子)
            newFiles.add(fileMeta);
            compactManager.addNewFile(fileMeta);
        }

        writeBuffer.clear();
    }
    // 尝试同步上一次compact结果
    trySyncLatestCompaction(waitForLatestCompaction);
    // 尝试去触发一次新的compact
    compactManager.triggerCompaction(forcedFullCompaction);
}

在writeBuffer进行flush磁盘的时候,会对数据进行merge操作,当FileStoreTable为ChangelogWithKeyFileStoreTable时,会根据merge-engine配置来生成特定的merge引擎,也就是3-2类图中的merge function,具体代码如下,

代码语言:java
复制
...
// 获取merge-engine配置
CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
KeyValueFieldsExtractor extractor = ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;

MergeFunctionFactory<KeyValue> mfFactory;
switch (mergeEngine) {
    case DEDUPLICATE:
        // merge操作时,只会保留最新的一条
        mfFactory = DeduplicateMergeFunction.factory();
        break;
    case PARTIAL_UPDATE:
        // 可以更新部分非空字段,可以设置字段级的sequence来进行局部比较更新
        // 不支持retract类型的操作
        mfFactory = PartialUpdateMergeFunction.factory(conf, rowType);
        break;
    case AGGREGATE:
        // merge是对字段做聚合操作
        mfFactory =
            AggregateMergeFunction.factory(
                conf,
                tableSchema.fieldNames(),
                rowType.getFieldTypes(),
                tableSchema.primaryKeys());
        break;
    case FIRST_ROW:
        // 只保留第一条
        mfFactory =
            FirstRowMergeFunction.factory(
                new RowType(extractor.keyFields(tableSchema)), rowType);
            break;
    default:
        throw new UnsupportedOperationException(
            "Unsupported merge engine: " + mergeEngine);
}

if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
    // 当change log的生成方式为LOOKUP的时候,merge则为封装
    mfFactory =
        LookupMergeFunction.wrap(
            mfFactory,
            new RowType(extractor.keyFields(tableSchema)),
            rowType);
}

在写入后,会进行尝试同步上一次compact的结果,同时也会尝试去触发一次新的compact,Paimon的Compact原理争取通过下一篇文章来分析。

代码语言:txt
复制
这里,与Flink写入Hudi的过程一样,Flink写入Paimon是如何保证Exactly-Once语义的呢?

5. Flink提交写入状态

提交算子CommitterOperator首先会接收上游算子下发的flush信息

代码语言:java
复制
public void processElement(StreamRecord<CommitT> element) {
    output.collect(element);
    // 缓存上游下发的flush信息在算子内部
    this.inputs.add(element.getValue());
}

public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    // 在算子进行checkpoint完成后进行commit操作
    commitUpToCheckpoint(endInput ? Long.MAX_VALUE : checkpointId);
}

private void commitUpToCheckpoint(long checkpointId) throws Exception {
    NavigableMap<Long, GlobalCommitT> headMap =
            committablesPerCheckpoint.headMap(checkpointId, true);
    // 将算子内部缓存的flush信息整合进行commit操作
    committer.commit(committables(headMap));
    headMap.clear();
}

commiter.commit的逻辑是将所有的flush信息进行一次commit操作,最终flush信息会被序列化成json信息保存在snapshot文件中,当commit成功后,Paimon表会新增一次snapshot。详细的commit过程的代码比较简单,可以直接跟读,这里就不再赘述。

6. 最后

本文通过跟读源码的方式对Flink写入Paimon的核心流程进行了解析,相信通过对Flink写入Paimon流程细节的梳理,对理解Paimon的特性及性能优化都是有极大的助力。由于本人能力有限,文章中出现的错误在所难免,希望大家发现后帮忙指正,万分感谢。

最后总结一下,本文主要解析了Flink写入Paimon的核心流程:

代码语言:txt
复制
1. 介绍了Flink SQL/api的方式构建写入流程DAG的完整过程;
2. 介绍了写入算子RowDataStoreWriteOperator处理数据的完整流程;
3. 介绍了提交算子CommitterOperator进行snapshot提交的完整流程。

当然,本文由于篇幅有限,没有对Flink和Paimon架构和概念进行详细的介绍,同时对Flink写入Paimon的Compact过程及性能优化也没有涉及,后续会加上这些方面的解析。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 前言
  • 2. 源码阅读demo
  • 3. Flink写入Paimon的完整流程
  • 4. Flink写入数据
  • 5. Flink提交写入状态
  • 6. 最后
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档