我在我的用例中使用了Spark Structured Streaming。我的用例需要有状态的处理。我的用例如下。我正在使用mapGroupswithStae,但是当同一批客户出现两次时,问题就出现了,它随机选择一个客户Id,而忽略另一个客户Id。
例如:
客户Id,Ts 123,09/03/2020T15:20:28.989Z-05:00 123,09/03/2020T15:26:48.989Z-05:00
预期运维客户Id,Flg,Ts 123,I,09/03/2020T15:20:28.989Z-05:00 123,U,09/03/2020T15:26:48.989Z-05:00
实际运维客户Id,Flg,Ts 123,I,09/03/2020T15:26:28.989Z-05:00
如果客户Id是新的,则将客户Id标记为创建,如果客户Id存在,则执行转换并写入主题,然后将客户Id标记为更新,执行转换并写入主题
case class CustIdStatus (var CustId: String, var existsFlag: String, var Ts: String)
var df:Dataset[String] = spark.readStream.format("kafka").option("kafka.bootstrap.servers", bootStrapServer).option("subscribe", Topic).option("startingOffsets", autooffset).load().selectExpr("CAST(value AS STRING) as value").as[String]
df.map(processsData)
.groupByKey(_.CustId.toString())
.mapGroupsWithState(GroupStateTimeout.NoTimeout()) (
isCustExists)
.writeStream
.format("console")
.option("truncate", false)
.option("checkpointLocation", "/tmp/output/checkpoint")
.outputMode("update")
def isCustExists (CustId:String, inputs:Iterator[InptObj], state:GroupState[CustIdStatus]):CustIdStatus =
{
var newval:CustIdStatus = if (state.exists) CustIdStatus(state.get.CustId,"Y",state.get.Ts) else PtyIdStatus(CustId,"N","")
state.update(newval)
newval
}
发布于 2020-11-03 04:24:13
您需要用更灵活的flatMapGroupsWithState
来交换mapGroupWithState
。
flatMapGroupsWithState
是mapGroupWithState
的泛化,克服了您面临的一些限制:
OutputMode.Append
)还是更新的键/值记录(OutputMode.Update
).https://stackoverflow.com/questions/64648973
复制相似问题