从星火结构的流文档中,包含以下内容
不支持流数据集上的不同操作。
但是,API中有一个distinct()方法,我也可以在流DateSet之后调用distinct()。
public final class JavaStructuredNetworkWordDistinct {
public static void main(String[] args) throws Exception {
System.setProperty("hadoop.home.dir","C://hadoop" );
SparkSession spa
我正在编写一个库,将Apache与自定义环境集成起来。我正在实现自定义流源和流编写器。
我正在开发的一些源代码是不可恢复的,至少在应用程序崩溃之后是如此。如果应用程序重新启动,则需要重新加载所有数据。因此,我们希望避免用户不得不显式设置'checkpointLocation‘选项。但是,如果没有提供该选项,我们将看到以下错误:
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ..
为了学习的目的,我试图在检查点上做一些实验/测试。
但是我的选择有限,我只能看到内部的运作。我正试着从套接字上读。
val lines: DataFrame = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 12345)
.load()
并使用它执行一些需要检查点的状态操作。
Q1。使用检查点位置作为我的本地系统时,它无法读取检查点,并且会出现错误。
This query does not suppor
我使用的是Java。
我正在接收Kafka消息上的文件路径。我需要将这个文件加载到spark RDD中,对其进行处理,然后将其转储到HDFS。
我能够从Kafka消息中检索到文件路径。我希望在此文件上创建一个dataset / RDD。
我不能在Kafka消息数据集上运行map函数。由于sparkContext在worker上不可用,因此出现NPE错误。
我不能在Kafka消息数据集上运行foreach。它会出错,并显示以下消息:
Queries with streaming sources must be executed with writeStream.start();"
我不