如何组合可观测值以避免给定的嵌套回调和依赖回调?

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (2)
  • 关注 (0)
  • 查看 (51)

这个博客,他给(复制/粘贴了以下代码)回调地狱的示例。然而,没有提到如何通过使用反应性扩展来消除这个问题。

因此,这里F3依赖于F1完成,F4和F5依赖于F2完成。

  1. 想知道在Rx中什么是功能等价的。
  2. 如何在Rx中表示F1、F2、F3、F4和F5都应该异步拉出?

注:我目前正试图把我的头脑围绕着Rx,所以在问这个问题之前,我没有尝试解决这个例子。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackB {

    /**
     * Demonstration of nested callbacks which then need to composes their responses together.
     * <p>
     * Various different approaches for composition can be done but eventually they end up relying upon
     * synchronization techniques such as the CountDownLatch used here or converge on callback design
     * changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
     */
    public static void run() throws Exception {
        final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
        /* the following are used to synchronize and compose the asynchronous callbacks */
        final CountDownLatch latch = new CountDownLatch(3);
        final AtomicReference<String> f3Value = new AtomicReference<String>();
        final AtomicReference<Integer> f4Value = new AtomicReference<Integer>();
        final AtomicReference<Integer> f5Value = new AtomicReference<Integer>();

        try {
            // get f3 with dependent result from f1
            executor.execute(new CallToRemoteServiceA(new Callback<String>() {

                @Override
                public void call(String f1) {
                    executor.execute(new CallToRemoteServiceC(new Callback<String>() {

                        @Override
                        public void call(String f3) {
                            // we have f1 and f3 now need to compose with others
                            System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
                            // set to thread-safe variable accessible by external scope 
                            f3Value.set(f3);
                            latch.countDown();
                        }

                    }, f1));
                }

            }));

            // get f4/f5 after dependency f2 completes 
            executor.execute(new CallToRemoteServiceB(new Callback<Integer>() {

                @Override
                public void call(Integer f2) {
                    executor.execute(new CallToRemoteServiceD(new Callback<Integer>() {

                        @Override
                        public void call(Integer f4) {
                            // we have f2 and f4 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5"));
                            // set to thread-safe variable accessible by external scope 
                            f4Value.set(f4);
                            latch.countDown();
                        }

                    }, f2));
                    executor.execute(new CallToRemoteServiceE(new Callback<Integer>() {

                        @Override
                        public void call(Integer f5) {
                            // we have f2 and f5 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5));
                            // set to thread-safe variable accessible by external scope 
                            f5Value.set(f5);
                            latch.countDown();
                        }

                    }, f2));
                }

            }));

            /* we must wait for all callbacks to complete */
            latch.await();
            System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get()));
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        try {
            run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static final class CallToRemoteServiceA implements Runnable {

        private final Callback<String> callback;

        private CallToRemoteServiceA(Callback<String> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseA");
        }
    }

    private static final class CallToRemoteServiceB implements Runnable {

        private final Callback<Integer> callback;

        private CallToRemoteServiceB(Callback<Integer> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(100);
        }
    }

    private static final class CallToRemoteServiceC implements Runnable {

        private final Callback<String> callback;
        private final String dependencyFromA;

        private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) {
            this.callback = callback;
            this.dependencyFromA = dependencyFromA;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseB_" + dependencyFromA);
        }
    }

    private static final class CallToRemoteServiceD implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(140);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(40 + dependencyFromB);
        }
    }

    private static final class CallToRemoteServiceE implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(55);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(5000 + dependencyFromB);
        }
    }

    private static interface Callback<T> {
        public void call(T value);
    }
}
提问于
用户回答回答于

使用RxJava,您将通过以下代码实现这一点:

Observable<Integer> f3 = callRemoveServiceA() // call serviceA
            // call serviceB with the result of serviceA
            .flatMap((f1) -> callRemoveServiceB(f1)); 


Observable<Integer> f4Andf5 = callRemoveServiceC() // call serviceC
                    // call serviceD and serviceE then build a new value
                    .flatMap((f2) -> callRemoveServiceD(f2).zipWith(callRemoveServiceE(f2), (f4, f5) -> f4 * f5));

// compute the string to display from f3, and the f4, f5 pair
f3.zipWith(f4Andf5, (childF3, childF4Andf5) -> childF3 + " => " + childF4Andf5)
            // display the value
            .subscribe(System.out::println);

这里的重要部分是使用flapMapzip(或zipWith)

  • flapMap将一个值转换为另一个Observable.
  • zip将从两个不同的Observable。因此,您可以使用两个(或更多)的结果构建一个新值。

热门问答

快照容量与费用的比例?如何关闭停用?

帅的惊动我国计算机大神
推荐已采纳
快照已于2019年1月22日0时启动正式商业化进程,商业化后所有存量快照和新产生的快照将根据快照使用的存储容量进行收费。 在快照商业化后,腾讯云仍旧会在国内主要地域为用户提供一定量的免费额度。免费额度策略如下: 免费额度覆盖范围为中国大陆地域,中国香港及海外地域暂无免费快照额...... 展开详请

红包消息如何构建?

红包消息的话,与@消息类似,可以通过 TIMCustomElem 来实现。需要应用在UI上做相应的特殊处理,比如检查到当前消息为红包消息后,消息展示为红包的样式。 另外,红包消息作为重要消息,最好在发送消息的时候将其设置为高优先级消息,以最大程序保证消息在触达频率限制的情况下仍可...... 展开详请

AVChatRoom和ChatRoom有什么区别?

面向的应用场景不同 ChatRoom适用于群组规模中等(不超过数千人级别)、成员进出不太频繁(每秒十几个人进出)的场景;AVChatRoom是专门为了大型直播设计的,适用于人数众多(万级以上)、成员进出频繁(每秒数百人以上进出)的场景。 AVChatRoom的优点 支持人数无上限...... 展开详请

serverless函数如何支持跨域?

解决跨域的方式有几种: 1. 如果不像自行解决跨域问题,且没有处理 http header 方法的问题,可以在 API 网关中,针对 API 配置,不选择 ANY 方法,而且仅选择非 header 的方法,然后勾选启用 CORS,由 API 网关协助解决跨域。完成配置后记得保存并...... 展开详请

调用短信接口返回错误?

您好,短信错误码文档:https://cloud.tencent.com/document/product/382/3771。 报错1021说明:请求发起时间不正常,通常是由于您的服务器时间与腾讯云服务器时间差异超过 10 分钟导致的 核查步骤: 1.请核查服务器时间是否正常; ...... 展开详请

对象存储数据三副本问题,谢谢 ?

波斯狗儿对象存储产品经理
推荐已采纳

1 COS 不完全使用副本的方式保存,数据调度能力属于我们的产品核心竞争力,具体实现方式一般不披露。

2 副本对用户是不感知的,COS 是一个最终一致性的存储,如果发起删除导致数据丢失,所有的数据都会被删除。

所属标签

扫码关注云+社区