首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用mapGroupsWithState的有状态处理-多次输出相同的键

使用mapGroupsWithState的有状态处理-多次输出相同的键
EN

Stack Overflow用户
提问于 2020-11-02 23:58:58
回答 1查看 198关注 0票数 0

我在我的用例中使用了Spark Structured Streaming。我的用例需要有状态的处理。我的用例如下。我正在使用mapGroupswithStae,但是当同一批客户出现两次时,问题就出现了,它随机选择一个客户Id,而忽略另一个客户Id。

例如:

客户Id,Ts 123,09/03/2020T15:20:28.989Z-05:00 123,09/03/2020T15:26:48.989Z-05:00

预期运维客户Id,Flg,Ts 123,I,09/03/2020T15:20:28.989Z-05:00 123,U,09/03/2020T15:26:48.989Z-05:00

实际运维客户Id,Flg,Ts 123,I,09/03/2020T15:26:28.989Z-05:00

如果客户Id是新的,则将客户Id标记为创建,如果客户Id存在,则执行转换并写入主题,然后将客户Id标记为更新,执行转换并写入主题

代码语言:javascript
运行
复制
case class CustIdStatus (var CustId: String, var existsFlag: String, var Ts: String)
var df:Dataset[String] = spark.readStream.format("kafka").option("kafka.bootstrap.servers", bootStrapServer).option("subscribe", Topic).option("startingOffsets", autooffset).load().selectExpr("CAST(value AS STRING) as value").as[String]
df.map(processsData)
.groupByKey(_.CustId.toString())
.mapGroupsWithState(GroupStateTimeout.NoTimeout()) (
isCustExists)
.writeStream
.format("console")
.option("truncate", false)
.option("checkpointLocation", "/tmp/output/checkpoint")
.outputMode("update")

def isCustExists (CustId:String, inputs:Iterator[InptObj], state:GroupState[CustIdStatus]):CustIdStatus = 
{
var newval:CustIdStatus = if (state.exists) CustIdStatus(state.get.CustId,"Y",state.get.Ts) else PtyIdStatus(CustId,"N","")
state.update(newval) 
newval
}
EN

回答 1

Stack Overflow用户

发布于 2020-11-03 04:24:13

您需要用更灵活的flatMapGroupsWithState来交换mapGroupWithState

flatMapGroupsWithStatemapGroupWithState的泛化,克服了您面临的一些限制:

  • 返回类型是迭代,而不是单个对象输出模式,它采用另一个参数,称为运算符输出模式,该参数定义输出记录是可以追加的新记录(OutputMode.Append)还是更新的键/值记录(OutputMode.Update).
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64648973

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档