本文基于 Apache Flink 1.16 和 Apache Paimon 1.0.1,详细介绍如何配置 Paimon 环境、创建和使用追加表(Append-Only Table)与主键表(Primary-Key Table)的特点和使用。
一. IDEA 环境准备
下载官方paimon-1.0.1版本,本地编译选择对应的flink版本

把对应的jar包安装到本地仓库:
mvn install:install-file -DgroupId=org.apache.paimon -DartifactId=paimon-flink-1.16 -Dversion=1.0.1 -Dpackaging=jar -Dfile=paimon-flink-1.16-1.0.1.jar在工程里引用对应的依赖
<dependency> <groupId>org.apache.paimon</groupId> <artifactId>paimon-flink-1.16</artifactId> <version>1.0.1</version> <scope>provided</scope> </dependency>二. 追加表
追加表是 Apache Paimon 中没有定义主键的表类型,默认情况下,表如果未指定主键,就被视为追加表。这种表主要用于追加数据的场景,类似于日志数据同步或不需要频繁更新的数据湖存储。
特点:
1.1. 无主键:追加表不定义主键,因此无法直接通过主键进行数据更新或删除操作。
1.2. 分为Scalable 表 和 Queue 表:Scalable 表没有桶的概念,无需考虑数据顺序、无需对数据进行hash partitioning;Queue 表按 bucket-key 分发,每个分桶中数据的消费顺序与数据写入Paimon表的顺序一致;
创建基本本地文件的 Paimon catalog,无主键表,定义 'bucket' = '-1'
/** * @author BigData共享 */public class AppendTable { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(10000l); env.getCheckpointConfig().setCheckpointStorage(new MemoryStateBackend(20 * 1024 * 1024)); TableEnvironment tableEnv = StreamTableEnvironment.create(env); // 创建 paimon catalog tableEnv.executeSql("CREATE CATALOG paimon_catalog WITH (\n" + " 'type'='paimon',\n" + " 'warehouse'='file:///tmp/paimon'\n" + ")"); tableEnv.executeSql("USE CATALOG paimon_catalog"); // 创建 scalable table tableEnv.executeSql("CREATE TABLE IF NOT EXISTS scalable_table (\n" + " word STRING,\n" + " dt STRING\n" + ") PARTITIONED BY (dt) with (\n" + " 'bucket' = '-1'\n" + ")"); tableEnv.executeSql("CREATE TEMPORARY TABLE IF NOT EXISTS word_table (\n" + " id INT,\n" + " word STRING\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'fields.id.kind' = 'random',\n" + " 'fields.id.min' = '1',\n" + " 'fields.id.max' = '100',\n" + " 'fields.word.length' = '1'\n" + ")"); // 插入数据 tableEnv.executeSql("INSERT INTO scalable_table SELECT word, '20250816' FROM word_table"); env.execute(); }}创建对应的元数据文件,mainfest,schema, snapshot 和对应的分区数据 dt=20250816

创建基本本地文件的 Paimon catalog,无主键表,定义 'bucket' = '3', 'bucket-key' = 'id'
/** * @author BigData共享 */public class AppendTable { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(10000l); env.getCheckpointConfig().setCheckpointStorage(new MemoryStateBackend(20 * 1024 * 1024)); TableEnvironment tableEnv = StreamTableEnvironment.create(env); // 创建 paimon catalog tableEnv.executeSql("CREATE CATALOG paimon_catalog WITH (\n" + " 'type'='paimon',\n" + " 'warehouse'='file:///tmp/paimon'\n" + ")");
tableEnv.executeSql("USE CATALOG paimon_catalog"); // 创建 queue table tableEnv.executeSql("CREATE TABLE IF NOT EXISTS queue_table (\n" + " id INT,\n" + " word STRING,\n" + " dt STRING\n" + ") PARTITIONED BY (dt) with (\n" + " 'bucket' = '3',\n" + " 'bucket-key' = 'id'\n" + ")"); tableEnv.executeSql("CREATE TEMPORARY TABLE IF NOT EXISTS word_table (\n" + " id INT,\n" + " word STRING\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'fields.id.kind' = 'random',\n" + " 'fields.id.min' = '1',\n" + " 'fields.id.max' = '100',\n" + " 'fields.word.length' = '1'\n" + ")"); // 插入数据 tableEnv.executeSql("INSERT INTO queue_table SELECT id, word, '20250816' FROM word_table"); env.execute(); }}创建对应的元数据文件,mainfest,schema, snapshot 和对应的分区数据 dt=20250816,分桶数据:bucket-0, bucket-1, bucket-2

三. Primary-Key 表
主键表是 Apache Paimon 中定义了主键的表类型,支持高效的 upsert 操作和变更日志生成,适合需要实时更新和复杂数据处理的应用场景。
特点:
1.1. 主键约束:主键由一组列组成,保证每条记录的唯一性。Paimon 在每个 bucket 内按主键排序,优化基于主键的查询性能。
1.2. 支持 upsert 语义:支持插入、更新和删除操作,适合处理动态变化的数据(如数据库的 binlog)。
1.3. 合并引擎:主键表通过合并引擎(Merge Engine)处理具有相同主键的记录,支持以下类型:Deduplicate/Partial Update/Aggregation/First Row。
1.4. 分桶模式:
1.5. Changelog 生成:支持生成完整的变更日志(INSERT、UPDATE、DELETE),通过 changelog-producer 配置:none/input/lookup/full-compaction。
创建基本本地文件的 Paimon catalog,定义分区键 PRIMARY KEY (dt, id) NOT ENFORCED;
/** * @author BigData共享 */public class PrimaryKeyTable { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(10000l); env.getCheckpointConfig().setCheckpointStorage(new MemoryStateBackend(20 * 1024 * 1024)); TableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.getConfig().set("table.exec.sink.upsert-materialize", "NONE"); // 创建 paimon catalog tableEnv.executeSql("CREATE CATALOG paimon_catalog WITH (\n" + " 'type'='paimon',\n" + " 'warehouse'='file:///tmp/paimon'\n" + ")"); tableEnv.executeSql("USE CATALOG paimon_catalog"); // 创建 primary-key table tableEnv.executeSql("CREATE TABLE IF NOT EXISTS primary_key_table (\n" + " id INT,\n" + " word STRING,\n" + " dt STRING,\n" + " PRIMARY KEY (dt, id) NOT ENFORCED\n" + ")\n" + "PARTITIONED BY (dt) with (\n" + " 'bucket' = '-1'\n" + ")"); tableEnv.executeSql("CREATE TEMPORARY TABLE IF NOT EXISTS word_table (\n" + " id INT,\n" + " word STRING\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'fields.id.kind' = 'random',\n" + " 'fields.id.min' = '1',\n" + " 'fields.id.max' = '100',\n" + " 'fields.word.length' = '1'\n" + ")"); // 插入数据 tableEnv.executeSql("INSERT INTO primary_key_table (SELECT id, word, '20250816' FROM word_table UNION SELECT id, word, '20250817' FROM word_table)"); env.execute(); }}创建对应的元数据文件,mainfest,schema, snapshot,需要注意的是,下面元数据文件中多了个index文件; 对应的分区数据 dt=20250816,dt=20250817

多个insert into table_xx 语句写入同一张表会有问题,可以先把多条select 语句 union 后再插入;或者表新增参数:'write-only'='true',然后再单独启动一个Dedicated Compaction Job。
后面的文章会对主键表的特性做详细分析。
更多大数据干货,欢迎关注我的微信公众号—BigData共享
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。