首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >分裂后平行富集

分裂后平行富集
EN

Stack Overflow用户
提问于 2016-07-22 10:07:14
回答 1查看 278关注 0票数 2

这是购物车样品的延续,我们有一个允许从购物车签出的外部API。综上所述,我们有一个流程,其中我们创建一个空的购物,添加一行项目,最后结帐。上述所有操作都是通过对外部服务的HTTP调用进行丰富的。我们希望同时添加行项(作为添加行项的一部分)调用。我们当前的配置如下所示:

代码语言:javascript
运行
复制
@Bean
public IntegrationFlow fullCheckoutFlow() {
    return f -> f.channel("inputChannel")
            .transform(fromJson(ShoppingCart.class))
            .enrich(e -> e.requestChannel(SHOPPING_CART_CHANNEL))
            .split(ShoppingCart.class, ShoppingCart::getLineItems)
            .enrich(e -> e.requestChannel(ADD_LINE_ITEM_CHANNEL))
            .aggregate(aggregator -> aggregator
                    .outputProcessor(g -> g.getMessages()
                            .stream()
                            .map(m -> (LineItem) m.getPayload())
                            .map(LineItem::getName)
                            .collect(joining(", "))))
            .enrich(e -> e.requestChannel(CHECKOUT_CHANNEL))
            .<String>handle((p, h) -> Message.called("We have " + p + " line items!!"));
}

    @Bean
    public IntegrationFlow addLineItem(Executor executor) {
        return f -> f.channel(MessageChannels.executor(ADD_LINE_ITEM_CHANNEL, executor).get())
                .handle(outboundGateway("http://localhost:8080/api/add-line-item", restTemplate())
                        .httpMethod(POST)
                        .expectedResponseType(String.class));
    }

    @Bean
    public Executor executor(Tracer tracer, TraceKeys traceKeys, SpanNamer spanNamer) {
        return new TraceableExecutorService(newFixedThreadPool(10), tracer, traceKeys, spanNamer);
    }

要并行地添加行项,我们使用的是执行器通道。然而,当在zipkin中看到它们时,它们似乎仍在按顺序处理:

我们做错什么了?整个项目的来源是在github上供参考。

谢谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-07-22 14:11:03

首先,Spring的主要特性是MessageChannel,但我仍然不清楚为什么人们在端点定义之间缺少.channel()操作符。

我是说,就你的情况而言,这一定是:

代码语言:javascript
运行
复制
.split(ShoppingCart.class, ShoppingCart::getLineItems)
.channel(c -> c.executor(executor()))
.enrich(e -> e.requestChannel(ADD_LINE_ITEM_CHANNEL))

关于你的特殊问题。

看,ContentEnricher (.enrich())是request-reply组件:http://docs.spring.io/spring-integration/reference/html/messaging-transformation-chapter.html#payload-enricher

因此,它向其requestChannel发送请求并等待回复。它是独立于requestChannel类型完成的。

我是原始Java,我们可以用下面的代码片段演示这样的行为:

代码语言:javascript
运行
复制
for (Object item: items) {
    Data data = sendAndReceive(item);
}

您应该看到,ADD_LINE_ITEM_CHANNEL作为一个ExecutorChannel没有多大价值,因为无论如何,我们在循环中都会被拦截。

.split()执行完全类似的循环,但是由于默认情况下它是使用DirectChannel的,所以迭代是在同一个线程中进行的。因此,每个下一项都等待上一项的答复。

这就是为什么您肯定应该将其作为.enrich()的输入,就在.split()之后。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38523705

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档