我在Spring Integration流程中通过Java DSL配置了一个聚合器,我想抛出一个异常,当发生组超时时,该异常会转到全局错误通道。
丢弃通道对我没有好处,因为丢弃消息是在组成员级别,而不是整个组。
我尝试使用应用程序事件发布器,如下所示,但publisher对象似乎没有被调用:
.aggregate(new Consumer<AggregatorSpec>() {
@Override
public void accept(AggregatorSpec aggregatorSpec) {
try {
aggregatorSpec
.outputProcessor(groupPublishStrategy())
.correlationStrategy(groupPublishStrategy())
.releaseStrategy(groupPublishStrategy())
.groupTimeout(groupTimeout)
.get().getT2().setApplicationEventPublisher(myGroupExpirationPublisher());
}
catch (Exception e) {
e.printStackTrace();
}
}
})
有没有推荐的方法来获得这个用例的通知?你知道为什么上面的方法似乎不起作用吗?
我想我可以扩展AggregatorSpec
类来按照我想要的方式配置消息处理程序,但是我想看看是否可以使用常用的SI类来实现这一点。
发布于 2016-08-11 03:36:59
刚刚进行了测试,并且在bean初始化阶段正确地填充了ApplicationEventPublisher
。
expireGroup(Object correlationKey, MessageGroup group)
足够大,可以在这里演示它,但是您可以在GitHub上找到它的代码。因此,当我们达到这个方法时,MessageGroupExpiredEvent
总是被发布的。当然,如果discardMessage(message);
没有抛出异常的话。
只有在这种情况下,才能访问exprireGroup()
:
if (this.releaseStrategy.canRelease(groupNow)) {
completeGroup(correlationKey, groupNow);
}
else {
expireGroup(correlationKey, groupNow);
}
所以,请确保你的groupPublishStrategy()
有正确的逻辑,当你的群还没有完成时,不要返回true
。
好吧,如果你根据你的用例调试AbstractCorrelatingMessageHandler
会更好。如果您确定您的组在某些groupTimeout
期间没有完成,那么forceComplete(MessageGroup group)
是一个开始调试的好地方。
否则,当您认为必须发出事件时,请共享org.springframework.integration
类别的DEBUG
日志。
https://stackoverflow.com/questions/38880986
复制相似问题