首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何使用RxJava执行具有多个依赖项的异步任务

如何使用RxJava执行具有多个依赖项的异步任务
EN

Stack Overflow用户
提问于 2017-06-30 17:20:07
回答 3查看 2.1K关注 0票数 2

我正在学习rxjava,并转换我的一些代码库来了解它是如何工作的。目前,我正试图使用可观察或可完成的方法来完成执行任务(任务执行没有相关的返回值),其依赖关系如下:

执行

任务: A、B、C、D、E

  • B取决于A
  • C依赖于B,D
  • E取决于D

因此,任务执行可能如下所示:

代码语言:javascript
运行
复制
execute A, D  
D completes -> execute E  
A completes -> execute B  
B completes -> execute C (B, D both completed)  

问题

  • 对于rxjava来说,接受任意依赖图是一种很好的用例吗?
  • 这甚至是可观察/可完成的用例吗?
  • 如果是这样的话:实现这种行为的技术是什么?
EN

回答 3

Stack Overflow用户

发布于 2018-09-21 16:32:43

您可以使用凹()合并()运算符来实现这一点。

以下是如何做到这一点:

代码语言:javascript
运行
复制
package rxtest;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;


public class RxJavaDagTest {
    private static final Logger logger = LoggerFactory.getLogger(RxJavaDagTest.class);
    private static Executor customExecutor = Executors.newFixedThreadPool(20);

    @Test
    public void stackOverflowTest() {
        Observable<Character> a = createObservable('A', 100);
        Observable<Character> b = createObservable('B', 200);
        Observable<Character> c = createObservable('C', 500);
        Observable<Character> d = createObservable('D', 150);
        Observable<Character> e = createObservable('E', 200);

        logger.info("BEGIN");

        // As Observable for D is referred at two places in the graph, it needs to be cached to not to execute twice
        Observable<Character> dCached = d.cache();

        Observable.merge(
                Observable.concat(
                        Observable.merge(
                                Observable.concat(a, b),
                                dCached),
                        c),
                Observable.concat(dCached, e))
                .toBlocking()
                .subscribe(i -> logger.info("Executed : " + i));

        logger.info("END");
    }

    private Observable<Character> createObservable(char c, int sleepMs) {
        Observable<Character> single = Observable.just(c)
                .flatMap(i -> Observable.<Character> create(s -> {
                    logger.info("onSubscribe Start Executing : {}", i);
                    sleep(sleepMs);
                    s.onNext(Character.valueOf(i));
                    s.onCompleted();
                }).subscribeOn(Schedulers.from(customExecutor)));
        return single;
    }

    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
        catch (InterruptedException e) {
        }
    }
}

产出将是:

代码语言:javascript
运行
复制
20:47:05.633 [main] INFO rxtest.RxJavaDagTest BEGIN
20:47:05.745 [pool-1-thread-1] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : A
20:47:05.748 [pool-1-thread-2] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : D
20:47:05.849 [main] INFO rxtest.RxJavaDagTest Executed : A
20:47:05.850 [pool-1-thread-3] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : B
20:47:05.899 [main] INFO rxtest.RxJavaDagTest Executed : D
20:47:05.899 [main] INFO rxtest.RxJavaDagTest Executed : D
20:47:05.899 [pool-1-thread-4] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : E
20:47:06.051 [main] INFO rxtest.RxJavaDagTest Executed : B
20:47:06.051 [pool-1-thread-5] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : C
20:47:06.100 [main] INFO rxtest.RxJavaDagTest Executed : E
20:47:06.552 [main] INFO rxtest.RxJavaDagTest Executed : C
20:47:06.552 [main] INFO rxtest.RxJavaDagTest END
票数 4
EN

Stack Overflow用户

发布于 2019-10-01 15:04:39

下面是使用RxJava创建和运行DAG的通用工具的实现。我并不声称自己是反应理论或RxJava方面的专家,并且承认我做的事情可能并不是最优的。这段代码使用Lombok和JOOL来消除样板;如果有兴趣,我可以用普通的旧Java提供一个版本。

代码语言:javascript
运行
复制
package reactivedag;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import lombok.NonNull;
import lombok.val;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;

import static io.reactivex.Observable.*;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.jooq.lambda.Seq.seq;

public class ReactiveDag {

