
Fixing the issue where Spark cannot write to Hudi tables created by Flink through the HMS catalog

Author: 从大数据到人工智能 · Published 2022-10-31 · Column: 大数据-BigData

Problem Description

In Hudi 0.12.0, both Flink and Spark can manage table metadata through the Hive metastore (see the Hudi HMS Catalog guide for details). That is, with the Hudi HMS catalog, a table created by Flink can be written by either Flink or Spark, and a table created by Spark can likewise be written by either engine. However, Hudi 0.12.0 has a bug: after a Hudi table is created through the Flink HMS catalog, a Spark SQL batch load of Hive data into that table (through the Spark HMS catalog) fails. The reproduction steps and component versions are as follows:

hudi 0.12.0

flink 1.13.6

spark 3.3.0

hive (HDP 3.1.4)

Create the table with Flink SQL:

create catalog hudi with(
'type' = 'hudi',
'mode' = 'hms',
'hive.conf.dir'='/etc/hive/conf'
);

-- create a database for the Hudi tables
create database hudi.hudidb;

CREATE TABLE IF NOT EXISTS hudi.hudidb.store_returns_hudi_4(
  sr_returned_date_sk BIGINT NOT NULL,
  sr_return_time_sk BIGINT,
  sr_item_sk BIGINT,
  sr_customer_sk BIGINT,
  sr_cdemo_sk BIGINT,
  sr_hdemo_sk BIGINT,
  sr_addr_sk BIGINT,
  sr_store_sk BIGINT,
  sr_reason_sk BIGINT,
  sr_ticket_number BIGINT,
  sr_return_quantity INT,
  sr_return_amt DOUBLE,
  sr_return_tax DOUBLE,
  sr_return_amt_inc_tax DOUBLE,
  sr_fee DOUBLE,
  sr_return_ship_cost DOUBLE,
  sr_refunded_cash DOUBLE,
  sr_reversed_charge DOUBLE,
  sr_store_credit DOUBLE,
  sr_net_loss DOUBLE,
  insert_time STRING,
PRIMARY KEY(sr_returned_date_sk) NOT ENFORCED
)
 with (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ', 
  'hive_sync.conf.dir' = '/etc/hive/conf',
  'write.precombine.field' = 'sr_ticket_number',
  'hoodie.datasource.write.hive_style_partitioning'='false',
  'index.bootstrap.enabled' = 'true'
  );

Create the source table in Hive and insert a row of test data:

CREATE TABLE store_returns (
  sr_returned_date_sk BIGINT,
  sr_return_time_sk BIGINT,
  sr_item_sk BIGINT,
  sr_customer_sk BIGINT,
  sr_cdemo_sk BIGINT,
  sr_hdemo_sk BIGINT,
  sr_addr_sk BIGINT,
  sr_store_sk BIGINT,
  sr_reason_sk BIGINT,
  sr_ticket_number BIGINT,
  sr_return_quantity INT,
  sr_return_amt DOUBLE,
  sr_return_tax DOUBLE,
  sr_return_amt_inc_tax DOUBLE,
  sr_fee DOUBLE,
  sr_return_ship_cost DOUBLE,
  sr_refunded_cash DOUBLE,
  sr_reversed_charge DOUBLE,
  sr_store_credit DOUBLE,
  sr_net_loss DOUBLE)
TBLPROPERTIES (
  'bucketing_version' = '2',
  'transient_lastDdlTime' = '1658370656');

insert into store_returns select 1,29496,26309,41563,325271,1894,196917,86,6,240001,43,79.55,4.77,84.32,63.0,212.42,62.84,0.83,15.88,280.19;

Load the data into the Hudi table with Spark SQL:

insert into hudidb.store_returns_hudi_4(
  sr_returned_date_sk,
  sr_return_time_sk,
  sr_item_sk,
  sr_customer_sk,
  sr_cdemo_sk,
  sr_hdemo_sk,
  sr_addr_sk,
  sr_store_sk,
  sr_reason_sk,
  sr_ticket_number,
  sr_return_quantity,
  sr_return_amt,
  sr_return_tax,
  sr_return_amt_inc_tax,
  sr_fee,
  sr_return_ship_cost,
  sr_refunded_cash,
  sr_reversed_charge,
  sr_store_credit,
  sr_net_loss,
  insert_time
   )
select
  sr_returned_date_sk,
  sr_return_time_sk,
  sr_item_sk,
  sr_customer_sk,
  sr_cdemo_sk,
  sr_hdemo_sk,
  sr_addr_sk,
  sr_store_sk,
  sr_reason_sk,
  sr_ticket_number,
  sr_return_quantity,
  sr_return_amt,
  sr_return_tax,
  sr_return_amt_inc_tax,
  sr_fee,
  sr_return_ship_cost,
  sr_refunded_cash,
  sr_reversed_charge,
  sr_store_credit,
  sr_net_loss,
  cast(current_timestamp() as string);

The insert fails with the following error:

Error: org.apache.hive.service.cli.HiveSQLException: Error running query: org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table 'hudidb.store_returns_hudi_4':
- Cannot write nullable values to non-null column 'sr_returned_date_sk'
        at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:43)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:325)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:230)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:230)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:225)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:239)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table 'hudidb.store_returns_hudi_4':
- Cannot write nullable values to non-null column 'sr_returned_date_sk'
        at org.apache.spark.sql.errors.QueryCompilationErrors$.cannotWriteIncompatibleDataToTableError(QueryCompilationErrors.scala:1535)
        at org.apache.spark.sql.catalyst.analysis.TableOutputResolver$.resolveOutputColumns(TableOutputResolver.scala:59)
        at org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.resolveOutputColumns(HoodieSpark3CatalystPlanUtils.scala:38)
        at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.coerceQueryOutputColumns(InsertIntoHoodieTableCommand.scala:159)
        at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.alignQueryOutput(InsertIntoHoodieTableCommand.scala:142)
        at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:99)
        at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:60)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:291)
        ... 16 more (state=,code=0)

Problem Analysis

Inspecting the code and the table properties shows that, for the table created by Flink, the value of the spark.sql.sources.schema.part.0 property stored in the Hive metastore marks the field sr_returned_date_sk with nullable = false, whereas if the same table is created through Spark, that field's nullable attribute is true.
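
To check this directly, the table parameters can be dumped from the metastore, e.g. with SHOW TBLPROPERTIES in Hive. Below is a minimal programmatic sketch (our illustration, not from the original analysis) using the standard Hive metastore client; it assumes the HMS client jars and a valid hive-site.xml are on the classpath and reuses the database and table names from the repro above.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public class InspectSparkSchema {
  public static void main(String[] args) throws Exception {
    // Picks up hive-site.xml from the classpath (e.g. /etc/hive/conf).
    HiveConf conf = new HiveConf();
    HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
    try {
      // Spark's view of the schema is stored as JSON, split across
      // spark.sql.sources.schema.part.N table parameters.
      client.getTable("hudidb", "store_returns_hudi_4").getParameters()
          .forEach((k, v) -> {
            if (k.startsWith("spark.sql.sources.schema")) {
              System.out.println(k + " = " + v);
            }
          });
    } finally {
      client.close();
    }
  }
}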

From this we can conclude that when Flink registers a Hudi table in the Hive metastore, the Spark-facing parameters it builds are wrong. The culprit is this call in HoodieHiveCatalog.instantiateHiveTable:

serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys));

where translateFlinkTableProperties2Spark is implemented as follows:

  public static Map<String, String> translateFlinkTableProperties2Spark(
      CatalogTable catalogTable,
      Configuration hadoopConf,
      Map<String, String> properties,
      List<String> partitionKeys) {
    Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
    MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
    String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
    Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
        partitionKeys,
        sparkVersion,
        4000,
        messageType);
    properties.putAll(sparkTableProperties);
    return properties.entrySet().stream()
        .filter(e -> KEY_MAPPING.containsKey(e.getKey()) && !catalogTable.getOptions().containsKey(KEY_MAPPING.get(e.getKey())))
        .collect(Collectors.toMap(e -> KEY_MAPPING.get(e.getKey()),
            e -> e.getKey().equalsIgnoreCase(FlinkOptions.TABLE_TYPE.key()) ? VALUE_MAPPING.get(e.getValue()) : e.getValue()));
  }
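
The nullable = false originates from the Flink schema itself: PRIMARY KEY (sr_returned_date_sk) NOT ENFORCED makes the column NOT NULL in Flink, AvroSchemaConverter.convertToSchema (the same call used in the method above) preserves that constraint in the Avro schema, and getSparkTableProperties then serializes the resulting Parquet schema into the Spark schema JSON with "nullable":false. The standalone sketch below (our illustration, assuming flink-avro on the classpath) demonstrates the first step of that chain:

import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class NullabilityDemo {
  public static void main(String[] args) {
    // PRIMARY KEY (sr_returned_date_sk) NOT ENFORCED implies NOT NULL in Flink,
    // i.e. a non-nullable BIGINT; the second column stays nullable for contrast.
    RowType rowType = RowType.of(
        new LogicalType[] {new BigIntType(false), new BigIntType(true)},
        new String[] {"sr_returned_date_sk", "sr_return_time_sk"});
    Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
    // The non-nullable column comes out as a plain "long", while the nullable
    // one becomes a union ["null","long"]; downstream this turns into
    // "nullable":false / true in the spark.sql.sources.schema.part.* JSON.
    System.out.println(avroSchema.toString(true));
  }
}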

The fix is therefore to rewrite the nullable attribute of the fields to true in the values of the spark.sql.sources.schema.part.* properties, which requires only the following change to the method above:

  public static Map<String, String> translateFlinkTableProperties2Spark(
      CatalogTable catalogTable,
      Configuration hadoopConf,
      Map<String, String> properties,
      List<String> partitionKeys) {
    Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
    MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
    String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
    Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
        partitionKeys,
        sparkVersion,
        4000,
        messageType);
    properties.putAll(replaceSparkTableProperties(sparkTableProperties));
    return properties.entrySet().stream()
        .filter(e -> KEY_MAPPING.containsKey(e.getKey()) && !catalogTable.getOptions().containsKey(KEY_MAPPING.get(e.getKey())))
        .collect(Collectors.toMap(e -> KEY_MAPPING.get(e.getKey()),
            e -> e.getKey().equalsIgnoreCase(FlinkOptions.TABLE_TYPE.key()) ? VALUE_MAPPING.get(e.getValue()) : e.getValue()));
  }

  public static Map<String, String> replaceSparkTableProperties(Map<String, String> originProperties) {
    for (String key : originProperties.keySet()) {
      if (key.contains("spark.sql.sources.schema.part.")) {
        originProperties.put(key, originProperties.get(key)
                .replaceAll(" ", "").replaceAll("\"nullable\":false", "\"nullable\":true"));
      }
    }
    return originProperties;
  }
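
As a sanity check, the string rewrite can be exercised in isolation. The demo below is hypothetical and self-contained (the sample JSON fragment is hand-written, modeled on Spark's compact schema serialization); it is not part of the patch:

import java.util.HashMap;
import java.util.Map;

public class ReplaceNullableDemo {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    // Hand-written sample fragment of a Spark schema property value.
    props.put("spark.sql.sources.schema.part.0",
        "{\"type\":\"struct\",\"fields\":[{\"name\":\"sr_returned_date_sk\","
            + "\"type\":\"long\",\"nullable\":false,\"metadata\":{}}]}");
    props.put("spark.sql.sources.schema.numParts", "1"); // left untouched

    // Same rewrite as replaceSparkTableProperties above.
    for (String key : props.keySet()) {
      if (key.contains("spark.sql.sources.schema.part.")) {
        props.put(key, props.get(key)
            .replaceAll(" ", "")
            .replaceAll("\"nullable\":false", "\"nullable\":true"));
      }
    }
    // Prints the fragment with "nullable":true for sr_returned_date_sk.
    System.out.println(props.get("spark.sql.sources.schema.part.0"));
  }
}

Note that the .replaceAll(" ", "") step strips every space in the fragment, including any inside field names or metadata values; that is harmless for this table, but a fix that reassembles the schema parts and rewrites the JSON structurally would be more robust.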