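I am learning RxJava and converting some of my code base to understand how it works. At the moment I am trying to use Observable or Completable to execute tasks (a task execution has no associated return value) whose dependencies are as follows:
Tasks to execute: A, B, C, D, E
So the task execution might look like this: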
execute A, D
D completes -> execute E
A completes -> execute B
B completes -> execute C (B, D both completed)
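The dependencies implied by the trace above (B after A, E after D, C after both B and D) can be written down as a predecessor map. The following is only a minimal sketch in plain JDK Java, not RxJava: the class `DagWaves` and its methods are hypothetical names introduced for illustration. It groups the tasks into "waves" in which every task has all of its predecessors in earlier waves. This is a coarser schedule than the trace above, where E may start as soon as D completes, but it is a quick way to check that the graph is acyclic and to see which tasks can start first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of the original question.
final class DagWaves {

    // Groups tasks into waves; a task joins a wave once all of its
    // predecessors belong to earlier waves.
    static List<List<Character>> waves(Map<Character, List<Character>> preds) {
        List<List<Character>> result = new ArrayList<>();
        Set<Character> done = new HashSet<>();
        Set<Character> remaining = new TreeSet<>(preds.keySet());
        while (!remaining.isEmpty()) {
            List<Character> wave = new ArrayList<>();
            for (Character task : remaining) {
                if (done.containsAll(preds.get(task))) {
                    wave.add(task);
                }
            }
            if (wave.isEmpty()) {
                throw new IllegalStateException("cycle detected among " + remaining);
            }
            done.addAll(wave);
            remaining.removeAll(wave);
            result.add(wave);
        }
        return result;
    }

    // The graph from the question, as "task -> predecessors".
    static Map<Character, List<Character>> questionGraph() {
        Map<Character, List<Character>> preds = new HashMap<>();
        preds.put('A', Collections.emptyList());
        preds.put('B', Arrays.asList('A'));
        preds.put('C', Arrays.asList('B', 'D'));
        preds.put('D', Collections.emptyList());
        preds.put('E', Arrays.asList('D'));
        return preds;
    }

    public static void main(String[] args) {
        System.out.println(waves(questionGraph())); // prints [[A, D], [B, E], [C]]
    }
}
```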
Question
Posted on 2018-09-21 16:32:43
Here is how to do it:
package rxtest;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.schedulers.Schedulers;

public class RxJavaDagTest {

    private static final Logger logger = LoggerFactory.getLogger(RxJavaDagTest.class);
    private static Executor customExecutor = Executors.newFixedThreadPool(20);

    @Test
    public void stackOverflowTest() {
        Observable<Character> a = createObservable('A', 100);
        Observable<Character> b = createObservable('B', 200);
        Observable<Character> c = createObservable('C', 500);
        Observable<Character> d = createObservable('D', 150);
        Observable<Character> e = createObservable('E', 200);

        logger.info("BEGIN");

        // The Observable for D is referenced in two places in the graph,
        // so it must be cached to keep it from executing twice
        Observable<Character> dCached = d.cache();

        Observable.merge(
                Observable.concat(
                        Observable.merge(
                                Observable.concat(a, b),
                                dCached),
                        c),
                Observable.concat(dCached, e))
            .toBlocking()
            .subscribe(i -> logger.info("Executed : " + i));

        logger.info("END");
    }

    // Wraps a task as a cold Observable that runs on the custom executor
    private Observable<Character> createObservable(char c, int sleepMs) {
        Observable<Character> single = Observable.just(c)
            .flatMap(i -> Observable.<Character> create(s -> {
                logger.info("onSubscribe Start Executing : {}", i);
                sleep(sleepMs);
                s.onNext(Character.valueOf(i));
                s.onCompleted();
            }).subscribeOn(Schedulers.from(customExecutor)));
        return single;
    }

    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
        catch (InterruptedException e) {
            // restore the interrupt flag instead of silently swallowing it
            Thread.currentThread().interrupt();
        }
    }
}
The output will be:
20:47:05.633 [main] INFO rxtest.RxJavaDagTest BEGIN
20:47:05.745 [pool-1-thread-1] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : A
20:47:05.748 [pool-1-thread-2] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : D
20:47:05.849 [main] INFO rxtest.RxJavaDagTest Executed : A
20:47:05.850 [pool-1-thread-3] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : B
20:47:05.899 [main] INFO rxtest.RxJavaDagTest Executed : D
20:47:05.899 [main] INFO rxtest.RxJavaDagTest Executed : D
20:47:05.899 [pool-1-thread-4] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : E
20:47:06.051 [main] INFO rxtest.RxJavaDagTest Executed : B
20:47:06.051 [pool-1-thread-5] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : C
20:47:06.100 [main] INFO rxtest.RxJavaDagTest Executed : E
20:47:06.552 [main] INFO rxtest.RxJavaDagTest Executed : C
20:47:06.552 [main] INFO rxtest.RxJavaDagTest END
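Posted on 2019-10-01 15:04:39
Below is an implementation of a general-purpose utility for building and running a DAG with RxJava. I do not claim to be an expert in reactive theory or in RxJava, and I admit that what I did may not be optimal. The code uses Lombok and jOOL to cut boilerplate; if there is interest, I can provide a version in plain old Java.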
package reactivedag;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import lombok.NonNull;
import lombok.val;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;

import static io.reactivex.Observable.*;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.jooq.lambda.Seq.seq;

public class ReactiveDag {

    // ts: the root nodes; tPredsFn / tSuccsFn: predecessor and successor getters;
    // jobLogic: the work to run for each node
    public static <T> Observable<T> runAsReactiveDag(
            List<T> ts,
            Function<T, List<T>> tPredsFn,
            Function<T, List<T>> tSuccsFn,
            Consumer<T> jobLogic) {
        val ctx = new Context<T>(newFixedThreadPool(ts.size()), tPredsFn, tSuccsFn, jobLogic, new HashMap<>());
        val dag = seq(ts)
            .map(t -> {
                Observable<T> pred = createObservable(t, ctx);
                List<T> succTs = ctx.tSuccsFn.apply(t);
                return createAndMergeDag(succTs, pred, ctx);
            })
            .flatMap(Optional::stream)
            .findSingle()
            .orElseThrow(() -> new RuntimeException("No DAG was created"));
        // the DAG is cold: this subscription is what actually starts the jobs
        dag.subscribe(x -> {}, x -> {}, () -> {
            ctx.executor.shutdown();
        });
        return dag;
    }

    private static class Context<T> {
        @NonNull final ExecutorService executor;
        @NonNull final Function<T, List<T>> tPredsFn;
        @NonNull final Function<T, List<T>> tSuccsFn;
        @NonNull final Consumer<T> jobLogic;
        @NonNull final Map<T, List<Observable<T>>> deferredMerges;

        Context(ExecutorService executor, Function<T, List<T>> tPredsFn, Function<T, List<T>> tSuccsFn, Consumer<T> jobLogic, Map<T, List<Observable<T>>> deferredMerges) {
            this.executor = executor;
            this.tPredsFn = tPredsFn;
            this.tSuccsFn = tSuccsFn;
            this.jobLogic = jobLogic;
            this.deferredMerges = deferredMerges;
        }
    }

    // wraps the job for one node as an Observable that runs on the context's executor
    private static <T> Observable<T> createObservable(T t, Context<T> ctx) {
        return just(t)
            .flatMap(tee -> Observable.<T> create(emitter -> {
                ctx.jobLogic.accept(t);
                emitter.onNext(t);
                emitter.onComplete();
            }).subscribeOn(Schedulers.from(ctx.executor)));
    }

    private static <T> Optional<Observable<T>> handleSuccessor(
            Observable<T> upstreamDag,
            T succT,
            int numUpstreamNodes,
            Context<T> ctx) {
        if (numUpstreamNodes == 1) {
            // single predecessor: run the successor right after the upstream
            val newUpstreamDag = concat(upstreamDag, createObservable(succT, ctx));
            val newSuccTs = ctx.tSuccsFn.apply(succT);
            return createAndMergeDag(newSuccTs, newUpstreamDag, ctx);
        } else if (numUpstreamNodes > 1) {
            //this successor will have to be merged: either add to deferrals or merge now...
            List<Observable<T>> deferred = ctx.deferredMerges.getOrDefault(succT, new ArrayList<>());
            if (deferred.size() < numUpstreamNodes - 1) {
                //not all merge partners are constructed yet: add to deferrals
                deferred.add(upstreamDag);
                ctx.deferredMerges.put(succT, deferred); //often redundant, benignly
                return Optional.empty();
            } else {
                //ready to merge: merge the current and all deferred upstreams, remove the deferral entry, and
                //continue building the downstream recursively
                deferred.add(upstreamDag);
                val mergedUpstream = merge(deferred);
                ctx.deferredMerges.remove(succT);
                return handleSuccessor(mergedUpstream, succT, 1, ctx);
            }
        } else {
            throw new RuntimeException("successor " + succT + " is not expected to have zero predecessors");
        }
    }

    private static <T> Optional<Observable<T>> createAndMergeDag(
            List<T> ts,
            Observable<T> upstreamDag,
            Context<T> ctx) {
        if (ts.isEmpty())
            return Optional.of(upstreamDag);
        // an upstream shared by several successors is cached so that it runs only once
        val maybeWrappedUpstreamDag = ts.size() > 1
            ? upstreamDag.cache()
            : upstreamDag;
        val observables = seq(ts)
            .map(succT ->
                handleSuccessor(maybeWrappedUpstreamDag, succT, ctx.tPredsFn.apply(succT).size(), ctx))
            .flatMap(Optional::stream)
            .toList();
        return observables
            .isEmpty()
                ? Optional.empty()
                : Optional.of(observables.size() > 1
                    ? merge(observables)
                    : observables.get(0));
    }
}
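I am using this utility to process all the objects in a moderately complex model, and it simplifies the task considerably. It is overkill for the OP's problem, but for illustration, here is a solution based on the ReactiveDag class above: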
package reactivedag;

import lombok.val;
import org.apache.logging.log4j.Logger;

import static reactivedag.ReactiveDag.runAsReactiveDag;
import static java.lang.Thread.sleep;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.apache.logging.log4j.LogManager.getLogger;

public class Sample {

    private static final Logger log = getLogger();

    static public void main(String[] args) {
        val dag = runAsReactiveDag(asList('A', 'D'),
            c -> {
                //predecessor getter function
                switch (c) {
                    case 'B': return asList('A');
                    case 'C': return asList('B', 'D');
                    case 'E': return asList('D');
                    default: return emptyList();
                }
            }, c -> {
                //successor getter function
                switch (c) {
                    case 'A': return asList('B');
                    case 'B': return asList('C');
                    case 'D': return asList('C', 'E');
                    default: return emptyList();
                }
            }, c -> {
                //job logic
                log.info("Starting " + c);
                try {
                    sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                log.info("Ending " + c);
            }
        );
    }
}
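The sample above spells out the predecessor getter, the successor getter, and the root list by hand, so the three have to be kept consistent with each other. One possible way to avoid that, sketched here in plain JDK Java, is to derive the successors and the roots from a single predecessor map. The class `DagEdges` and its methods are hypothetical names, not part of ReactiveDag, and the sketch assumes the node type is `Comparable` so that the results come out in a stable order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper, not part of the ReactiveDag class above.
final class DagEdges {

    // Inverts "task -> predecessors" into "task -> successors".
    static <T extends Comparable<T>> Map<T, List<T>> successors(Map<T, List<T>> preds) {
        Map<T, List<T>> succs = new TreeMap<>();
        // iterate in sorted key order so the successor lists are deterministic
        for (Map.Entry<T, List<T>> entry : new TreeMap<>(preds).entrySet()) {
            for (T pred : entry.getValue()) {
                succs.computeIfAbsent(pred, k -> new ArrayList<>()).add(entry.getKey());
            }
        }
        return succs;
    }

    // Roots are the tasks that appear anywhere in the graph but have no predecessors.
    static <T extends Comparable<T>> List<T> roots(Map<T, List<T>> preds) {
        TreeSet<T> all = new TreeSet<>(preds.keySet());
        preds.values().forEach(all::addAll);
        List<T> roots = new ArrayList<>();
        for (T task : all) {
            if (preds.getOrDefault(task, Collections.emptyList()).isEmpty()) {
                roots.add(task);
            }
        }
        return roots;
    }

    // The same predecessor relation as the switch statement in the sample.
    static Map<Character, List<Character>> samplePreds() {
        Map<Character, List<Character>> preds = new HashMap<>();
        preds.put('B', Arrays.asList('A'));
        preds.put('C', Arrays.asList('B', 'D'));
        preds.put('E', Arrays.asList('D'));
        return preds;
    }

    public static void main(String[] args) {
        System.out.println(successors(samplePreds())); // prints {A=[B], B=[C], D=[C, E]}
        System.out.println(roots(samplePreds()));      // prints [A, D]
    }
}
```

The two getters passed to runAsReactiveDag could then be thin lookups such as `t -> preds.getOrDefault(t, emptyList())` and `t -> succs.getOrDefault(t, emptyList())`, with `roots(preds)` as the first argument.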
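Posted on 2017-06-30 22:05:53
Well, according to the RxJava documentation, this is clearly a use case for RxJava.
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
If you do not need the result of an operation and only need to know whether it has completed, Completable is a good fit.
I do not know whether there is a specific technique that helps translate your problem into RxJava code.
I would do it like this: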
Single.zip(
        // path 1: A, then B
        executeA()
                .subscribeOn(Schedulers.newThread())
                .andThen(executeB())
                .toSingleDefault(""),
        // path 2: D, then start E without waiting for it
        executeD()
                .subscribeOn(Schedulers.newThread())
                .andThen(new CompletableSource() {
                    @Override
                    public void subscribe(@NonNull CompletableObserver cs) {
                        // E runs under its own subscription...
                        executeE().subscribeOn(Schedulers.newThread())
                                .subscribe(() -> Log.d("test", "complete E"));
                        // ...while this path completes immediately so that C is not held up by E
                        cs.onComplete();
                    }
                })
                .toSingleDefault(""),
        (BiFunction<String, String, Object>) (s, s2) -> s)
        // both paths are done (B and D completed): run C
        .flatMapCompletable(o -> CompletableObserver::onComplete)
        .andThen(executeC())
        .doOnSubscribe(disposable -> Log.d("test", "start"))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(() -> Log.d("test", "complete C"));
where the tasks are defined as Completable in the following way:
private Completable executeA() {
    return Completable.create(e -> {
        Thread.sleep(4000);
        Log.d("test", "completing A");
        e.onComplete();
    });
}

private Completable executeB() {
    return Completable.create(e -> {
        Thread.sleep(12000);
        Log.d("test", "completing B");
        e.onComplete();
    });
}

private Completable executeC() {
    return Completable.create(e -> {
        Thread.sleep(3000);
        Log.d("test", "completing C");
        e.onComplete();
    });
}

private Completable executeD() {
    return Completable.create(e -> {
        Thread.sleep(10000);
        Log.d("test", "completing D");
        e.onComplete();
    });
}

private Completable executeE() {
    return Completable.create(e -> {
        Thread.sleep(10000);
        Log.d("test", "completing E");
        e.onComplete();
    });
}
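Note: I am not sure about the approach I used for the andThen part attached to executeD(). We cannot simply chain D and E like this:
executeD().andThen(executeE())
because then task C would start after E, whereas we want it to start after D. That is why I create a CompletableSource that starts task E and calls onComplete() right away, so that execution can continue.
Also note that this implementation has two subscriptions: one at the end of each path (the end of C and the end of E).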
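For comparison only, the fan-out from D to both C and E can be expressed with a single shared handle for D, so that E does not need a separate nested subscription. The sketch below deliberately swaps RxJava for the plain JDK CompletableFuture API. The class `FutureDag` is a hypothetical name, and the tasks only record their own names instead of sleeping.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical comparison in plain JDK Java; this is not RxJava code.
final class FutureDag {

    // Runs the five tasks and returns the order in which they finished.
    static List<String> run() {
        List<String> finished = new CopyOnWriteArrayList<>();

        // A and D have no dependencies and start right away
        CompletableFuture<Void> a = CompletableFuture.runAsync(() -> finished.add("A"));
        CompletableFuture<Void> d = CompletableFuture.runAsync(() -> finished.add("D"));

        // B waits for A; E waits for D
        CompletableFuture<Void> b = a.thenRunAsync(() -> finished.add("B"));
        CompletableFuture<Void> e = d.thenRunAsync(() -> finished.add("E"));

        // C waits for both B and D; D runs once even though C and E both depend on it
        CompletableFuture<Void> c = CompletableFuture.allOf(b, d)
                .thenRunAsync(() -> finished.add("C"));

        // block until both ends of the graph (C and E) are done
        CompletableFuture.allOf(c, e).join();
        return finished;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The exact interleaving varies from run to run; only the partial order (A before B, D before E, B and D before C) is guaranteed.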
https://stackoverflow.com/questions/44851890