    public static <T> Observable<T> runAsReactiveDag(
            List<T> ts,
            Function<T, List<T>> tPredsFn,
            Function<T, List<T>> tSuccsFn,
            Consumer<T> jobLogic) {
        val ctx = new Context<T>(newFixedThreadPool(ts.size()), tPredsFn, tSuccsFn, jobLogic, new HashMap<>());

        val dag = seq(ts)
                .map(t -> {
                    Observable<T> pred = createObservable(t, ctx);
                    List<T> succTs = ctx.tSuccsFn.apply(t);
                    return createAndMergeDag(succTs, pred, ctx);
                })
                .flatMap(Optional::stream)
                .findSingle()
                .orElseThrow(() -> new RuntimeException("No DAG was created"));
        dag.subscribe(x -> {}, x -> {}, () -> {
            ctx.executor.shutdown();
        });
        return dag;
    }

    private static class Context<T> {
        @NonNull final ExecutorService executor;
        @NonNull final Function<T,List<T>> tPredsFn;
        @NonNull final Function<T, List<T>> tSuccsFn;
        @NonNull final Consumer<T> jobLogic;
        @NonNull final Map<T, List<Observable<T>>> deferredMerges;

        Context(ExecutorService executor, Function<T, List<T>> tPredsFn, Function<T, List<T>> tSuccsFn, Consumer<T> jobLogic, Map<T, List<Observable<T>>> deferredMerges) {
            this.executor = executor;
            this.tPredsFn = tPredsFn;
            this.tSuccsFn = tSuccsFn;
            this.jobLogic = jobLogic;
            this.deferredMerges = deferredMerges;
        }
    }

    private static <T> Observable<T> createObservable(T t, Context<T> ctx) {
        return just(t)
                .flatMap(tee -> Observable.<T> create(emitter -> {
                    ctx.jobLogic.accept(t);
                    emitter.onNext(t);
                    emitter.onComplete();

                }).subscribeOn(Schedulers.from(ctx.executor)));
    }

    private static <T> Optional<Observable<T>> handleSuccessor(
        Observable<T> upstreamDag,
        T succT,
        int numUpstreamNodes,
        Context<T> ctx) {

        if (numUpstreamNodes == 1) {
            val newUpstreamDag = concat(upstreamDag, createObservable(succT, ctx));
            val newSuccTs = ctx.tSuccsFn.apply(succT);
            return createAndMergeDag(newSuccTs, newUpstreamDag, ctx);

        } else if (numUpstreamNodes > 1) {
            //this successor will have to be merged: either add to deferrals or merge now...
            List<Observable<T>> deferred = ctx.deferredMerges.getOrDefault(succT, new ArrayList<>());
            if (deferred.size() < numUpstreamNodes - 1) {
                //not all merge partners are constructed yet: add to deferrals
                deferred.add(upstreamDag);
                ctx.deferredMerges.put(succT, deferred); //often redundant, benignly
                return Optional.empty();
            } else {
                //ready to merge: merge the current and all deferred upstreams, remove the deferral entry, and
                //continue building the downstream recursively
                deferred.add(upstreamDag);
                val mergedUpstream = merge(deferred);
                ctx.deferredMerges.remove(succT);
                return handleSuccessor(mergedUpstream, succT, 1, ctx);
            }
        } else {
            throw new RuntimeException("successor " + succT + " is not expected to have zero predecessors");
        }

    }

    private static <T> Optional<Observable<T>> createAndMergeDag(
            List<T> ts,
            Observable<T> upstreamDag,
            Context<T> ctx) {

            if (ts.isEmpty())
                return Optional.of(upstreamDag);

            val maybeWrappedUpstreamDag = ts.size() > 1
                    ? upstreamDag.cache()
                    : upstreamDag;

            val observables = seq(ts)
                .map(succT ->
                    handleSuccessor(maybeWrappedUpstreamDag, succT, ctx.tPredsFn.apply(succT).size(), ctx))
                .flatMap(Optional::stream)
                .toList();

            return observables
                .isEmpty()
                    ? Optional.empty()
                    : Optional.of(observables.size() > 1
                    ? merge(observables)
                    : observables.get(0));
    }
}

