如果id列在20天内重复,我们需要插入最早的事件时间。20天可能有100-150亿行。我们不想使用dropDuplicates,因为状态可能很大。我们正在考虑使用Cassandra表来存储状态(比如目前为止的id和min time )。每次触发微批次时,我们使用微批次中的in查找存储状态的Cassandra表。20天的Ids也在100-150亿级别,换句话说,Cassandra的状态表在</
而且我不能设置从Kafka到Cassandra的流。当它是一个数据时,它是可以的,但当它是流时,它就不起作用了。有人能在我的脚本中帮助我了解一下foreachBatch吗?我只想有人向我展示在这个脚本上的foreachBatch在Pyspark的例子,如果它是可以尝试。文档中写道: # Transform and
我获取csv文件,读取这些文件并将它们写入Cassandra。我对大量数据(每天大约1000万行)执行此操作,文件本身非常小(从100到1000行)。我要做的是在将它们写入数据库之前检查我要插入的主键是否已经存在。我知道我可以用Select count(*) from table where primary key1 = something and key2 is something else做到。但这很慢,我想在一次检查整个文件,如果它将影响已在Cassandra中的数据,我希望(需要)它是快的。有没有