
java_gateway.py is unable to send commands in send_command for larger payloads

Stack Overflow user
Asked on 2022-06-07 21:41:32
1 answer · 99 views · 0 followers · 0 votes

Problem description

I am currently trying to run a UDF (function name: mean_of_assets_in_win) over a defined window (window size: 30, step: 1) on a column of the DataFrame assets_with_yields_df, which is loaded in the code from a data file.
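For context, the setup just described (a pandas UDF named mean_of_assets_in_win evaluated over a bounded sliding window of 30 rows) corresponds roughly to the minimal sketch below; the function and column names come from the question, while the toy data and session setup are my own assumptions for illustration. The full original code is in the Code section further down.

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.window import Window
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[*]").appName("window-udf-sketch").getOrCreate()

# Toy stand-in for assets_with_yields_df: 800 rows, one asset column (assumption).
df = spark.range(800).withColumn("asset_1", F.col("id") * 0.1)

@pandas_udf("double")
def mean_of_assets_in_win(asset: pd.Series) -> float:
    # Grouped-aggregate pandas UDF: receives the rows of each window as a pandas Series.
    return float(asset.mean())

# Bounded sliding window: the current row plus the next 29 rows (size 30, step 1).
sliding_window = Window.orderBy(F.col("id")).rowsBetween(Window.currentRow, 29)
df.select(mean_of_assets_in_win("asset_1").over(sliding_window).alias("win_mean")).show(5)

spark.stop()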

Normally, this UDF computes the mean of the floating-point values in the column. When I tested the logic on a dataset of 800 records contained in a text file, everything worked fine. However, increasing the text file to 500,000 records causes the program to finish with an error. The error log is listed below:

WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/E:/BigData/Spark/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
SUCCESS: The process with PID 12304 (child process of PID 32872) has been terminated.
SUCCESS: The process with PID 32872 (child process of PID 16792) has been terminated.
SUCCESS: The process with PID 16792 (child process of PID 26332) has been terminated.
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64278)
Traceback (most recent call last):
  File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1193, in send_command
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1033, in send_command
  File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1196, in send_command
py4j.protocol.Py4JNetworkError: Error while sending

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 977, in _get_connection
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1115, in start
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64278)
Traceback (most recent call last):
  File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 977, in _get_connection
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1115, in start
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64278)
Traceback (most recent call last):
  File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 977, in _get_connection
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1115, in start
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64278)
Traceback (most recent call last):
  File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 977, in _get_connection
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1115, in start
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

It seems to me that the problem is related to this part, which I extracted from the long log:

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64278)
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1193, in send_command
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
py4j.protocol.Py4JNetworkError: Error while sending

I checked the code in java_gateway.py, which contains the function that sends commands to the JVM. It is also mentioned there that

self.socket.sendall(command.encode("utf-8"))

fails only if the remote side closes the connection on large payloads or sends an RST packet (SO_LINGER). If that is the case, I do not know how to make the remote side accept large payloads. Do you know how to do that?

What also surprised me is that after trying to run the program again, I got a different error, listed below. Maybe this is a hint as to what is wrong...

WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/E:/BigData/Spark/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/07 23:15:30 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset by peer: socket write error
        at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
        at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
        at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
        at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
        at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:465)
        at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:285)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:295)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:295)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
22/06/07 23:15:30 ERROR PythonRunner: This may have been caused by a prior exception:
java.net.SocketException: Connection reset by peer: socket write error
        at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
        at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
        at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
        at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
        at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:465)
        at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:285)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:295)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:295)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
22/06/07 23:15:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.net.SocketException: Connection reset by peer: socket write error
        at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
        at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
        at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
        at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
        at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:465)
        at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:285)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:295)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:295)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
22/06/07 23:15:30 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "F:\Studia\02_Magisterskie\04_Semestr4\03_PSD\01_Projekt\Portfel Inwestycyjny\Portfolio-Monitor\src\so.py", line 47, in <module>
    assets_with_yields_df = spark.createDataFrame(assets_with_yields_rdd).cache()
  File "E:\BigData\Spark\python\pyspark\sql\session.py", line 605, in createDataFrame
    return self._create_dataframe(data, schema, samplingRatio, verifySchema)
  File "E:\BigData\Spark\python\pyspark\sql\session.py", line 628, in _create_dataframe
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "E:\BigData\Spark\python\pyspark\sql\session.py", line 425, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio, names=schema)
  File "E:\BigData\Spark\python\pyspark\sql\session.py", line 396, in _inferSchema
    first = rdd.first()
  File "E:\BigData\Spark\python\pyspark\rdd.py", line 1464, in first
    rs = self.take(1)
  File "E:\BigData\Spark\python\pyspark\rdd.py", line 1446, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "E:\BigData\Spark\python\pyspark\context.py", line 1120, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1304, in __call__
  File "E:\BigData\Spark\python\pyspark\sql\utils.py", line 128, in deco
    return f(*a, **kw)
  File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, DESKTOP-5SER7G7, executor driver): java.net.SocketException: Connection reset by peer: socket write error
        at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
        at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
        at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
        at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
        at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:465)
        at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:285)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:295)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:295)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
        at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:154)
        at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
        at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
        at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
        at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
        at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
        at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:465)
        at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:285)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:295)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:295)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)


F:\Studia\02_Magisterskie\04_Semestr4\03_PSD\01_Projekt\Portfel Inwestycyjny\Portfolio-Monitor>SUCCESS: The process with PID 22172 (child process of PID 25264) has been terminated.
SUCCESS: The process with PID 25264 (child process of PID 33120) has been terminated.
SUCCESS: The process with PID 33120 (child process of PID 26728) has been terminated. 

What have I tried to solve the problem?

To solve the problem, I tried changing the Spark session configuration by:

  • increasing the driver memory

  • enabling offHeap memory and allocating a size for it

  • enabling dynamic allocation of executors

The currently enabled options are visible in the code. Unfortunately, changing the configuration did not help and the problem persists. I also tried changing the window size, but that did not help either.
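For reference, the three attempted changes map onto standard Spark configuration keys roughly as sketched below; the values here are illustrative, and the exact values used in the project are visible in the Code section that follows.

from pyspark.sql import SparkSession

# Sketch of the attempted configuration changes (illustrative values).
spark = (
    SparkSession.builder.master("local[*]")
    # 1. larger driver memory
    .config("spark.driver.memory", "30g")
    # 2. off-heap memory enabled with an explicit size
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "10g")
    # 3. dynamic allocation of executors
    .config("spark.dynamicAllocation.enabled", "true")
    .appName("config-sketch")
    .getOrCreate()
)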

Question

Do you know how to solve this problem? How can I make the remote side accept larger payloads?

Code

import os
import findspark
findspark.init('E:\BigData\Spark')

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import pandas_udf 
import pandas as pd
from pyspark.sql.window import Window
import pyspark.sql.functions as F

DATA_FILE_PATHNAME = '\\'.join(os.path.dirname(__file__).split('\\')[:-1])+'\\'+'data'+'\\'+'yields_with_rounded_numbers_full_dataset.txt'
METRICS_FILE_PATHNAME = '\\'.join(os.path.dirname(__file__).split('\\')[:-1])+'\\'+'data'+'\\'+'metrics.txt'
ASSETS_NUM = 6
NUM_OF_RECORDS_PER_ASSET = 500000
NUM_OF_DATA_IN_FULL_PORTFOLIO = 6*NUM_OF_RECORDS_PER_ASSET
WINDOW_SIZE = 30


def mapper(line):
   fields = line.split()
   return Row(ID=int(fields[0]),asset_1 = float(fields[1]), asset_2 = float(fields[2]), asset_3 = float(fields[3]),asset_4 = float(fields[4]), asset_5 = float(fields[5]), asset_6 = float(fields[6]), grouper = 1)


if __name__ == "__main__":
   # Spark config
   spark = SparkSession.builder.master('local[*]')\
                               .config('spark.sql.execution.arrow.pyspark.enabled', True) \
                               .config('spark.sql.session.timeZone', 'UTC') \
                               .config('spark.driver.memory','30g') \
                               .config("spark.eventLog.enabled","true")\
                               .config('spark.ui.showConsoleProgress', True) \
                               .config("spark.memory.offHeap.enabled",True)\
                               .config("spark.memory.offHeap.size","10g") \
                               .config('spark.sql.repl.eagerEval.enabled', True).appName("SparkSQL").getOrCreate()
                               # .config("spark.dynamicAllocation.enabled", "true") \
                               # .config("spark.executor.cores", 4) \
                               # .config("spark.dynamicAllocation.minExecutors","1") \
                               # .config("spark.dynamicAllocation.maxExecutors","5") \
                                                               # .config("spark.executor.memory", "10g")\

                               #.config("spark.executor.instances", 4)\
                               #.config("spark.executor.memory", "1g")\
                               
   lines = spark.sparkContext.textFile(DATA_FILE_PATHNAME, minPartitions = 20)
   # map values for columns 
   assets_with_yields_rdd = lines.map(mapper)          
   assets_with_yields_df = spark.createDataFrame(assets_with_yields_rdd).cache()

   @pandas_udf("double")
   def mean_of_assets_in_win(asset: pd.Series) -> float:
       asset_mean = asset.mean()
       return asset_mean

  
   # WINDOWING
   # sliding - window settings
   sliding_window = Window.orderBy(F.col("ID")).rowsBetween(Window.currentRow, WINDOW_SIZE-1)
   a1_mean_win = assets_with_yields_df.select(mean_of_assets_in_win('asset_1').over(sliding_window)).collect()

   spark.stop()

1 Answer

Stack Overflow user

Accepted answer

Posted on 2022-06-08 09:23:20

If there is no specific reason to use a custom pandas UDF just to compute a mean, consider using the built-in avg function that Spark provides. In my case, this simplified code takes 5 seconds to execute with 1 GB of heap memory:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

WINDOW_SIZE = 30
spark = SparkSession.builder.getOrCreate()

assets_with_yields_df = spark.range(500000) \
        .withColumn('asset_1', F.col('id') * 10)

sliding_window = Window.orderBy(F.col("ID")).rowsBetween(Window.currentRow, WINDOW_SIZE-1)
a1_mean_win = assets_with_yields_df.select(F.avg('asset_1').over(sliding_window).alias('windowed_mean'))
a1_mean_win.write.parquet('results', mode='overwrite')
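
As a usage note (my own sketch, not part of the original answer): the same built-in avg can be dropped into the question's pipeline by replacing the pandas UDF call, assuming assets_with_yields_df and WINDOW_SIZE are defined as in the question and reusing the Window and F imports from the snippet above.

# Sketch: swap the pandas UDF for the built-in avg in the question's code.
# Assumes assets_with_yields_df and WINDOW_SIZE from the question's listing.
sliding_window = Window.orderBy(F.col("ID")).rowsBetween(Window.currentRow, WINDOW_SIZE - 1)
a1_mean_win = assets_with_yields_df.select(
    F.avg("asset_1").over(sliding_window).alias("windowed_mean")
).collect()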
Votes: 1
The original content of this page was provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/72537835
