首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >从显示OutOfMemoryError: Java堆空间的大型Pyspark创建字典

从显示OutOfMemoryError: Java堆空间的大型Pyspark创建字典
EN

Stack Overflow用户
提问于 2020-07-27 06:28:29
回答 1查看 268关注 0票数 3

我见过并尝试过许多关于这个问题的existing StackOverflow帖子,但都没有效果。我想我的JAVA堆空间并不像我的大型数据集预期的那么大,My包含650万行。我的Linux实例包含64 My的Ram,4核。根据这个suggestion,我需要修复我的代码,但是我认为用pyspark编写一个字典应该不太昂贵。如果有其他计算方法,请告诉我。

我只想用我的pyspark dataframe制作一本python字典,这是我的pyspark dataframe的内容,

property_sql_df.show()显示,

代码语言:javascript
运行
复制
+--------------+------------+--------------------+--------------------+
|            id|country_code|       name|          hash_of_cc_pn_li|
+--------------+------------+--------------------+--------------------+
|  BOND-9129450|          US|Scotron Home w/Ga...|90cb0946cf4139e12...|
|  BOND-1742850|          US|Sited in the Mead...|d5c301f00e9966483...|
|  BOND-3211356|          US|NEW LISTING - Com...|811fa26e240d726ec...|
|  BOND-7630290|          US|EC277- 9 Bedroom ...|d5c301f00e9966483...|
|  BOND-7175508|          US|East Hampton Retr...|90cb0946cf4139e12...|
+--------------+------------+--------------------+--------------------+

我想要的是制作一个字典,其中hash_of_cc_pn_li作为,id作为作为列表值。

预期输出

代码语言:javascript
运行
复制
{
  "90cb0946cf4139e12": ["BOND-9129450", "BOND-7175508"]
  "d5c301f00e9966483": ["BOND-1742850","BOND-7630290"]
}

到目前为止我已经尝试过了,

方法1:导致java.lang.OutOfMemoryError: Java堆空间

代码语言:javascript
运行
复制
%%time
duplicate_property_list = {}
for ind in property_sql_df.collect(): 
     hashed_value = ind.hash_of_cc_pn_li
     property_id = ind.id
     if hashed_value in duplicate_property_list:
         duplicate_property_list[hashed_value].append(property_id) 
     else:
         duplicate_property_list[hashed_value] = [property_id] 

方法2:由于缺少火花放电的本机偏移而无法工作。

代码语言:javascript
运行
复制
%%time
i = 0
limit = 1000000
for offset in range(0, total_record,limit):
    i = i + 1
    if i != 1:
        offset = offset + 1
        
    duplicate_property_list = {}
    duplicate_properties = {}
    
    # Preparing dataframe
    url = '''select id, hash_of_cc_pn_li from properties_df LIMIT {} OFFSET {}'''.format(limit,offset)  
    properties_sql_df = spark.sql(url)
    
    # Grouping dataset
    rows = properties_sql_df.groupBy("hash_of_cc_pn_li").agg(F.collect_set("id").alias("ids")).collect()
    duplicate_property_list = { row.hash_of_cc_pn_li: row.ids for row in rows }
    
    # Filter a dictionary to keep elements only where duplicate cound
    duplicate_properties = filterTheDict(duplicate_property_list, lambda elem : len(elem[1]) >=2)
    
    # Writing to file
    with open('duplicate_detected/duplicate_property_list_all_'+str(i)+'.json', 'w') as fp:
        json.dump(duplicate_property_list, fp)

我现在在控制台上得到的:

java.lang.OutOfMemoryError: Java堆空间

并在木星笔记本上显示此错误输出

代码语言:javascript
运行
复制
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33097)

,这是我在这里提出的后续问题: Creating dictionary from Pyspark dataframe showing OutOfMemoryError: Java heap space

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-07-27 08:48:44

为什么不把这么多的数据和处理保存在执行者中,而不是收集给驱动程序呢?如果我正确理解了这一点,您可以使用pyspark转换和聚合并直接保存到JSON,因此利用执行器,然后将该JSON文件(可能是分区的)作为字典加载回Python中。诚然,您引入了IO开销,但这应该允许您避免OOM堆空间错误。一步步地:

代码语言:javascript
运行
复制
import pyspark.sql.functions as f


spark = SparkSession.builder.getOrCreate()
data = [
    ("BOND-9129450", "90cb"),
    ("BOND-1742850", "d5c3"),
    ("BOND-3211356", "811f"),
    ("BOND-7630290", "d5c3"),
    ("BOND-7175508", "90cb"),
]
df = spark.createDataFrame(data, ["id", "hash_of_cc_pn_li"])

df.groupBy(
    f.col("hash_of_cc_pn_li"),
).agg(
    f.collect_set("id").alias("id")  # use f.collect_list() here if you're not interested in deduplication of BOND-XXXXX values
).write.json("./test.json")

检查输出路径:

代码语言:javascript
运行
复制
ls -l ./test.json

-rw-r--r-- 1 jovyan users  0 Jul 27 08:29 part-00000-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users 50 Jul 27 08:29 part-00039-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users 65 Jul 27 08:29 part-00043-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users 65 Jul 27 08:29 part-00159-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users  0 Jul 27 08:29 _SUCCESS
_SUCCESS

dict的形式加载到Python

代码语言:javascript
运行
复制
import json
from glob import glob

data = []
for file_name in glob('./test.json/*.json'):
    with open(file_name) as f:
        try:
            data.append(json.load(f))
        except json.JSONDecodeError:  # there is definitely a better way - this is here because some partitions might be empty
            pass

最后

代码语言:javascript
运行
复制
{item['hash_of_cc_pn_li']:item['id'] for item in data}

{'d5c3': ['BOND-7630290', 'BOND-1742850'],
 '811f': ['BOND-3211356'],
 '90cb': ['BOND-9129450', 'BOND-7175508']}

我希望这能帮到你!谢谢你的好问题!

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63109775

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档