首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >非阻塞终结点:向调用方返回一个操作ID --想了解您对我的实现的看法吗?

非阻塞终结点:向调用方返回一个操作ID --想了解您对我的实现的看法吗?
EN

Stack Overflow用户
提问于 2017-08-11 18:11:44
回答 1查看 186关注 0票数 0

后备军,

我最近开始在春季启动程序,我偶然发现了一个问题,我想了解你的意见。

我想要达到的目标:

  • 我创建了一个Controller,它公开了一个名为nonBlockingEndpointnonBlockingEndpoint端点。这个nonBlockingEndpoint执行一个相当长的操作,它的资源很重,可以在20到40秒之间运行。(在附加的代码中,它是由Thread.sleep()模拟的)
  • 每当调用nonBlockingEndpoint时,spring应用程序都应该注册该调用,并立即向调用方返回一个操作ID。
  • 然后,调用方可以使用此ID在另一个端点queryOpStatus上查询此操作的状态。在开始时,它将被启动,一旦控制器服务于reuqest,它将是一个代码(如SERVICE_OK )。然后调用方知道他的请求已在服务器上成功完成。

我找到的解决方案:

  • 我有以下控制器(请注意,它是明确的,而不是带有@异步标记的)
  • 它使用一个APIOperationsManager注册一个新的操作已经启动。
  • 我使用CompletableFuture java构造将长时间运行的代码作为一个新的异步进程使用CompletableFuture.supplyAsync(() -> {}提供。
  • 我不假思索地给打电话的人回了个电话,告诉他手术正在进行中。
  • 异步任务完成后,我使用cf.thenRun()通过更新操作状态

以下是代码:

代码语言:javascript
运行
复制
    @GetMapping(path="/nonBlockingEndpoint")
public @ResponseBody ResponseOperation nonBlocking() {

    // Register a new operation
    APIOperationsManager apiOpsManager = APIOperationsManager.getInstance();
    final int operationID = apiOpsManager.registerNewOperation(Constants.OpStatus.PROCESSING);
    ResponseOperation response = new ResponseOperation();

    response.setMessage("Triggered non-blocking call, use the operation id to check status");
    response.setOperationID(operationID);
    response.setOpRes(Constants.OpStatus.PROCESSING);

    CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> {

        try {
            // Here we will 
            Thread.sleep(10000L);
        } catch (InterruptedException e) {}

        // whatever the return value was
        return true;
    });
    cf.thenRun(() ->{ 
        // We are done with the super long process, so update our Operations Manager
        APIOperationsManager a = APIOperationsManager.getInstance();
        boolean asyncSuccess = false;

        try {asyncSuccess = cf.get();} 
        catch (Exception e) {}

        if(true == asyncSuccess) {
            a.updateOperationStatus(operationID, Constants.OpStatus.OK);
            a.updateOperationMessage(operationID, "success: The long running process has finished and this is your result: SOME RESULT" );
        } 
        else {
            a.updateOperationStatus(operationID, Constants.OpStatus.INTERNAL_ERROR);
            a.updateOperationMessage(operationID, "error: The long running process has failed."); 
        }
    });

    return response;
}

以下也是用于完整性的APIOperationsManager.java:

代码语言:javascript
运行
复制
public class APIOperationsManager {

    private static APIOperationsManager instance = null;

    private Vector<Operation> operations;
    private int currentOperationId;

    private static final Logger log = LoggerFactory.getLogger(Application.class);

    protected APIOperationsManager() {}

    public static APIOperationsManager getInstance() {
        if(instance == null) {
            synchronized(APIOperationsManager.class) {
                if(instance == null) {
                    instance = new APIOperationsManager();
                    instance.operations = new Vector<Operation>();
                    instance.currentOperationId = 1;
                }
            }
        }
        return instance;
    }


    public synchronized int registerNewOperation(OpStatus status) {
        cleanOperationsList();

        currentOperationId = currentOperationId + 1;
        Operation newOperation = new Operation(currentOperationId, status);
        operations.add(newOperation);
        log.info("Registered new Operation to watch: " + newOperation.toString());
        return newOperation.getId();
    }

