该应用程序如下:
我有一个大的dataframe,其中有一个' user_id‘列(每个user_id都可以出现在许多行中)。我有一个用户列表my_users,我需要分析这些用户。
Groupby、filter和聚合可能是个不错的主意,但是pyspark中包含的可用聚合功能并不适合我的需求。在pyspark中,用户定义的聚合函数仍然不完全受支持,我决定暂时不支持它。
相反,我只需迭代my_users列表,过滤数据中的每个用户,并进行分析。为了优化这个过程,我决定对my_users中的每个用户使用python多处理池。
进行分析(并传递给池)的函数有两个参数:user_id和到主dataframe的路径,我在其中执行所有的计算(PARQUET格式)。在该方法中,我加载数据,并处理它( dataframe本身不能作为参数传递)
在某些进程(每次运行过程中不同)上,我会得到各种奇怪的错误,这些错误看起来如下:

当我在没有任何多处理的情况下运行它时,一切都运行得很顺利,但速度很慢。
这些错误是从何而来的?
为了让事情变得更清楚,我会放一些代码示例:
PYSPRAK_SUBMIT_ARGS = '--driver-memory 4g --conf spark.driver.maxResultSize=3g --master local[*] pyspark-shell' #if it's relevant
# ....
def users_worker(df_path, user_id):
df = spark.read.parquet(df_path) # The problem is here!
## the analysis of user_id in df is here
def user_worker_wrapper(args):
users_worker(*args)
def analyse():
# ...
users_worker_args = [(df_path, user_id) for user_id in my_users]
users_pool = Pool(processes=len(my_users))
users_pool.map(users_worker_wrapper, users_worker_args)
users_pool.close()
users_pool.join()发布于 2017-09-27 14:22:00
事实上,正如@user6910411所评论的那样,当我将池更改为threadPool (multiprocessing.pool.ThreadPool包)时,一切正常工作,这些错误都消失了。
错误本身的根本原因现在也很清楚,如果你想让我分享它们,请在下面评论。
https://stackoverflow.com/questions/46448690
复制相似问题