Problem description
I am currently trying to run a UDF (named mean_of_assets_in_win) over a defined sliding window (window size: 30, step: 1) on a column of the DataFrame that the code loads from a data file and calls assets_with_yields_df.
The UDF simply computes the mean of the floating-point values in that column. When I tested the logic on a dataset of 800 records stored in a text file, everything worked fine. However, after growing the text file to 500,000 records, the program fails with an error. The error log is listed below:
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/E:/BigData/Spark/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
SUCCESS: The process with PID 12304 (child process of PID 32872) has been terminated.
SUCCESS: The process with PID 32872 (child process of PID 16792) has been terminated.
SUCCESS: The process with PID 16792 (child process of PID 26332) has been terminated.
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64278)
Traceback (most recent call last):
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1193, in send_command
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1033, in send_command
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1196, in send_command
py4j.protocol.Py4JNetworkError: Error while sending
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 977, in _get_connection
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1115, in start
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64278)
Traceback (most recent call last):
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 977, in _get_connection
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1115, in start
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64278)
Traceback (most recent call last):
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 977, in _get_connection
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1115, in start
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64278)
Traceback (most recent call last):
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 977, in _get_connection
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1115, in start
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it
In my view, the problem is related to the following lines that I extracted from this long log:
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64278)
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1193, in send_command
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
py4j.protocol.Py4JNetworkError: Error while sending
I inspected the code in java_gateway.py; this is the function that sends commands to the JVM. The line in question is
self.socket.sendall(command.encode("utf-8"))
which should only fail when the remote side closes the connection on a large payload or sends an RST packet (SO_LINGER). If that is the case, I do not know how to make the remote side accept large payloads. Do you know how?
What also surprises me is that after running the program again, I got the different error listed below. Maybe it is a hint as to what is wrong...
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/E:/BigData/Spark/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/07 23:15:30 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset by peer: socket write error
at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:465)
at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:285)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:295)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:295)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
22/06/07 23:15:30 ERROR PythonRunner: This may have been caused by a prior exception:
java.net.SocketException: Connection reset by peer: socket write error
at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:465)
at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:285)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:295)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:295)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
22/06/07 23:15:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.net.SocketException: Connection reset by peer: socket write error
at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:465)
at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:285)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:295)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:295)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
22/06/07 23:15:30 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
File "F:\Studia\02_Magisterskie\04_Semestr4\03_PSD\01_Projekt\Portfel Inwestycyjny\Portfolio-Monitor\src\so.py", line 47, in <module>
assets_with_yields_df = spark.createDataFrame(assets_with_yields_rdd).cache()
File "E:\BigData\Spark\python\pyspark\sql\session.py", line 605, in createDataFrame
return self._create_dataframe(data, schema, samplingRatio, verifySchema)
File "E:\BigData\Spark\python\pyspark\sql\session.py", line 628, in _create_dataframe
rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
File "E:\BigData\Spark\python\pyspark\sql\session.py", line 425, in _createFromRDD
struct = self._inferSchema(rdd, samplingRatio, names=schema)
File "E:\BigData\Spark\python\pyspark\sql\session.py", line 396, in _inferSchema
first = rdd.first()
File "E:\BigData\Spark\python\pyspark\rdd.py", line 1464, in first
rs = self.take(1)
File "E:\BigData\Spark\python\pyspark\rdd.py", line 1446, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "E:\BigData\Spark\python\pyspark\context.py", line 1120, in runJob
sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1304, in __call__
File "E:\BigData\Spark\python\pyspark\sql\utils.py", line 128, in deco
return f(*a, **kw)
File "E:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, DESKTOP-5SER7G7, executor driver): java.net.SocketException: Connection reset by peer: socket write error
at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:465)
at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:285)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:295)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:295)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:154)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:465)
at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:285)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:295)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:295)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
F:\Studia\02_Magisterskie\04_Semestr4\03_PSD\01_Projekt\Portfel Inwestycyjny\Portfolio-Monitor>SUCCESS: The process with PID 22172 (child process of PID 25264) has been terminated.
SUCCESS: The process with PID 25264 (child process of PID 33120) has been terminated.
SUCCESS: The process with PID 33120 (child process of PID 26728) has been terminated.
What have I tried to solve these problems?
To solve the problem, I tried changing the Spark session configuration, for example by changing the driver memory. The currently enabled options are visible in the code below. Unfortunately, changing the configuration did not help and the problem persists. I also tried changing the window size, but that did not help either.
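For illustration, this is the shape of that session-level change as a minimal sketch. The driver-memory value mirrors the configuration shown in the code below; spark.sql.execution.arrow.maxRecordsPerBatch is an additional knob that is not in my configuration (it caps how many rows are packed into each Arrow batch sent to the Python worker for a pandas UDF), and the value used here is only a guess:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master('local[*]')
    .config('spark.driver.memory', '30g')                        # same value as in the full code below
    .config('spark.sql.execution.arrow.pyspark.enabled', True)   # pandas UDFs exchange data via Arrow
    # assumed extra knob: send smaller Arrow batches to the Python worker (value is a guess)
    .config('spark.sql.execution.arrow.maxRecordsPerBatch', 2000)
    .appName('SparkSQL')
    .getOrCreate()
)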
Question
Do you know how to solve these problems? How can I make the remote side accept larger payloads?
Code
import os
import findspark  # needed before findspark.init(); missing from the original listing
findspark.init(r'E:\BigData\Spark')
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import pandas_udf
import pandas as pd
from pyspark.sql.window import Window
import pyspark.sql.functions as F

DATA_FILE_PATHNAME = '\\'.join(os.path.dirname(__file__).split('\\')[:-1]) + '\\data\\yields_with_rounded_numbers_full_dataset.txt'
METRICS_FILE_PATHNAME = '\\'.join(os.path.dirname(__file__).split('\\')[:-1]) + '\\data\\metrics.txt'
ASSETS_NUM = 6
NUM_OF_RECORDS_PER_ASSET = 500000
NUM_OF_DATA_IN_FULL_PORTFOLIO = ASSETS_NUM * NUM_OF_RECORDS_PER_ASSET
WINDOW_SIZE = 30

def mapper(line):
    # Parse one whitespace-delimited line into a Row: ID plus six asset yields.
    fields = line.split()
    return Row(ID=int(fields[0]), asset_1=float(fields[1]), asset_2=float(fields[2]),
               asset_3=float(fields[3]), asset_4=float(fields[4]), asset_5=float(fields[5]),
               asset_6=float(fields[6]), grouper=1)

if __name__ == "__main__":
    # Spark config
    spark = SparkSession.builder.master('local[*]') \
        .config('spark.sql.execution.arrow.pyspark.enabled', True) \
        .config('spark.sql.session.timeZone', 'UTC') \
        .config('spark.driver.memory', '30g') \
        .config('spark.eventLog.enabled', 'true') \
        .config('spark.ui.showConsoleProgress', True) \
        .config('spark.memory.offHeap.enabled', True) \
        .config('spark.memory.offHeap.size', '10g') \
        .config('spark.sql.repl.eagerEval.enabled', True) \
        .appName('SparkSQL').getOrCreate()
    # .config("spark.dynamicAllocation.enabled", "true") \
    # .config("spark.executor.cores", 4) \
    # .config("spark.dynamicAllocation.minExecutors", "1") \
    # .config("spark.dynamicAllocation.maxExecutors", "5") \
    # .config("spark.executor.memory", "10g") \
    # .config("spark.executor.instances", 4) \
    # .config("spark.executor.memory", "1g") \

    lines = spark.sparkContext.textFile(DATA_FILE_PATHNAME, minPartitions=20)
    # map values for columns
    assets_with_yields_rdd = lines.map(mapper)
    assets_with_yields_df = spark.createDataFrame(assets_with_yields_rdd).cache()

    @pandas_udf("double")
    def mean_of_assets_in_win(asset: pd.Series) -> float:
        # Series-to-scalar pandas UDF: mean of the values in one window
        asset_mean = asset.mean()
        return asset_mean

    # WINDOWING
    # sliding-window settings: current row plus the next WINDOW_SIZE - 1 rows
    sliding_window = Window.orderBy(F.col("ID")).rowsBetween(Window.currentRow, WINDOW_SIZE - 1)
    a1_mean_win = assets_with_yields_df.select(mean_of_assets_in_win('asset_1').over(sliding_window)).collect()

    spark.stop()
Answered on 2022-06-08 09:23:20
If there is no specific reason to use a custom pandas UDF just to compute a mean, consider using the standard built-in avg function instead. In my case, the simplified code below runs in about 5 seconds with 1 GB of heap memory:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

WINDOW_SIZE = 30

spark = SparkSession.builder.getOrCreate()

assets_with_yields_df = spark.range(500000) \
    .withColumn('asset_1', F.col('id') * 10)

sliding_window = Window.orderBy(F.col("ID")).rowsBetween(Window.currentRow, WINDOW_SIZE - 1)

a1_mean_win = assets_with_yields_df.select(F.avg('asset_1').over(sliding_window).alias('windowed_mean'))

a1_mean_win.write.parquet('results', mode='overwrite')
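Separately, your second traceback fails inside createDataFrame during schema inference (rdd.first()), so it may also be worth building the DataFrame with an explicit schema instead of mapping the text file through a Python RDD; that removes the inference step the error points at. A hedged sketch, assuming the file is space-delimited with the seven columns implied by mapper() (the sep option may need adjusting for your actual delimiter):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

spark = SparkSession.builder.getOrCreate()

# Same file as in the question; path shortened here for the example.
DATA_FILE_PATHNAME = 'yields_with_rounded_numbers_full_dataset.txt'

# Explicit schema matching the columns built by mapper(): ID plus asset_1..asset_6.
schema = StructType(
    [StructField('ID', IntegerType(), False)]
    + [StructField(f'asset_{i}', DoubleType(), True) for i in range(1, 7)]
)

assets_with_yields_df = (
    spark.read
    .schema(schema)
    .option('sep', ' ')        # assumes single-space delimiters
    .csv(DATA_FILE_PATHNAME)
    .cache()
)

The windowed F.avg from the snippet above can then be applied to this DataFrame unchanged.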
https://stackoverflow.com/questions/72537835