我能想象的最好的方法是使用能够访问状态存储的转换器,但是在聚合状态之前就这样做,这样我就可以在状态更新之前看到状态的值。(在可能的情况下更新了额外的状态后,所讨论的转换器只会返回原始消息。)(InternalTopologyBuilder.java:615) at org.apache.kafka.streams.kstream.internals.KStreamImpl.transformValues(KStreamImpl.java:546) at org.apache.kafka.streams.k
例如:
at org.apache.kafka.streams.kstream.internals.KTableFilter.access$300(KTableFilter.java(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.T