I am using pyspark connected to an AWS instance (r5d.xlarge, 4 vCPU, 32 GiB) to work with a 25 GB database, and when I query some tables I get this error:
Py4JJavaError: An error occurred while calling o57.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
I tried to figure out the error on my own, but unfortunately there is not much information about this problem.
Code
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').\
config('spark.jars.packages', 'mysql:mysql-connector-java:5.1.44').\
appName('test').getOrCreate()
df = spark.read.format('jdbc').\
option('url', 'jdbc:mysql://xx.xxx.xx.xxx:3306').\
option('driver', 'com.mysql.jdbc.Driver').\
option('user', 'xxxxxxxxxxx').\
option('password', 'xxxxxxxxxxxxxxxxxxxx').\
option('dbtable', 'dbname.tablename').\
load()
df.printSchema()
Here printSchema works, but then:
df_1 = df.select(['col1', 'col2', 'col3', 'col4',
                  'col5', 'col6']).show()
Py4JJavaError: An error occurred while calling o57.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: GC
overhead limit exceeded
Does anyone know how I can solve this problem?
Posted on 2019-06-20 08:34:49
Here is a way to perform an otherwise serial JDBC read in parallel across multiple Spark workers... you can use this as a guide and customize it for your source data... the main prerequisite is having some kind of unique key to split on.
See this documentation, in particular the parameters partitionColumn, lowerBound, upperBound, and numPartitions:
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
Some code samples:
# find min and max for the column used to split on
from pyspark.sql.functions import min, max

minDF = df.select(min("id")).first()[0]  # replace 'id' with your key col
maxDF = df.select(max("id")).first()[0]  # replace 'id' with your key col
numSplits = 125  # you will need to tailor this value to your dataset ... you mentioned your source is 25 GB, so try 25000 MB / 200 MB = 125 partitions
print("df min: {}\ndf max: {}".format(minDF, maxDF))
# your code => add a few more parameters
df = (spark.read.format('jdbc')
      .option('url', 'jdbc:mysql://xx.xxx.xx.xxx:3306')
      .option('driver', 'com.mysql.jdbc.Driver')
      .option('user', 'xxxxxxxxxxx')
      .option('password', 'xxxxxxxxxxxxxxxxxxxx')
      .option('dbtable', 'dbname.tablename')
      .option('partitionColumn', 'id')     # col to split on
      .option('lowerBound', minDF)         # min value
      .option('upperBound', maxDF)         # max value
      .option('numPartitions', numSplits)  # num of splits (partitions) spark will distribute across executor workers
      .load())
print(df.rdd.getNumPartitions())
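To sanity-check that the rows actually spread out (a skewed key leaves most of the data in one partition and the OOM comes back), one quick check is to count rows per partition; a sketch:

# count rows per Spark partition; heavy skew means 'id' is a poor split key
from pyspark.sql.functions import spark_partition_id
df.groupBy(spark_partition_id().alias('partition')).count().show()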
Another connection string example => if you are using Spark 2.4, refer to this doc; it uses somewhat cleaner code:
https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#manage-parallelism
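The snippet below uses jdbcUrl and connectionProps without defining them; a minimal sketch of what they might look like (host, database, and credentials are placeholders, not from the original post):

# hypothetical connection settings for the snippet below; substitute your own
jdbcUrl = 'jdbc:mysql://xx.xxx.xx.xxx:3306/dbname'
connectionProps = {
    'user': 'xxxxxxxxxxx',
    'password': 'xxxxxxxxxxxxxxxxxxxx',
    'driver': 'com.mysql.jdbc.Driver',
}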
sourceDF = spark.read.jdbc(
    url=jdbcUrl,
    table="dbname.tablename",
    column="id",  # partition column; MySQL identifiers should not be double-quoted
    lowerBound=minDF,
    upperBound=maxDF,
    numPartitions=125,
    properties=connectionProps
)
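With the partitioned read in place, the select from the question should run without the single-task GC blowup, since the scan is now spread across 125 JDBC partitions (column names taken from the question):

# re-run the select that originally failed, now against the partitioned frame
sourceDF.select('col1', 'col2', 'col3', 'col4', 'col5', 'col6').show()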
https://stackoverflow.com/questions/56564160