首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >以线程方式处理Quarkus反应性AMQP客户端

以线程方式处理Quarkus反应性AMQP客户端
EN

Stack Overflow用户
提问于 2022-02-21 17:03:06
回答 1查看 211关注 0票数 0

我试图使用Quarkus的AMQP (reactive-messaging-amqp)扩展将工作与原始REST请求分离开来。这个想法是REST呼叫将启动一个长时间的跑步动作,并可能稍后回来,以获得结果。

但是,在我的代码中,Quarkus在同一个线程中运行每个步骤,在从原始sendNewLRA()调用返回之前完成工作。我会假设消息将通过AMQP发送,从而使消息发送后的流程脱钩。为什么不是这种情况?目前,我没有任何AMQP/消息传递特定配置,只允许默认的TestContainer运行(由Quarkus管理)。

REST处理程序:

代码语言:javascript
运行
复制
    @Inject
    LRAMessenger messenger;
    
    @LRA(end = false)
    @GET
    @Path("start")
    @Produces(MediaType.TEXT_PLAIN)
    public Response hello(
        @HeaderParam(LRA_HTTP_CONTEXT_HEADER) URI lraId,
        @QueryParam("processTime") int processTime,
        @QueryParam("payload") String payload
    ) {
        log.info("Start. LRA ID: {}", lraId);
        
        StartMessage start = new StartMessage();
        start.setLraId(lraId);
        start.setProcessTime(processTime);
        start.setPayload(payload);
        
        this.messenger.sendNewLRA(start); // blocks here
        log.info("Sent lra processing message.");
        
        return Response.ok(lraId).build();
    }

消息传递代码:

代码语言:javascript
运行
复制
@ApplicationScoped
@Slf4j
public class LRAMessenger {
    
    @Inject
    NarayanaLRAClient lraClient;
    
    @Inject
    @Channel("lra-out")
    Emitter<StartMessage> startEmitter;
    
    /**
     * Method to kick off backend processing.
     * @param startMessage The mesage to send
     */
    @Incoming("lra-start")
    public void sendNewLRA(StartMessage startMessage) {
        startEmitter.send(startMessage);
    }
    
    @Incoming("lra-out")
    public void processLRA(StartMessage startMessage) throws InterruptedException {
        log.info("Got lra message in process step: {}", startMessage);
        lraClient.setCurrentLRA(startMessage.getLraId());
        
        int waitTime = startMessage.getProcessTime() / 10;
        
        for (int percent = 10; percent <= 100; percent += 10) {
            log.info("Waiting to simulate processing...");
            Thread.sleep(waitTime);
            log.info("Done waiting ({}%)", percent);
        }
        log.info("Waiting to simulate processing completed.");
        lraClient.closeLRA(startMessage.getLraId());
        log.info("Closed LRA.");
    }
}

输出:

代码语言:javascript
运行
复制
2022-02-21 11:48:28,723 INFO  [org.acm.cus.dem.end.LRAResourceTest] (main) testing LRA.
2022-02-21 11:48:29,192 INFO  [org.acm.cus.dem.end.LRAResource] (executor-thread-0) Start. LRA ID: http://localhost:49251/lra-coordinator/0_ffffac110006_b651_6213c25d_2
2022-02-21 11:48:29,200 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Got lra message in process step: StartMessage(processTime=10000, payload=null)
2022-02-21 11:48:29,200 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:30,201 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (10%)
2022-02-21 11:48:30,202 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:31,203 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (20%)
2022-02-21 11:48:31,204 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:32,205 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (30%)
2022-02-21 11:48:32,206 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:33,207 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (40%)
2022-02-21 11:48:33,207 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:34,208 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (50%)
2022-02-21 11:48:34,209 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:35,210 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (60%)
2022-02-21 11:48:35,211 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:36,211 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (70%)
2022-02-21 11:48:36,212 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:37,212 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (80%)
2022-02-21 11:48:37,213 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:38,214 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (90%)
2022-02-21 11:48:38,215 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:39,216 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (100%)
2022-02-21 11:48:39,216 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing completed.
2022-02-21 11:48:39,235 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Closed LRA.
2022-02-21 11:48:39,237 INFO  [org.acm.cus.dem.end.LRAResource] (executor-thread-0) Sent lra processing message.

注意:我希望Sent lra processing message.日志在这个过程的早期出现,可能在Got lra message in process step日志消息之前。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-02-22 16:52:29

找到了答案,或者至少解决了.

@Blocking添加到消息传递链的第二步似乎使流程解耦:

代码语言:javascript
运行
复制
    @Incoming("lra-out")
    @Blocking
    public void processLRA(StartMessage startMessage) throws InterruptedException {
        log.info("Got lra message in process step: {}", startMessage);
        // ...
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71210235

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档