我正在构建一个系统,它将通过Message (目前是JMS)从不同的系统接收消息。来自所有发件人系统的所有消息都有一个deviceId,并且在接收消息时没有顺序。例如,系统A可以用deviceId=1发送消息,系统b可以用deviceId=2发送消息。
我的目标是不开始处理有关同一个deviceId的消息,除非我从所有具有相同deviceId的发件人那里获得所有消息。
例如,如果我有3个系统A、B和C向我的系统发送消息:
System A sends messageA1 with deviceId=1
System B sends messageB1 with deviceId=1
System C sends messageC1 with deviceId=3
System C sends messageC2 with deviceId=1 <--- here I should start processing of messageA1, messageB1 and messageC2 because they are having the same deviceID 1.这个问题应该通过在我的系统中使用某种同步机制、消息代理还是像spring- integration /apache camel这样的集成框架来解决呢?
发布于 2017-09-18 19:34:29
使用聚合器( @Artem Bilan )的类似解决方案也可以用自定义AggregationStrategy在Camel中实现,并通过使用Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP属性来控制聚合器的完成。
以下可能是一个很好的起点。(您可以在这里找到带有测试的示例项目。)
路由:
from("direct:start")
    .log(LoggingLevel.INFO, "Received ${headers.system}${headers.deviceId}")
    .aggregate(header("deviceId"), new SignalAggregationStrategy(3))
    .log(LoggingLevel.INFO, "Signaled body: ${body}")
    .to("direct:result");SignalAggregationStrategy.java
public class SignalAggregationStrategy extends GroupedExchangeAggregationStrategy implements Predicate {
    private int numberOfSystems;
    public SignalAggregationStrategy(int numberOfSystems) {
        this.numberOfSystems = numberOfSystems;
    }
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Exchange exchange = super.aggregate(oldExchange, newExchange);
        List<Exchange> aggregatedExchanges = exchange.getProperty("CamelGroupedExchange", List.class);
        // Complete aggregation if we have "numberOfSystems" (currently 3) different messages (where "system" headers are different)
        // https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#completing-current-group-decided-from-the-aggregationstrategy
        if (numberOfSystems == aggregatedExchanges.stream().map(e -> e.getIn().getHeader("system", String.class)).distinct().count()) {
            exchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
        }
        return exchange;
    }
    @Override
    public boolean matches(Exchange exchange) {
        // make it infinite (4th bullet point @ https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#about-completion)
        return false;
    }
}希望能帮上忙!
发布于 2017-09-16 14:53:55
您可以在Apache中使用缓存组件来实现这一点。我认为有EHCache组件。
实质上:
然后,您可以偏离路线,将每个传入的消息路由到特定的基于deviceId的队列中,以便进行临时存储。这可以是JMS、ActiveMQ或类似的东西。
发布于 2017-09-18 14:22:17
为这类任务提供了组件--在收集完整个组之前不要发出。它的名字叫集料器。您的deviceId绝对是一个correlationKey。releaseStrategy实际上可能是基于系统的数量--在进入下一步之前等待了多少deviceId1消息。
https://stackoverflow.com/questions/46254511
复制相似问题