我想要执行一个流,在这个流中,来自流的输出在相同的操作中被用作同一流的源。
目前,我使用队列执行这类操作;我删除一个项,处理它,并将任何需要进一步处理的结果添加回队列。以下是这类事情的两个例子:
Queue<WorkItem> workQueue = new Queue<>(workToDo);
while(!workQueue.isEmpty()){
WorkItem item = workQueue.remove();
item.doOneWorkUnit();
if(!item.isDone()) workQueue.add(item);
}
Queue<Node> nodes = new Queue<>(rootNodes);
while(!nodesLeft.isEmpty()){
Node node = nodes.remove();
process(node);
nodes.addAll(node.children());
}
我可以想象,第一项工作可以同时进行,例如:
try {
LinkedBlockingQueue<WorkItem> workQueue = new LinkedBlockingQueue<>();
Stream<WorkItem> reprocess = Stream.generate(() -> workQueue.remove()).parallel();
Stream.concat(workToDo.parallelstream(), reprocess)
.filter(item -> {item.doOneWorkUnit(); return !item.isDone();})
.collect(Collectors.toCollection(() -> workQueue));
} catch (NoSuchElementException e){}
第二项是:
try {
LinkedBlockingQueue<Node> reprocessQueue = new LinkedBlockingQueue<>();
Stream<WorkItem> reprocess = Stream.generate(() -> nodes.remove()).parallel();
Stream.concat(rootNodes.parallelStream(), reprocess)
.filter(item -> {process(item); return true;})
.flatMap(node -> node.children().parallelStream())
.collect(Collectors.toCollection(() -> reprocessQueue));
} catch (NoSuchElementException e){}
然而,这些感觉就像一个乱七八糟的解决方案,我不喜欢使用异常。有人有更好的方法来做这种事吗?
发布于 2014-02-11 20:00:02
为了使工作并行,我将使用标准的java.util.concurrent.Executor
。若要将任务返回到工作队列,请在每个任务的代码末尾添加executor.execute(this)
。
https://stackoverflow.com/questions/21318304
复制相似问题