首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >网关未设置replyChannel报头

网关未设置replyChannel报头
EN

Stack Overflow用户
提问于 2018-06-07 01:34:48
回答 1查看 109关注 0票数 1

我目前正在做一个用Spring Integration 4.3.14构建的项目,我们决定尝试使用DSL,但我在尝试集成不同的子流时遇到了麻烦。

我定义了以下IntegrationFlow:

代码语言:javascript
复制
@Bean
public IntegrationFlow mainFlow() {
    return IntegrationFlows
            .from(
                    databaseSource(),
                    c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
            .split()
            .log()
            .gateway(f -> f
                            .transform(Transformer::transform)
                            .transform(AnotherTransformer::transform),
                    e -> e
                            .errorChannel("transformErrorChannel"))
            .gateway(f -> f
                            .<MyEntity>handle((p, h) -> this.doSomething(p))
                            .<MyEntity>handle((p, h) -> this.doOtherThing(p)),
                    e -> e
                            .errorChannel("doErrorChannel"))
            .channel("nullChannel")
            .get();
}

所有调用的transformhandle方法都是非空的,并返回非空值。我们采用这种方法的主要原因是有两个不同的通道来处理错误,这取决于它们发生的那部分流,所以我们可以相应地采取行动。

然而,当我尝试运行这段代码时,我在DB上插入了一条记录,轮询器拿起了它,它永远不会超出第一个网关。我只有这几行日志:

代码语言:javascript
复制
2018-06-06 11:43:58.848  INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean    : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.848  INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@1863292e
2018-06-06 11:43:58.864  INFO 6492 --- [ask-scheduler-1] c.e.transformation.Transformer           : Performing transformation.
2018-06-06 11:43:58.864  INFO 6492 --- [ask-scheduler-1] c.e.transformation.AnotherTransformer    : Performing another transformation. 
2018-06-06 11:43:58.848  INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean    : started org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.944  INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean    : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f
2018-06-06 11:43:58.944  INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@433a796
2018-06-06 11:43:58.944  INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean    : started org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f

似乎很明显,消息确实到达了第一个网关,但显然它没有被传递到第二个网关。

在启动过程中,我看到SI创建了两个subFlows (#0和#1),并为每个通道创建了两个通道(我猜每个操作一个通道),每个通道都有一个订阅者。

我还尝试将定义更改为以下内容:

代码语言:javascript
复制
    @Bean
public IntegrationFlow getRecords() {
    return IntegrationFlows
            .from(
                    databaseSource(),
                    c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
            .split()
            .log()
            .gateway(f -> f
                            .transform(Transformer::transform)
                            .transform(AnotherTransformer::transform),
                    e -> e
                            .errorChannel("transformErrorChannel")
                            .replyChannel("doThingsChannel"))
            .get();
}

@Bean
public IntegrationFlow doThings() {
    return IntegrationFlows
            .from(
                    "doThingsChannel")
            .gateway(f -> f
                            .<MyEntity>handle((p, h) -> this.doSomehting(p))
                            .<MyEntity>handle((p, h) -> this.doOtherThing(p)),
                    e -> e
                            .errorChannel("doErrorChannel"))
            .get();
}

但最终遇到了同样的问题,要么在GatewayEndpointSpec上设置replyChannel,要么在网关之后向getRecords流添加显式.channel

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-06-07 03:59:03

我刚刚在Spring Integration Java DSL项目中完成了这个测试用例:

代码语言:javascript
复制
@Test
public void testGateways() {
    IntegrationFlow flow = f -> f
            .gateway(sf -> sf
                    .transform(p -> "foo#" + p)
                    .transform(p -> "bar#" + p))
            .gateway(sf -> sf
                    .handle((p, h) -> "handle1:" + p)
                    .handle((p, h) -> "handle2:" + p))
            .handle(System.out::println);

    IntegrationFlowRegistration flowRegistration = this.integrationFlowContext.registration(flow).register();

    flowRegistration.getInputChannel()
            .send(new GenericMessage<>("test"));

    flowRegistration.destroy();
}

我的输出如下:

代码语言:javascript
复制
GenericMessage [payload=handle2:handle1:bar#foo#test, headers={id=ae09df5c-f63e-4b68-d73c-29b85f3689a8, timestamp=1528314852110}]

因此,两个网关都按预期工作,并且应用了所有转换器和处理程序。另外,最后一个网关的结果被轮询到最后一个System.out步骤的主流。

不确定在您的情况下发生了什么:只有一种想法,即您的.transform(AnotherTransformer::transform)不返回值或发生任何其他事情。

关于replyChannel选项。它不是发送网关结果的地方。这是等待回复返回的地方:

代码语言:javascript
复制
/**
 * Specify the channel from which reply messages will be received; overrides the
 * encompassing gateway's default reply channel.
 * @return the channel name.
 */
String replyChannel() default "";
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50726353

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档