我正在使用这个工具来处理一个有点复杂的模型中的所有对象,它大大简化了任务。对于OP的问题,这太过分了,但是为了说明,下面是基于上面的ReactiveDag类的解决方案:

代码语言:javascript
运行
复制
package reactivedag;

import lombok.val;
import org.apache.logging.log4j.Logger;

import static reactivedag.ReactiveDag.runAsReactiveDag;
import static java.lang.Thread.sleep;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.apache.logging.log4j.LogManager.getLogger;

public class Sample {
    private static final Logger log = getLogger();

    static public void main(String[] args) {
        val dag = runAsReactiveDag(asList('A', 'D'),
            c -> {
                //predecessor getter function
                switch (c) {
                    case 'B': return asList('A');
                    case 'C': return asList('B', 'D');
                    case 'E': return asList('D');
                    default: return emptyList();
                }
            }, c -> {
                //successor getter function
                switch (c) {
                    case 'A': return asList('B');
                    case 'B': return asList('C');
                    case 'D': return asList('C', 'E');
                    default: return emptyList();
                }
            }, c -> {
                //job logic
                log.info("Starting " + c);
                try {
                    sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                log.info("Ending " + c);
            }
        );
    }
}
票数 1
EN

Stack Overflow用户

发布于 2017-06-30 22:05:53

  1. 对于rxjava来说,接受任意依赖图是一种很好的用例吗?

好吧,根据RxJava文档,这显然是RxJava的一个用例。

RxJava是反应性扩展的Java实现:一个通过使用可观察序列来组合异步和基于事件的程序的库。 它扩展了观察者模式以支持数据/事件序列,并添加了允许您以声明方式组合序列的操作符,同时抽象出对诸如低级别线程、同步、线程安全和并发数据结构等问题的关注。

  1. 这甚至是可观察/可完成的用例吗?

如果您不需要操作的结果,但您只需要知道它是否完成,那么Completable是好的。

  1. 如果是这样的话:实现这种行为的技术是什么?

我不知道是否有一种特定的技术可以帮助您在RxJava代码中转换问题。

我会这样做:

代码语言:javascript
运行
复制
Single.zip(
            executeA()
                    .subscribeOn(Schedulers.newThread())
                    .andThen(executeB())
                    .toSingleDefault(""),
            executeD()
                    .subscribeOn(Schedulers.newThread())
                    .andThen(new CompletableSource() {
                        @Override
                        public void subscribe(@NonNull CompletableObserver cs) {
                            executeE().subscribeOn(Schedulers.newThread())
                                    .subscribe(() -> Log.d("test", "complete E"));
                            cs.onComplete();
                        }
                    })
                    .toSingleDefault(""),
            (BiFunction<String, String, Object>) (s, s2) -> s)
            .flatMapCompletable(o -> CompletableObserver::onComplete)
            .andThen(executeC())
            .doOnSubscribe(disposable -> Log.d("test", "start"))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(() -> Log.d("test", "complete C"));

其中,任务以下列方式定义为Completable

代码语言:javascript
运行
复制
private Completable executeA() {
    return Completable.create(e -> {
        Thread.sleep(4000);
        Log.d("test", "completing A");
        e.onComplete();
    });
}

private Completable executeB() {
    return Completable.create(e -> {
        Thread.sleep(12000);
        Log.d("test", "completing B");
        e.onComplete();
    });
}

private Completable executeC() {
    return Completable.create(e -> {
        Thread.sleep(3000);
        Log.d("test", "completing C");
        e.onComplete();
    });
}

private Completable executeD() {
    return Completable.create(e -> {
        Thread.sleep(10000);
        Log.d("test", "completing D");
        e.onComplete();
    });
}

private Completable executeE() {
    return Completable.create(e -> {
        Thread.sleep(10000);
        Log.d("test", "completing E");
        e.onComplete();
    });
}

注意到:我不确定我用什么方法来解决andThen部件附加到executeD()的问题。我们不能就这样把D和E连在一起:

代码语言:javascript
运行
复制
executeD().andThen(executeE())

因为否则任务C将在E之后开始,但我们希望它在D之后开始。这是我创建执行任务C的CompletableSource的方式,同时允许调用onComplete()的执行继续进行。

还请注意,在这个实现中,我有两个订阅:路径的每一端一个(C结束和E结束)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44851890

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档