当我迭代地将500多个列添加到我的pyspark中时,我遇到了堆栈溢出错误。所以我包括了检查点。检查站帮不上忙。因此,我创建了下面的玩具应用程序来测试我的检查点是否正常工作。在这个例子中,我所做的就是一次又一次地复制原始列来迭代地创建列。我坚持,检查点和计数每10个迭代。我注意到我的dataframe.rdd.isCheckpointed()总是返回False。我可以验证检查点文件夹确实是在磁盘上创建和填充的。我在用哥库德的dataproc
这是我的代码:
from pyspark import SparkContext, SparkConf
from pyspark import Stora
我使用星火集群在本地模式下运行PySpark,并且我试图编写一个流DataFrame到一个Kafka主题。
当我运行查询时,会收到以下消息:
java.lang.IllegalStateException: Set(topicname-0) are gone. Some data may have been missed..
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may
在使用apache-时,我试图对一些流数据应用"reduceByKeyAndWindow()“转换,并得到以下错误:
pyspark.sql.utils.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
是否需要设置检查点目录?
如果是的话,最简单的设置方法是什么?
您能告诉我如何根据IBM的数据科学经验为PySpark会话设置检查点目录吗?
之所以需要这样做,是因为我必须从GraphFrames运行connectedComponents(),它会引发以下错误
Py4JJavaError: An error occurred while calling o221.run.
: java.io.IOException: Checkpoint directory is not set. Please set it first using sc.setCheckpointDir().
有人能帮我解释一下德尔塔罗格检查站是怎么工作的吗?我面临一个问题,即我试图在固定的时间间隔内创建常规检查点,但如果自上次创建检查点以来对delta表没有任何更改,我将得到以下错误:
An error was encountered:
java.lang.IllegalStateException: State of the checkpoint doesn't match that of the snapshot.
at org.apache.spark.sql.delta.Checkpoints$.writeCheckpoint(Checkpoints.scala:328)
我们的HSQLDB数据库有一个FK约束,从PAYMENTS表到USERS表。我们在这里做的错误是创建一个约束,而不给它指定一个特定的名称。这将导致HSQLDB为您生成一个名称,例如SYS_FK_10985。
我所做的就是为Liquibase编写一个自定义更改集,它将找到索引的名称并删除它。脚本所做的非常简单:
SELECT constraint_name FROM INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE WHERE TABLE_NAME= 'PAYMENTS' AND COLUMN_NAME= 'USER_ID';