我已经将@MessagingGateway配置为使用错误通道,其工作方式与预期一致。
@MessagingGateway(errorChannel = "DefaultInboundErrorHandlerChannel")
public interface InboundMessagingGateway {
@Gateway(requestChannel = "InboundEntryChannel")
void receive(XferRes response);
}
在流中,我将对象传递给一个转换器,如下所示:
第1步:
@Transformer(inputChannel = "InboundEntryChannel", outputChannel = "TransmissionLogChannel")
public CassandraEntity createEntity(
org.springframework.messaging.Message<XferRes> message) throws ParseException {
XferRes response = message.getPayload();
CassandraEntity entity = new CassandraEntity();
// ... getters & setter ommitted for brevity
return entity;
}
接下来,我按如下方式更新实体:步骤2:
@ServiceActivator(inputChannel = "TransmissionLogChannel", outputChannel="PublishChannel")
public XferRes updateCassandraEntity(
org.springframework.messaging.Message<XferRes> message) {
XferRes response = message.getPayload();
this.cassandraServiceImpl.update(response);
return response;
}
最后,我发布了一个Kafka主题,如下所示:Step 3:
@ServiceActivator(inputChannel = "PublishChannel")
public void publish(org.springframework.messaging.Message<XferRes> message){
XferRes response = message.getPayload();
publisher.post(response);
}
在发生错误的情况下,我将消息发布到一个服务,该服务发布错误对象以记录摄取:
@ServiceActivator(inputChannel="defaultInboundErrorHandlerChannel")
public void handleInvalidRequest(org.springframework.messaging.Message<MessageHandlingException> message) throws ParseException {
XferRes originalRequest = (XferRes) message.getPayload().getFailedMessage().getPayload();
this.postToErrorBoard(originalRequest)
}
如果在Step 2: in updating the DB时出现错误,那么我还想调用Step 3。一种简单的方法是删除步骤2 &从步骤1调用更新数据库。
在Spring Integration中有没有其他方法可以调用Step 3,而不管是否发生错误。
https://stackoverflow.com/questions/50610217
复制相似问题