我有一个集成流程,它需要基于某种条件运行一个转换器或另一个转换器,然后在出站网关上发布一个http请求。
@Bean
public IntegrationFlow messageFromKafka() {
return flow -> flow
.publishSubscribeChannel(s -> s
.subscribe(f1 -> f1
.<AttachmentEvent>filter(validator::isCondition1)
.transform(transformer1)
)
.subscribe(fl -> fl
.<AttachmentEvent>filter(validator::isCondition2)
.transform(transformer2)
.split()
)
)
.publishSubscribeChannel(s -> s
.subscribe(fl1 -> fl1
.transform(httpTransformer)
.<String, String>route(transformedMessage -> getFlowType(transformedMessage), mapping -> mapping
.subFlowMapping("operation1", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test"))
)
.subFlowMapping("operation2", sf -> sf
.<String>filter(message -> isVendorStatusDescNotCancelled(message))
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test2"))
)
.subFlowMapping("operation3", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test3"))
)
)
)
.subscribe(fl2 -> fl2
.handle(getKafkaHandler())
)
);
} 这是我的尝试,但我得到了这个错误信息“没有输出通道或replyChannel头可用”,我认为我理解为什么,但不确定如何实现我所需要的。
谢谢。
发布于 2020-08-24 20:06:32
在集成中,使用router模式:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/message-routing.html#messaging-routing-chapter处理条件流
尽管看起来您的问题与条件解决完全无关。
我认为您的每个handle(getOAuth2Handler(...))都返回一些在这些子流中不作为答复处理的值。如果您对这个答复不感兴趣,可以考虑在nullChannel之后为那些子流配置一个handle()。
https://stackoverflow.com/questions/63567762
复制相似问题