
Understanding the FlinkSQL KafkaSource

Author: shengjk1 · Published 2021-02-01
Background

In an earlier post, 写给大忙人看的Flink 消费 Kafka (Flink consuming Kafka, for busy readers), we covered the DataStream Kafka consumer. Today let's look at how the FlinkSQL Kafka connector hooks into the Flink Streaming Kafka consumer.

Walkthrough

Creating the Kafka source

CREATE TABLE orders
(
    status      INT,
    courier_id  BIGINT,
    id          BIGINT,
    finish_time BIGINT,
    place_time  BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'xxx',
    'properties.group.id' = 'testGroup',
    'format' = 'ss-canal-json',
    'ss-canal-json.table.include' = 'orders',
    'scan.startup.mode' = 'latest-offset'
);
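
As a quick aside, here is a minimal sketch of how such a table can be registered and queried from a Java job (the class name KafkaSourceExample and the final SELECT are just illustrative; 'ss-canal-json' is a custom format, so its factory jar must be on the classpath). Any query against the table triggers the source-creation path traced below.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaSourceExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // register the Kafka-backed table using the same DDL as above
        tableEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  status INT, courier_id BIGINT, id BIGINT,\n"
                        + "  finish_time BIGINT, place_time BIGINT,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka', 'topic' = 'test',\n"
                        + "  'properties.bootstrap.servers' = 'xxx', 'properties.group.id' = 'testGroup',\n"
                        + "  'format' = 'ss-canal-json', 'ss-canal-json.table.include' = 'orders',\n"
                        + "  'scan.startup.mode' = 'latest-offset')");

        // reading from 'orders' goes through the Kafka source created below
        tableEnv.executeSql("SELECT id, status, courier_id FROM orders").print();
    }
}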

After a series of Apache Calcite transformations (the details of that process will be covered in a later post), we eventually reach the CatalogSourceTable class. It extends FlinkPreparingTableBase and is responsible for converting Calcite's RelOptTable into Flink's TableSourceTable.

@Override
    // entry point: called from SqlToRelConverter's toRel method
    public RelNode toRel(ToRelContext toRelContext) {
        final RelOptCluster cluster = toRelContext.getCluster();
        final List<RelHint> hints = toRelContext.getTableHints(); // SQL hints
        final FlinkContext context = ShortcutUtils.unwrapContext(cluster);
        final FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(cluster);
        final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(cluster, relOptSchema);

        // 0. finalize catalog table
        final Map<String, String> hintedOptions = FlinkHints.getHintedOptions(hints);
        final CatalogTable catalogTable = createFinalCatalogTable(context, hintedOptions);

        // 1. create and prepare table source
        final DynamicTableSource tableSource = createDynamicTableSource(context, catalogTable);
        prepareDynamicSource(
                schemaTable.getTableIdentifier(),
                catalogTable,
                tableSource,
                schemaTable.isStreamingMode(),
                context.getTableConfig());

        // 2. push table scan
        pushTableScan(relBuilder, cluster, catalogTable, tableSource, typeFactory, hints);

        // 3. push project for non-physical columns
        final TableSchema schema = catalogTable.getSchema();
        if (!TableSchemaUtils.containsPhysicalColumnsOnly(schema)) {
            pushMetadataProjection(relBuilder, typeFactory, schema);
            pushGeneratedProjection(context, relBuilder, schema);
        }

        // 4. push watermark assigner
        if (schemaTable.isStreamingMode() && !schema.getWatermarkSpecs().isEmpty()) {
            pushWatermarkAssigner(context, relBuilder, schema);
        }

        return relBuilder.build();
    }

Steps 0 through 4 complete the conversion. This post is mainly concerned with step 1, so we keep tracing into the FactoryUtil.createTableSource method:

public static DynamicTableSource createTableSource(
            @Nullable Catalog catalog, // GenericInMemoryCatalog
            ObjectIdentifier objectIdentifier, // `default_catalog`.`default_database`.`orders`
            CatalogTable catalogTable, // CatalogTableImpl
            ReadableConfig configuration,
            ClassLoader classLoader,
            boolean isTemporary) {
        final DefaultDynamicTableContext context =
                new DefaultDynamicTableContext(
                        objectIdentifier, catalogTable, configuration, classLoader, isTemporary);
        try {
            final DynamicTableSourceFactory factory = // resolves to KafkaDynamicTableFactory
                    getDynamicTableFactory(DynamicTableSourceFactory.class, catalog, context);
            return factory.createDynamicTableSource(context);
        } catch (Throwable t) {
            throw new ValidationException(
                    String.format(
                            "Unable to create a source for reading table '%s'.\n\n"
                                    + "Table options are:\n\n"
                                    + "%s",
                            objectIdentifier.asSummaryString(),
                            catalogTable.getOptions().entrySet().stream()
                                    .map(e -> stringifyOption(e.getKey(), e.getValue()))
                                    .sorted()
                                    .collect(Collectors.joining("\n"))),
                    t);
        }
    }
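
How is KafkaDynamicTableFactory found? getDynamicTableFactory discovers table factories through Java's ServiceLoader: every connector jar registers its factories in META-INF/services/org.apache.flink.table.factories.Factory, and the 'connector' option is matched against each factory's factoryIdentifier() ("kafka" for the Kafka connector). Below is a minimal sketch of that matching idea, not the verbatim FactoryUtil code (FactoryLookupSketch and findSourceFactory are made-up names):

import java.util.ServiceLoader;
import java.util.stream.StreamSupport;

import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.Factory;

public class FactoryLookupSketch {

    // Illustrative only: discover all factories via SPI and pick the source factory
    // whose identifier matches the 'connector' option ("kafka" -> KafkaDynamicTableFactory).
    static DynamicTableSourceFactory findSourceFactory(String connector, ClassLoader classLoader) {
        ServiceLoader<Factory> loader = ServiceLoader.load(Factory.class, classLoader);
        return StreamSupport.stream(loader.spliterator(), false)
                .filter(f -> f instanceof DynamicTableSourceFactory)
                .filter(f -> f.factoryIdentifier().equals(connector))
                .map(f -> (DynamicTableSourceFactory) f)
                .findFirst()
                .orElseThrow(() ->
                        new IllegalStateException("No DynamicTableSourceFactory for " + connector));
    }
}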

Next we step into the createDynamicTableSource method of KafkaDynamicTableFactory:

@Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        final ReadableConfig tableOptions = helper.getOptions(); // the options from the WITH clause

        // format factories are discovered via SPI based on the 'format' / 'key.format' options
        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                getKeyDecodingFormat(helper);

        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =//SSCanalJsonFormatFactory
                getValueDecodingFormat(helper);

        // validate the supplied options
        helper.validateExcept(PROPERTIES_PREFIX);

        validateTableSourceOptions(tableOptions);

        validatePKConstraints(
                context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);

        final StartupOptions startupOptions = getStartupOptions(tableOptions);

        // extract the native Kafka properties: bootstrap.servers, group.id, etc.
        final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());

        // add topic-partition discovery
        properties.setProperty(
                FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                String.valueOf(
                        tableOptions
                                .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
                                .map(Duration::toMillis)
                                .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));

        final DataType physicalDataType =//ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL
                context.getCatalogTable().getSchema().toPhysicalRowDataType();

        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);

        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);

        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

        return createKafkaTableSource(
                physicalDataType,
                keyDecodingFormat.orElse(null),
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                KafkaOptions.getSourceTopics(tableOptions),
                KafkaOptions.getSourceTopicPattern(tableOptions),
                properties,
                startupOptions.startupMode,
                startupOptions.specificOffsets,
                startupOptions.startupTimestampMillis);
    }
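
Worth a closer look: getStartupOptions(tableOptions) translates the 'scan.startup.mode' option into the StartupMode enum that the DataStream Kafka consumer understands. The switch below illustrates that mapping using the options documented for the Kafka connector; it is not the verbatim KafkaOptions code, and StartupModeSketch is a made-up name.

import org.apache.flink.streaming.connectors.kafka.config.StartupMode;

public class StartupModeSketch {

    // Illustrative mapping from the 'scan.startup.mode' table option to StartupMode.
    static StartupMode toStartupMode(String scanStartupMode) {
        switch (scanStartupMode) {
            case "earliest-offset":  return StartupMode.EARLIEST;
            case "latest-offset":    return StartupMode.LATEST;           // used by the DDL above
            case "group-offsets":    return StartupMode.GROUP_OFFSETS;
            case "timestamp":        return StartupMode.TIMESTAMP;        // also needs 'scan.startup.timestamp-millis'
            case "specific-offsets": return StartupMode.SPECIFIC_OFFSETS; // also needs 'scan.startup.specific-offsets'
            default:
                throw new IllegalArgumentException("Unsupported scan.startup.mode: " + scanStartupMode);
        }
    }
}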

Some validation is done first, and then the collected configuration is passed on to create the table source, as follows:

protected KafkaDynamicSource createKafkaTableSource(
            DataType physicalDataType, // the physical fields to read: ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL
            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,//SSCanalJsonFormatFactory
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            @Nullable List<String> topics,// topics
            @Nullable Pattern topicPattern,//topicPattern
            Properties properties, // Kafka client properties: bootstrap.servers, group.id, etc.
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis) {
        return new KafkaDynamicSource(
                physicalDataType,
                keyDecodingFormat,
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topics,
                topicPattern,
                properties,
                startupMode,
                specificStartupOffsets,
                startupTimestampMillis,
                false);
    }

Execution then continues with

 prepareDynamicSource(
                schemaTable.getTableIdentifier(),
                catalogTable,
                tableSource,
                schemaTable.isStreamingMode(),
                context.getTableConfig());

which in turn calls the KafkaDynamicSource.getScanRuntimeProvider method, where the FlinkKafkaConsumer is finally created.
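
For reference, here is a condensed sketch of what KafkaDynamicSource.getScanRuntimeProvider looks like in Flink 1.12 (abridged, not the verbatim source): the key/value DeserializationSchema instances are built from the decoding formats resolved earlier, the FlinkKafkaConsumer is instantiated, and it is handed back wrapped in a SourceFunctionProvider.

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // build key/value DeserializationSchema from the decoding formats resolved earlier
        final DeserializationSchema<RowData> keyDeserialization =
                createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
        final DeserializationSchema<RowData> valueDeserialization =
                createDeserialization(context, valueDecodingFormat, valueProjection, null);

        final TypeInformation<RowData> producedTypeInfo =
                context.createTypeInformation(producedDataType);

        // this is where the DataStream-level FlinkKafkaConsumer is instantiated
        final FlinkKafkaConsumer<RowData> kafkaConsumer =
                createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);

        // Kafka is an unbounded source, hence isBounded = false
        return SourceFunctionProvider.of(kafkaConsumer, false);
    }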

Other notes

For more about 'format' = 'ss-canal-json', see the FlinkSQL 平台 (FlinkSQL platform) post.
