后备军,
我最近开始在春季启动程序,我偶然发现了一个问题,我想了解你的意见。
我想要达到的目标:
nonBlockingEndpoint的nonBlockingEndpoint端点。这个nonBlockingEndpoint执行一个相当长的操作,它的资源很重,可以在20到40秒之间运行。(在附加的代码中,它是由Thread.sleep()模拟的)nonBlockingEndpoint时,spring应用程序都应该注册该调用,并立即向调用方返回一个操作ID。queryOpStatus上查询此操作的状态。在开始时,它将被启动,一旦控制器服务于reuqest,它将是一个代码(如SERVICE_OK )。然后调用方知道他的请求已在服务器上成功完成。我找到的解决方案:
APIOperationsManager注册一个新的操作已经启动。CompletableFuture java构造将长时间运行的代码作为一个新的异步进程使用CompletableFuture.supplyAsync(() -> {}提供。cf.thenRun()通过更新操作状态以下是代码:
@GetMapping(path="/nonBlockingEndpoint")
public @ResponseBody ResponseOperation nonBlocking() {
// Register a new operation
APIOperationsManager apiOpsManager = APIOperationsManager.getInstance();
final int operationID = apiOpsManager.registerNewOperation(Constants.OpStatus.PROCESSING);
ResponseOperation response = new ResponseOperation();
response.setMessage("Triggered non-blocking call, use the operation id to check status");
response.setOperationID(operationID);
response.setOpRes(Constants.OpStatus.PROCESSING);
CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> {
try {
// Here we will
Thread.sleep(10000L);
} catch (InterruptedException e) {}
// whatever the return value was
return true;
});
cf.thenRun(() ->{
// We are done with the super long process, so update our Operations Manager
APIOperationsManager a = APIOperationsManager.getInstance();
boolean asyncSuccess = false;
try {asyncSuccess = cf.get();}
catch (Exception e) {}
if(true == asyncSuccess) {
a.updateOperationStatus(operationID, Constants.OpStatus.OK);
a.updateOperationMessage(operationID, "success: The long running process has finished and this is your result: SOME RESULT" );
}
else {
a.updateOperationStatus(operationID, Constants.OpStatus.INTERNAL_ERROR);
a.updateOperationMessage(operationID, "error: The long running process has failed.");
}
});
return response;
}以下也是用于完整性的APIOperationsManager.java:
public class APIOperationsManager {
private static APIOperationsManager instance = null;
private Vector<Operation> operations;
private int currentOperationId;
private static final Logger log = LoggerFactory.getLogger(Application.class);
protected APIOperationsManager() {}
public static APIOperationsManager getInstance() {
if(instance == null) {
synchronized(APIOperationsManager.class) {
if(instance == null) {
instance = new APIOperationsManager();
instance.operations = new Vector<Operation>();
instance.currentOperationId = 1;
}
}
}
return instance;
}
public synchronized int registerNewOperation(OpStatus status) {
cleanOperationsList();
currentOperationId = currentOperationId + 1;
Operation newOperation = new Operation(currentOperationId, status);
operations.add(newOperation);
log.info("Registered new Operation to watch: " + newOperation.toString());
return newOperation.getId();
}
public synchronized Operation getOperation(int id) {
for(Iterator<Operation> iterator = operations.iterator(); iterator.hasNext();) {
Operation op = iterator.next();
if(op.getId() == id) {
return op;
}
}
Operation notFound = new Operation(-1, OpStatus.INTERNAL_ERROR);
notFound.setCrated(null);
return notFound;
}
public synchronized void updateOperationStatus (int id, OpStatus newStatus) {
iteration : for(Iterator<Operation> iterator = operations.iterator(); iterator.hasNext();) {
Operation op = iterator.next();
if(op.getId() == id) {
op.setStatus(newStatus);
log.info("Updated Operation status: " + op.toString());
break iteration;
}
}
}
public synchronized void updateOperationMessage (int id, String message) {
iteration : for(Iterator<Operation> iterator = operations.iterator(); iterator.hasNext();) {
Operation op = iterator.next();
if(op.getId() == id) {
op.setMessage(message);
log.info("Updated Operation status: " + op.toString());
break iteration;
}
}
}
private synchronized void cleanOperationsList() {
Date now = new Date();
for(Iterator<Operation> iterator = operations.iterator(); iterator.hasNext();) {
Operation op = iterator.next();
if((now.getTime() - op.getCrated().getTime()) >= Constants.MIN_HOLD_DURATION_OPERATIONS ) {
log.info("Removed operation from watchlist: " + op.toString());
iterator.remove();
}
}
}
}我有的问题
我很乐意得到你的反馈。
太感谢你了,彼得·P
发布于 2017-08-12 18:27:39
使用一个请求提交一个长时间运行的任务是一种有效的模式,返回允许客户机稍后请求结果的id。
但有一些事情我建议重新考虑:
此外,您还可能希望更改一些低级别的实现问题:
希望这能有所帮助。
我需要使数据库访问是事务性的吗?
只要只编写/更新一行,就不需要进行事务处理,因为这确实是“原子的”。
如果您同时写入/更新多个行,则应将其设置为事务性以保证所有行都更新或不更新。
但是,如果两个操作(可能来自两个客户端)更新同一行,则最后一个操作总是会获胜的。
https://stackoverflow.com/questions/45641432
复制相似问题