我有以下Spark streaming示例:
val conf = new SparkConf().setAppName("Name").setMaster("local")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))
val directoryStream = ssc.textFileStream("""file:///C:/Users/something/something""")
directo
如果exe文件的时间戳在最新版本之前,我正在尝试运行批处理文件来更新一些软件。为此,我使用了一个众所周知的for循环。
当我这样做的时候:
set file=%AppData%\spark\spark.properties
for %%a in (%file%) do set olddate=%%~ta
echo %olddate%
返回时间戳。
当我像这样运行它时:
set spark_exe=%ProgramFiles%\Spark\Spark.exe
for %%a in (%spark_exe%) do set olddate=%%~ta
echo %olddate%
不返回时间戳。(
我正在使用spark从Kafka Stream接收数据,以接收有关正在发送定期健康更新的物联网设备的状态,以及有关设备中存在的各种传感器的状态。我的Spark应用程序侦听单个主题,使用Spark direct流接收来自Kafka流的更新消息。我需要根据每个设备的传感器状态触发不同的警报。然而,当我使用Kakfa添加更多向spark发送数据的IOT设备时,Spark并没有扩展,尽管添加了更多的机器和执行器的数量也增加了。下面我给出了我的Spark应用程序的拆卸版本,其中删除了通知触发部分,并出现了相同的性能问题。
// Method for update the Device state
我开始学习火花,并有一个困难的时间理解背后的合理性结构化流在星火。结构化流将到达的所有数据视为无界输入表,其中数据流中的每个新项都被视为表中的新行。下面的代码将在传入的文件中读取到csvFolder。
val spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
val csvSchema = new StructType().add("street", "string").add("city", "string")
.add("z
当使用mapGroupsWithState进行火花结构化流时,我非常努力地理解超时设置。
下面的链接有非常详细的规范,但我不确定我是否正确地理解了它,特别是GroupState.setTimeoutTimeStamp()选项。也就是说,当将状态到期设置为某种程度上与事件时间相关时。
我在这里复制了它们:
With EventTimeTimeout, the user also has to specify the the the event time watermark in the query using Dataset.withWatermark().
With this settin