首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >Py4JJavaError:调用o57.showString时出错。:org.apache.spark.SparkException:

Py4JJavaError:调用o57.showString时出错。:org.apache.spark.SparkException:
EN

Stack Overflow用户
提问于 2019-06-12 22:07:12
回答 1查看 4K关注 0票数 0

我正在使用连接到AWS实例(r5d.xlarge 4 vCPU 32 GiB)的pyspark,运行一个25 GB的数据库,当我运行一些表时,我得到了错误:

Py4JJavaError:调用o57.showString时出错。用法: org.apache.spark.SparkException:由于阶段失败而中止作业:阶段0.0中的任务0失败1次,最近一次失败:丢失阶段0.0中的任务0.0 (TID 0,localhost,执行器驱动程序):java.lang.OutOfMemoryError:超出GC开销限制

我试图自己找出错误,但不幸的是,关于这个问题的信息并不多。

代码

代码语言:javascript
复制
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').\
     config('spark.jars.packages', 'mysql:mysql-connector-java:5.1.44').\
     appName('test').getOrCreate()

df = spark.read.format('jdbc').\
        option('url', 'jdbc:mysql://xx.xxx.xx.xxx:3306').\
        option('driver', 'com.mysql.jdbc.Driver').\
        option('user', 'xxxxxxxxxxx').\
        option('password', 'xxxxxxxxxxxxxxxxxxxx').\
        option('dbtable', 'dbname.tablename').\
        load()

  df.printSchema()

在这里,我得到了printSchema,但随后:

代码语言:javascript
复制
df_1 = df.select(['col1', 'col2', 'col3', 'col4', 
                  'col4', 'col5', 'col6']).show()

Py4JJavaError: An error occurred while calling o57.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task            
  in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 
  0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: GC 
  overhead limit exceeded

有人知道我该怎么解决这个问题吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-06-20 08:34:49

这是一种跨多个spark workers并行执行串行JDBC读取的方法...您可以将此作为指南,根据您的源数据对其进行自定义...基本上,主要的前提条件是有某种唯一的键来拆分。

请参阅此文档,特别是参数partitionColumn, lowerBound, upperBound, numPartitions

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

一些代码示例:

代码语言:javascript
复制
# find min and max for column used to split on
from pyspark.sql.functions import min, max

minDF = df.select(min("id")).first()[0] # replace 'id' with your key col
maxDF = df.select(max("id")).first()[0] # replace 'id' with your key col
numSplits = 125 # you will need to tailor this value to your dataset ... you mentioned your source as 25GB so try 25000 MB / 200 MB = 125 partitions

print("df min: {}\df max: {}".format(minDF, maxDF))

# your code => add a few more parameters
df = spark.read.format('jdbc').\
        option('url', 'jdbc:mysql://xx.xxx.xx.xxx:3306').\
        option('driver', 'com.mysql.jdbc.Driver').\
        option('user', 'xxxxxxxxxxx').\
        option('password', 'xxxxxxxxxxxxxxxxxxxx').\
        option('dbtable', 'dbname.tablename').\
        option('partitionColumn', 'id').\ # col to split on
        option('lowerBound', minDF).\ # min value
        option('upperBound', maxDF).\ # max value
        option('numPartitions', numSplits).\ # num of splits (partitions) spark will distribute across executor workers
        load()

print(df.rdd.getNumPartitions())

另一个连接字符串示例=>如果您使用的是Spark2.4/请参考本文档,它使用了一些更干净的代码

https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#manage-parallelism

代码语言:javascript
复制
sourceDF = spark.read.jdbc(
  url=jdbcUrl, 
  table="dbname.tablename",
  column='"id"',
  lowerBound=minDF,
  upperBound=maxDF,
  numPartitions=125,
  properties=connectionProps
)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56564160

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档