首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在Spring Cloud Kafka Streams应用中执行flatTransform?

如何在Spring Cloud Kafka Streams应用中执行flatTransform?
EN

Stack Overflow用户
提问于 2020-06-04 16:09:01
回答 1查看 316关注 0票数 1

我正在尝试在Spring Cloud Kafka Streams应用程序中执行flatTransform。但是我不确定将KafkaStreamsStateStore注解放在哪里。目前我得到的错误是:Invalid topology: StateStore activeInstruments is not added yet。如果有人能给我一些指导,我将不胜感激。

代码语言:javascript
运行
复制
@SpringBootApplication
public class InstrumentsApp {

  public static void main(String[] args) {
    SpringApplication.run(InstrumentsApp.class, args);
  }

  public static class InstrumentsConsumer {

    @Bean
    public Serde<InstrumentsRes> instrumentsResSerde() {
      ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
      mapper.registerModule(new JavaTimeModule());
      mapper.configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
      return new JsonSerde<>(InstrumentsRes.class, mapper);
    }

    @Bean
    public Serde<Instrument> instrumentSerde() {
      ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
      mapper.registerModule(new JavaTimeModule());
      mapper.configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
      mapper.configure(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
      return new JsonSerde<>(Instrument.class, mapper);
    }

    @Bean
    @KafkaStreamsStateStore(name = "activeInstruments",
        type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
    public Consumer<KStream<String, InstrumentsRes>> process() {
      return instruments -> instruments.flatTransform(
          new TransformerSupplier<String, InstrumentsRes, Iterable<KeyValue<String, Instrument>>>() {
            public Transformer<String, InstrumentsRes, Iterable<KeyValue<String, Instrument>>> get() {
              return new Transformer<String, InstrumentsRes, Iterable<KeyValue<String, Instrument>>>() {

                private ProcessorContext context;
                private KeyValueStore<String, InstrumentsRes> state;

                @SuppressWarnings("unchecked")
                @Override
                public void init(ProcessorContext context) {
                  this.context = context;
                  this.state = (KeyValueStore<String, InstrumentsRes>) context
                      .getStateStore("activeInstruments");
                }

                @Override
                public Iterable<KeyValue<String, Instrument>> transform(String key,
                    InstrumentsRes value) {
                  List<KeyValue<String, Instrument>> result = new ArrayList<>();
                  for (Instrument instrument : value.result) {
                    result.add(KeyValue.pair(instrument.instrumentName, instrument));
                  }
                  InstrumentsRes prevValue = state.get(key);
                  if (prevValue != null) {
                    HashSet<String> prevInstrumentNames = value.getInstrumentNames();
                    HashSet<String> newInstrumentNames = value.getInstrumentNames();
                    prevInstrumentNames.removeAll(newInstrumentNames);
                    for (String instrumentName : prevInstrumentNames) {
                      result.add(KeyValue.pair(instrumentName, null));
                    }
                  }
                  state.put(key, value);
                  return result;
                }

                public void close() {
                }
              };
            }
          }, "activeInstruments");
    }

  }

}
EN

Stack Overflow用户

发布于 2020-06-05 00:12:26

为了回答我自己的问题,我相信KafkaStreamsStateStore注解现在已被弃用:https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/676#issuecomment-502923103

现在,我已经创建了一个StoreBuilder类型的bean,正如注释中所建议的那样,一切都按预期工作。

票数 1
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62189787

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档