首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >集成模式:如何同步从多个系统接收的处理消息

集成模式:如何同步从多个系统接收的处理消息
EN

Stack Overflow用户
提问于 2017-09-16 13:39:00
回答 3查看 224关注 0票数 1

我正在构建一个系统,它将通过Message (目前是JMS)从不同的系统接收消息。来自所有发件人系统的所有消息都有一个deviceId,并且在接收消息时没有顺序。例如,系统A可以用deviceId=1发送消息,系统b可以用deviceId=2发送消息。

我的目标是不开始处理有关同一个deviceId的消息,除非我从所有具有相同deviceId的发件人那里获得所有消息。

例如,如果我有3个系统A、B和C向我的系统发送消息:

代码语言:javascript
运行
复制
System A sends messageA1 with deviceId=1
System B sends messageB1 with deviceId=1
System C sends messageC1 with deviceId=3
System C sends messageC2 with deviceId=1 <--- here I should start processing of messageA1, messageB1 and messageC2 because they are having the same deviceID 1.

这个问题应该通过在我的系统中使用某种同步机制、消息代理还是像spring- integration /apache camel这样的集成框架来解决呢?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2017-09-18 19:34:29

使用聚合器( @Artem Bilan )的类似解决方案也可以用自定义AggregationStrategy在Camel中实现,并通过使用Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP属性来控制聚合器的完成。

以下可能是一个很好的起点。(您可以在这里找到带有测试的示例项目。)

路由:

代码语言:javascript
运行
复制
from("direct:start")
    .log(LoggingLevel.INFO, "Received ${headers.system}${headers.deviceId}")
    .aggregate(header("deviceId"), new SignalAggregationStrategy(3))
    .log(LoggingLevel.INFO, "Signaled body: ${body}")
    .to("direct:result");

SignalAggregationStrategy.java

代码语言:javascript
运行
复制
public class SignalAggregationStrategy extends GroupedExchangeAggregationStrategy implements Predicate {

    private int numberOfSystems;

    public SignalAggregationStrategy(int numberOfSystems) {
        this.numberOfSystems = numberOfSystems;
    }

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Exchange exchange = super.aggregate(oldExchange, newExchange);

        List<Exchange> aggregatedExchanges = exchange.getProperty("CamelGroupedExchange", List.class);

        // Complete aggregation if we have "numberOfSystems" (currently 3) different messages (where "system" headers are different)
        // https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#completing-current-group-decided-from-the-aggregationstrategy
        if (numberOfSystems == aggregatedExchanges.stream().map(e -> e.getIn().getHeader("system", String.class)).distinct().count()) {
            exchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
        }

        return exchange;
    }

    @Override
    public boolean matches(Exchange exchange) {
        // make it infinite (4th bullet point @ https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#about-completion)
        return false;
    }
}

希望能帮上忙!

票数 2
EN

Stack Overflow用户

发布于 2017-09-16 14:53:55

您可以在Apache中使用缓存组件来实现这一点。我认为有EHCache组件。

实质上:

  1. 您将收到一条带有给定deviceId的消息,例如deviceId1。
  2. 您可以在缓存中查找是否收到了deviceId1的哪些消息。
  3. 只要您还没有收到所有这三个,就可以将当前的系统/消息添加到缓存中。
  4. 一旦所有消息都在那里,您将处理并清除缓存。

然后,您可以偏离路线,将每个传入的消息路由到特定的基于deviceId的队列中,以便进行临时存储。这可以是JMS、ActiveMQ或类似的东西。

票数 1
EN

Stack Overflow用户

发布于 2017-09-18 14:22:17

为这类任务提供了组件--在收集完整个组之前不要发出。它的名字叫集料器。您的deviceId绝对是一个correlationKeyreleaseStrategy实际上可能是基于系统的数量--在进入下一步之前等待了多少deviceId1消息。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46254511

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档