首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Spring拆分器/聚合器处理异常

Spring拆分器/聚合器处理异常
EN

Stack Overflow用户
提问于 2014-07-31 13:46:48
回答 1查看 2.1K关注 0票数 1

版本: spring-integration-core - 2.2.3

以下是我的拆分器/聚合器设置的简化版本。

代码语言:javascript
复制
<task:executor id="taskExecutor" pool-size="${pool.size}"
               queue-capacity="${queue.capacity}" 
       rejection-policy="CALLER_RUNS" keep-alive="120"/>

<int:channel id="service-requests"/>
<int:channel id="service-request"/>
<int:channel id="channel-1">
    <int:dispatcher task-executor="taskExecutor" failover="false"/>
</int:channel>
<int:channel id="channel-2">
    <int:dispatcher task-executor="taskExecutor" failover="false"/>
</int:channel>


<int:gateway id="myServiceRequestor" default-reply-timeout="${reply.timeout}"
             default-reply-channel="service-aggregated-reply"
             default-request-channel="service-request"
             service-interface="com.blah.blah.MyServiceRequestor"/>

<int:splitter input-channel="service-request"
              ref="serviceSplitter" output-channel="service-requests"/>

<!-- To split the request and return a java.util.Collection of Type1 and Type2 -->
<bean id="serviceSplitter" class="com.blah.blah.ServiceSplitter"/>


<int:payload-type-router input-channel="service-requests" resolution-required="true">
    <int:mapping
            type="com.blah.blah.Type1"
            channel="channel-1"/>
    <int:mapping
            type="com.blah.blah.Type2"
            channel="channel-2"/>
</int:payload-type-router>

<!-- myService is a bean where processType1 & processType2 method is there to process the payload -->
<int:service-activator input-channel="channel-1"
                       method="processType1" output-channel="service-reply" requires-reply="true"
                       ref="myService"/>

<int:service-activator input-channel="channel-2"
                       method="processType2" output-channel="service-reply" requires-reply="true"
                       ref="myService"/>

<int:publish-subscribe-channel id="service-reply" task-executor="taskExecutor"/>

<!-- myServiceAggregator has a aggregate method which takes a Collection as argument(aggregated response from myService) -->
<int:aggregator input-channel="service-reply"
                method="aggregate" ref="myServiceAggregator"
                output-channel="service-aggregated-reply"
                send-partial-result-on-expiry="false"
                message-store="myResultMessageStore"
                expire-groups-upon-completion="true"/>

<bean id="myResultMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />

<bean id="myResultMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="myResultMessageStore" />
    <property name="timeout" value="2000" />
</bean>

<task:scheduled-tasks>
    <task:scheduled ref="myResultMessageStoreReaper" method="run" fixed-rate="10000" />
</task:scheduled-tasks>

如果mySevice中的processType2 1/processType2 2方法抛出一个RuntimeException,那么它将尝试将消息发送到一个错误通道(我相信spring默认会这样做),并且错误通道中的消息有效负载保持在堆中,而不被垃圾收集。

更新了更多信息:,以了解我对错误通道的评论。我对代码进行了调试,发现ErrorHandlingTaskExecutor试图使用一个MessagePublishingErrorHandler,它反过来将消息发送到由ErrorHandlingTaskExecutor方法返回的通道。

来自ErrorHandlingTaskExecutor.java的代码片段

代码语言:javascript
复制
public void execute(final Runnable task) {
    this.executor.execute(new Runnable() {
        public void run() {
            try {
                task.run();
            }
            catch (Throwable t) {
                errorHandler.handleError(t);   /// This is the part which sends the message in to error channel.
            }
        }
    });
}

来自MessagePublishingErrorHandler.java的代码片段

代码语言:javascript
复制
public final void handleError(Throwable t) {
    MessageChannel errorChannel = this.resolveErrorChannel(t);
    boolean sent = false;
    if (errorChannel != null) {
        try {
            if (this.sendTimeout >= 0) {
                sent = errorChannel.send(new ErrorMessage(t), this.sendTimeout);
.....

当我使用堆转储时,我总是看到对有效负载消息的引用(我相信它是在上面的通道中维护的),并且没有得到GC‘’ed。

想知道处理这种情况的正确方法是什么,或者我是否在配置中遗漏了什么?另外,在服务激活器方法引发任何异常的情况下,可以告诉spring丢弃有效负载(而不是将其发送到错误通道)吗?

期待你的投入。

谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2014-07-31 15:48:45

您的网关上没有定义一个error-channel,所以我们不会将它发送到那里,我们只会向调用者抛出一个异常。

但是,部分组位于聚合器中,永远不会完成。您需要配置一个 as shown in the reference manual (或者在SpringIntegration4.0.X中设置一个group-timeout )来丢弃部分组。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/25060523

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档