首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >Spring Integration聚合器基于上次修改的发布策略

Spring Integration聚合器基于上次修改的发布策略
EN

Stack Overflow用户
提问于 2019-04-15 02:15:07
回答 2查看 352关注 0票数 0

我正在尝试实现以下场景:

  1. 我得到了一堆具有通用文件模式的文件,即doc0001_page0001、doc0001_page0002、doc0001_page0003、doc0002_page0001 (其中doc0001是我需要合并的由3页组成的文档,doc0002只有1页)
  2. 我想要聚合它们的方式是,仅当收集了特定文档的所有文件时才释放一组文件(拾取3个文件后的doc0001,1个文件后的doc0002 )

我的想法是按字母顺序读取文件,并在上次修改组以释放它后等待2秒(g.getLastModified()小于当前时间减去2秒)

我尝试过以下几种方法,但都没有成功:

return IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
                                  .patternFilter("*.json")
                                  .useWatchService(true)
                                  .watchEvents(FileReadingMessageSource.WatchEventType.CREATE,
                                          FileReadingMessageSource.WatchEventType.MODIFY),
        e -> e.poller(Pollers.fixedDelay(100)
                             .errorChannel("filePollingErrorChannel")))
                       .enrichHeaders(h -> h.headerExpression("CORRELATION_PATTERN", "headers[" + FileHeaders.FILENAME + "].substring(0,7)")) // docxxxx.length()
                       .aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
                                        .releaseStrategy(g -> g.getLastModified() < System.currentTimeMillis() - 2000))                       .channel(MessageChannels.queue("fileReadingResultChannel"))
                       .get();

将发布策略更改为以下内容也不起作用:

.aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
                .releaseStrategy(g -> {
                    Stream<Message<?>> stream = g.getMessages()
                                                 .stream();
                    Long timestamp = (Long) stream.skip(stream.count() - 1)
                                                  .findFirst()
                                                  .get()
                                                  .getHeaders()
                                                  .get(MessageHeaders.TIMESTAMP);
                    System.out.println("Timestamp: " + timestamp);
                    return timestamp.longValue() < System.currentTimeMillis() - 2000;

                }))

我是否误解了发布策略的概念?

另外,是否可以从releaseStrategy块中打印出一些内容?我想比较一下时间戳(参见System.out.println("Timestamp: " + timestamp);)

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-04-15 04:32:40

我用一种不同的方法找到了解决方案。我仍然不明白为什么上面的那个不能工作。

我还发现了一种定义相关函数的更简洁的方法。

IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
                                  .patternFilter("*.json")
                                  .useWatchService(true)
                                  .watchEvents(FileReadingMessageSource.WatchEventType.CREATE, FileReadingMessageSource.WatchEventType.MODIFY), e -> e
        .poller(Pollers.fixedDelay(100)))
                       .enrichHeaders(h -> h.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID, m -> ((String) m
                               .getHeaders()
                               .get(FileHeaders.FILENAME)).substring(0, 17)))
                       .aggregate(a -> a.groupTimeout(2000)
                                        .sendPartialResultOnExpiry(true))
                       .channel(MessageChannels.queue("fileReadingResultChannel"))
                       .get();
票数 0
EN

Stack Overflow用户

发布于 2019-04-15 23:39:26

是的,因为您不知道消息组的整个序列,所以除了使用groupTimeout之外,您没有任何其他选择。仅当消息到达聚合器时,常规releaseStrategy才起作用。因为在一条消息中,你没有足够的信息来释放群,它将永远留在群商店中。

groupTimeout选项已经被引入到聚合器中,特别是对于这种用例,当我们确实想要释放一个没有足够消息来正常分组的组时。

您可以考虑使用groupTimeoutExpression而不是基于常量的groupTimeoutMessageGroup是SpEL的根评估上下文对象,因此您将能够访问提到的lastModified

在这里处理.sendPartialResultOnExpiry(true)是正确的选择。

在文档中查看更多信息:https://docs.spring.io/spring-integration/reference/html/#agg-and-group-to

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55678494

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档