前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SparkSQL真的不支持存储NullType类型数据到Parquet吗?

SparkSQL真的不支持存储NullType类型数据到Parquet吗?

作者头像
大数据学习与分享
发布2020-11-03 11:27:23
2.6K0
发布2020-11-03 11:27:23
举报
文章被收录于专栏:大数据学习与分享

最近后台有小伙伴提了一些实际工作中使用Spark遇到的问题,笔者挑选了几个相对常见的问题,分别从场景模拟/问题现象、问题分析、解决方案三个层面,来深入分析这些问题,并且提供一个解决类似问题的思路。

>> 问题1

使用SparkSQL(2.4版本)往存储格式为parquet的Hive分区表中存储NullType类型的数据时报错:

代码语言:javascript
复制
org.apache.spark.sql.AnalysisException: Parquet data source does not support null data type.

虽然在Stack OverFlow上找到了类似的问题,但没有具体阐明到底是什么原因导致了这种问题以及如何解决?

1. 场景模拟

1)创建temp view:test_view

代码语言:javascript
复制
sparkSession.sql(
      """
        |select 1 as id, null as name
      """.stripMargin
      ).createOrReplaceTempView("test_view")

2)打印test_view的schema信息

代码语言:javascript
复制
-- id为integer类型,name对应到Spark SQL内部字段数据类型即位NullType
root
 |-- id: integer (nullable = false)
 |-- name: null (nullable = true)

3)将test_tab中数据存入Hive分区表test_partition_tab的分区partitionCol=20201009中

代码语言:javascript
复制
df.write.mode(SaveMode.Overwrite).format("parquet").save("/bigdatalearnshare/test_partition_tab/partitionCol=20201009")

4)报错信息

2. 问题分析

根据报错信息,提示Parquet数据源不支持null type类型的数据。既然是保存数据,我们很容易联想到FileFormatWriter,再结合错误信息:

代码语言:javascript
复制
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:100

对应的源码为:

代码语言:javascript
复制
DataSourceUtils.verifyWriteSchema(fileFormat, dataSchema)

debug进去,看看这个方法究竟干了什么?

根据源码分析可知,上述程序中SparkSQL在保存数据时会对数据的schema进行校验,并且不同的存储格式(parquet、csv、json等)支持的数据类型会有所不同,以parquet为例,查看源码:

3. 解决方案

代码语言:javascript
复制
-- 使用insert sql进行数据的保存
insert overwrite table test_partition_tab partition(partitionCol=20201009) select * from test_view;

>> 问题2

1. 问题现象

在利用Spark和Kafka处理数据时,同时在maven pom中引入Spark和Kafka的相关依赖。但是当利用SparkSQL处理数据生成的DataSet/DataFrame进行collect或者show等操作时,抛出以下异常信息:

代码语言:javascript
复制
in stage 3.0 (TID 403, localhost, executor driver): java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
  at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)
  at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
  at org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:124)
  at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
  at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:453)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)

2. 问题分析

错误信息提示找不到方法:

代码语言:javascript
复制
net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V

根据经验,找不到某个方法,一般主要有两个原因造成:

  • 没有相应的jar包依赖
  • jar包依赖冲突

经过排查发现导致本问题发生的原因是:Spark内部使用的包net.jpountz.lz4和Kafka中包产生冲突

3. 解决方案

排除Kafka中net.jpountz.lz4的依赖包:

代码语言:javascript
复制
  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.0.0</version>
      <exclusions>
          <exclusion>
             <groupId>net.jpountz.lz4</groupId>
             <artifactId>lz4</artifactId>
          </exclusion>
      </exclusions>
  </dependency>

>> 问题3

通过SparkSQL,对两个存在map类型字段的Hive表进行union操作,报如下错误:

代码语言:javascript
复制
org.apache.spark.sql.AnalysisException: Cannot have map type columns in DataFrame which calls set operations(intersect, except, etc.), but the type of column map is map<string,string>;

1. 场景模拟

1)通过函数str_to_map/map生成map类型的字段,然后进行union操作

代码语言:javascript
复制
select 1 id, str_to_map("k1:v1,k2:v2") map

union

select 2 id, map("k1","v1","k2","v2") map

2)报错信息

代码语言:javascript
复制
org.apache.spark.sql.AnalysisException: Cannot have map type columns in DataFrame which calls set operations(intersect, except, etc.), but the type of column map is map<string,string>;;
Distinct
+- Union
   :- Project [1 AS id#116, str_to_map(k1:v1,k2:v2, ,, :) AS map#117]
   :  +- OneRowRelation
   +- Project [2 AS id#118, map(k1, v1, k2, v2) AS map#119]
      +- OneRowRelation

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:42)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:364)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:85)

2. 问题分析

根据报错信息,我们查看org.apache.spark.sql.catalyst.analysis.CheckAnalysis的checkAnalysis方法,第362行源码处理逻辑(错误信息是不是很熟悉呢?):

关键看mapColumnInSetOperation中对逻辑计划的匹配:

针对逻辑计划中有Intersect、Except、Distinct的output"返回"的属性(Attribute)有map类型,或者Deduplicate的keys(也是Attribute)包含map字段类型,都会导致上述问题。

而union导致上述报错,是因为union会对结果去重,即distinct

3. 解决方案

询问后台小伙伴儿,目前的业务场景是考验不需要去重处理的。

那么我们都知道,union和union all的主要区别就是,前者会对结果去重,后者则不会。那么将union改为union all就好了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-10-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据学习与分享 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档