
Analysis of PySpark Data Type Conversion Exceptions

Fayson | Hadoop实操 | Originally published 2017-10-17


1. Problem Description


When using PySpark's SparkSQL to read a text file from HDFS and create a DataFrame, several exceptions can come up during data type conversion:

1. When a schema field type is set to DoubleType, a "name 'DoubleType' is not defined" exception is thrown;

2. When converting a field read from the file to DoubleType, a "DoubleType can not accept object u'23' in type <type 'unicode'>" exception is thrown;

3. If the field is instead defined as StringType, SparkSQL can still run aggregations such as sum on it, and non-numeric values are simply excluded from the statistics.

The full exceptions are shown below:

Exception 1:

NameErrorTraceback (most recent call last)
in engine
      1 schema = StructType([StructField("person_name", StringType(), False),
----> 2                      StructField("person_age", DoubleType(), False)])

NameError: name 'DoubleType' is not defined

Exception 2:

Py4JJavaError: An error occurred while calling o152.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-172-31-26-102.ap-southeast-1.compute.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
process()
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/sql/session.py", line 509, in prepare
verify_func(obj, schema)
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1360, in _verify_type
_verify_type(v, f.dataType, f.nullable)
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1324, in _verify_type
raise TypeError("%s can not accept object %r in type %s" % (dataType, obj, type(obj)))
TypeError: DoubleType can not accept object u'23' in type <type 'unicode'>

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)

2. Solution


  • Exception 1:
NameError: name 'DoubleType' is not defined

Cause:

The DoubleType data type from the pyspark.sql.types module was never imported in the Python code.

Fix:

from pyspark.sql.types import *

or

from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType, DoubleType
  • Exception 2:
TypeError: DoubleType can not accept object u'23' in type <type 'unicode'>
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)

Cause:

Python reads the text file's fields as unicode strings by default, so a value such as u'23' cannot be placed directly into a DoubleType column; it must be explicitly converted to a numeric type first.

Fix:

# Schema with two fields - person_name and person_age
schema = StructType([StructField("person_name", StringType(), False),
                     StructField("person_age", DoubleType(), False)])

# Split each line on "," and cast the age field to float so it
# matches the DoubleType column in the schema.
lines = spark.read.text("/tmp/test/").rdd \
    .map(lambda x: x[0].split(",")) \
    .map(lambda x: (x[0], float(x[1])))

The key change (highlighted in red in the original post) is wrapping the second field in float(), which converts the unicode string to a Python float before it reaches the DoubleType column.

After this conversion, the code runs correctly.
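
For completeness, here is a minimal sketch of how the converted RDD would typically be turned into a DataFrame and queried; the createDataFrame call and the sum aggregation are assumed continuations, not code from the original post:

# Assumed continuation of the snippet above (not from the original post)
df = spark.createDataFrame(lines, schema)
df.show()

# person_age is now a proper DoubleType column, so numeric
# aggregations such as sum work as expected.
df.agg({"person_age": "sum"}).show()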

The data types supported by SparkSQL and DataFrames are documented on the official site: http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types

3. Summary


1. In the test code above, if the second field (x[1]) contains an empty string or a non-numeric string, the float() conversion fails. So when specifying a field's data type, any "invalid data" must be filtered out first, otherwise the job will not run; see the sketch after the error trace below.

The test data (shown as an image in the original post) includes the non-numeric value 23q.

Executing the code fails with the following error:

Py4JJavaError: An error occurred while calling o291.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-172-31-26-102.ap-southeast-1.compute.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
process()
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<ipython-input-1-29d7e609de57>", line 1, in <lambda>
    ValueError: invalid literal for float(): 23q

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
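
One defensive approach is to drop rows whose numeric field cannot be parsed before applying the cast. This is a sketch under assumptions; the is_float helper is hypothetical, not from the original post:

# Hypothetical helper (not in the original post): returns True only
# if the string parses as a float.
def is_float(s):
    try:
        float(s)
        return True
    except ValueError:
        return False

# Filter out malformed rows such as "23q" before the cast.
lines = spark.read.text("/tmp/test/").rdd \
    .map(lambda x: x[0].split(",")) \
    .filter(lambda x: len(x) > 1 and is_float(x[1])) \
    .map(lambda x: (x[0], float(x[1])))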

2. If the "invalid data" is not removed, the field should instead be defined as StringType. Aggregations on the field still work, and non-numeric values are simply left out of the statistics, as the sketch below illustrates.

Test data: the same mix of numeric and non-numeric values (shown as an image in the original post).
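
A sketch of this behavior (an assumed example, not code from the original post): with the column typed as StringType, Spark SQL implicitly casts each value to a number during sum, unparseable values such as "23q" become NULL, and NULLs are ignored by the aggregate:

# Assumed example (not from the original post): keep the age field
# as a plain string instead of casting it.
schema = StructType([StructField("person_name", StringType(), False),
                     StructField("person_age", StringType(), False)])

lines = spark.read.text("/tmp/test/").rdd \
    .map(lambda x: x[0].split(",")) \
    .map(lambda x: (x[0], x[1]))

df = spark.createDataFrame(lines, schema)

# "23q" cannot be cast to a number, so it contributes NULL to the
# sum and is effectively excluded from the statistic.
df.agg({"person_age": "sum"}).show()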



Follow Hadoop实操 to get more hands-on Hadoop content as soon as it is published; forwarding and sharing are welcome.

This is an original article. Reposting is welcome; please credit the source: the WeChat public account Hadoop实操.