    public synchronized Operation getOperation(int id) {

        for(Iterator<Operation> iterator = operations.iterator(); iterator.hasNext();) {
            Operation op = iterator.next();
            if(op.getId() == id) {
                return op;
            }
        }

        Operation notFound = new Operation(-1, OpStatus.INTERNAL_ERROR);
        notFound.setCrated(null);

        return notFound;
    }

    public synchronized void updateOperationStatus (int id, OpStatus newStatus) {

        iteration : for(Iterator<Operation> iterator = operations.iterator(); iterator.hasNext();) {
            Operation op = iterator.next();
            if(op.getId() == id) {
                op.setStatus(newStatus);
                log.info("Updated Operation status: " + op.toString());
                break iteration;
            }
        }
    }

    public synchronized void updateOperationMessage (int id, String message) {

        iteration : for(Iterator<Operation> iterator = operations.iterator(); iterator.hasNext();) {
            Operation op = iterator.next();
            if(op.getId() == id) {
                op.setMessage(message);
                log.info("Updated Operation status: " + op.toString());
                break iteration;
            }
        }
    }

    private synchronized void cleanOperationsList() {
        Date now = new Date();

        for(Iterator<Operation> iterator = operations.iterator(); iterator.hasNext();) {
            Operation op = iterator.next();
            if((now.getTime() - op.getCrated().getTime()) >= Constants.MIN_HOLD_DURATION_OPERATIONS ) {
                log.info("Removed operation from watchlist: " + op.toString());
                iterator.remove();
            }
        }
    }
}

我有的问题

  • 这个概念是否也是一个有效的概念?还有什么可以改进的?
  • 我会遇到并发问题/种族条件吗?
  • 是否有更好的方法来实现同样的启动弹簧,但我只是还没有找到呢?(可能使用@异步指令?)

我很乐意得到你的反馈。

太感谢你了,彼得·P

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-08-12 18:27:39

使用一个请求提交一个长时间运行的任务是一种有效的模式,返回允许客户机稍后请求结果的id。

但有一些事情我建议重新考虑:

  • 不要将Integer用作id,因为它允许攻击者猜测id并获得这些id的结果。相反,使用一个随机UUID。
  • 如果需要重新启动应用程序,所有ids及其结果都将丢失。您应该将它们保存到数据库中。
  • 您的解决方案将无法在集群中使用您的应用程序的许多实例,因为每个实例只知道它自己的in和结果。这也可以通过将它们持久化到数据库或Reddis存储来解决。
  • 使用CompletableFuture的方式使您无法控制用于异步操作的线程数量。可以用标准Java实现这一点,但我建议使用Spring配置线程池
  • 用@异步注释控制器方法不是一个选项,这是行不通的。相反,将所有异步操作放入一个简单的服务中,并使用@异步对其进行注释。这有一些优点:
    • 您也可以同步使用此服务,这使得测试更加容易。
    • 可以用Spring配置线程池

  • /nonBlockingEndpoint不应该返回id,而应该返回到queryOpStatus的完整链接,包括id。客户端可以直接使用此链接而不需要任何其他信息。

此外,您还可能希望更改一些低级别的实现问题:

  • 不要使用向量,它在每个操作上都同步。用一个列表代替。迭代一个列表也要容易得多,您可以使用-循环或流。
  • 如果您需要查找一个值,不要在Vector列表上迭代,而是使用Map。
  • APIOperationsManager是一个单身人士。在Spring应用程序中,这是没有意义的。让它成为一个普通的PoJo,并创建一个bean,让它自动进入控制器。缺省情况下,Spring bean是单子。
  • 您应该避免在控制器方法中执行复杂的操作。相反,可以将任何内容移动到服务中(可以用@异步进行注释)。这使得测试更容易,因为您可以在没有web上下文的情况下测试此服务。

希望这能有所帮助。

我需要使数据库访问是事务性的吗?

只要只编写/更新一行,就不需要进行事务处理,因为这确实是“原子的”。

如果您同时写入/更新多个行,则应将其设置为事务性以保证所有行都更新或不更新。

但是,如果两个操作(可能来自两个客户端)更新同一行,则最后一个操作总是会获胜的。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45641432

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档