首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >火花多处理(PySpark)

火花多处理(PySpark)
EN

Stack Overflow用户
提问于 2017-09-27 13:05:35
回答 1查看 13.1K关注 0票数 12

该应用程序如下:

我有一个大的dataframe,其中有一个' user_id‘列(每个user_id都可以出现在许多行中)。我有一个用户列表my_users,我需要分析这些用户。

Groupbyfilter聚合可能是个不错的主意,但是pyspark中包含的可用聚合功能并不适合我的需求。在pyspark中,用户定义的聚合函数仍然不完全受支持,我决定暂时不支持它。

相反,我只需迭代my_users列表,过滤数据中的每个用户,并进行分析。为了优化这个过程,我决定对my_users中的每个用户使用python多处理池

进行分析(并传递给池)的函数有两个参数:user_id和到主dataframe的路径,我在其中执行所有的计算(PARQUET格式)。在该方法中,我加载数据,并处理它( dataframe本身不能作为参数传递)

在某些进程(每次运行过程中不同)上,我会得到各种奇怪的错误,这些错误看起来如下:

  • PythonUtils不存在于JVM中(当读取'parquet‘数据格式时)

  • KeyError:“C”没有找到(同时,在阅读“拼花”数据时也是如此。“c”到底是什么?

当我在没有任何多处理的情况下运行它时,一切都运行得很顺利,但速度很慢。

这些错误是从何而来的?

为了让事情变得更清楚,我会放一些代码示例:

代码语言:javascript
复制
PYSPRAK_SUBMIT_ARGS = '--driver-memory 4g --conf spark.driver.maxResultSize=3g --master local[*] pyspark-shell' #if it's relevant

# ....

def users_worker(df_path, user_id):
    df = spark.read.parquet(df_path) # The problem is here!
    ## the analysis of user_id in df is here

def user_worker_wrapper(args):
    users_worker(*args)

def analyse():
    # ...
    users_worker_args = [(df_path, user_id) for user_id in my_users]
    users_pool = Pool(processes=len(my_users))
    users_pool.map(users_worker_wrapper, users_worker_args)
    users_pool.close()
    users_pool.join()
EN

Stack Overflow用户

发布于 2017-09-27 14:22:00

事实上,正如@user6910411所评论的那样,当我将池更改为threadPool (multiprocessing.pool.ThreadPool包)时,一切正常工作,这些错误都消失了。

错误本身的根本原因现在也很清楚,如果你想让我分享它们,请在下面评论。

票数 9
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46448690

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档