如何组合可观测值以避免给定的嵌套回调和依赖回调?

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (2)
  • 关注 (0)
  • 查看 (58)

这个博客,他给(复制/粘贴了以下代码)回调地狱的示例。然而,没有提到如何通过使用反应性扩展来消除这个问题。

因此,这里F3依赖于F1完成,F4和F5依赖于F2完成。

  1. 想知道在Rx中什么是功能等价的。
  2. 如何在Rx中表示F1、F2、F3、F4和F5都应该异步拉出?

注:我目前正试图把我的头脑围绕着Rx,所以在问这个问题之前,我没有尝试解决这个例子。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackB {

    /**
     * Demonstration of nested callbacks which then need to composes their responses together.
     * <p>
     * Various different approaches for composition can be done but eventually they end up relying upon
     * synchronization techniques such as the CountDownLatch used here or converge on callback design
     * changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
     */
    public static void run() throws Exception {
        final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
        /* the following are used to synchronize and compose the asynchronous callbacks */
        final CountDownLatch latch = new CountDownLatch(3);
        final AtomicReference<String> f3Value = new AtomicReference<String>();
        final AtomicReference<Integer> f4Value = new AtomicReference<Integer>();
        final AtomicReference<Integer> f5Value = new AtomicReference<Integer>();

        try {
            // get f3 with dependent result from f1
            executor.execute(new CallToRemoteServiceA(new Callback<String>() {

                @Override
                public void call(String f1) {
                    executor.execute(new CallToRemoteServiceC(new Callback<String>() {

                        @Override
                        public void call(String f3) {
                            // we have f1 and f3 now need to compose with others
                            System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
                            // set to thread-safe variable accessible by external scope 
                            f3Value.set(f3);
                            latch.countDown();
                        }

                    }, f1));
                }

            }));

            // get f4/f5 after dependency f2 completes 
            executor.execute(new CallToRemoteServiceB(new Callback<Integer>() {

                @Override
                public void call(Integer f2) {
                    executor.execute(new CallToRemoteServiceD(new Callback<Integer>() {

                        @Override
                        public void call(Integer f4) {
                            // we have f2 and f4 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5"));
                            // set to thread-safe variable accessible by external scope 
                            f4Value.set(f4);
                            latch.countDown();
                        }

                    }, f2));
                    executor.execute(new CallToRemoteServiceE(new Callback<Integer>() {

                        @Override
                        public void call(Integer f5) {
                            // we have f2 and f5 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5));
                            // set to thread-safe variable accessible by external scope 
                            f5Value.set(f5);
                            latch.countDown();
                        }

                    }, f2));
                }

            }));

            /* we must wait for all callbacks to complete */
            latch.await();
            System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get()));
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        try {
            run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static final class CallToRemoteServiceA implements Runnable {

        private final Callback<String> callback;

        private CallToRemoteServiceA(Callback<String> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseA");
        }
    }

    private static final class CallToRemoteServiceB implements Runnable {

        private final Callback<Integer> callback;

        private CallToRemoteServiceB(Callback<Integer> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(100);
        }
    }

    private static final class CallToRemoteServiceC implements Runnable {

        private final Callback<String> callback;
        private final String dependencyFromA;

        private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) {
            this.callback = callback;
            this.dependencyFromA = dependencyFromA;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseB_" + dependencyFromA);
        }
    }

    private static final class CallToRemoteServiceD implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(140);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(40 + dependencyFromB);
        }
    }

    private static final class CallToRemoteServiceE implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(55);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(5000 + dependencyFromB);
        }
    }

    private static interface Callback<T> {
        public void call(T value);
    }
}
提问于
用户回答回答于

下面是一个使用PlatMap、zip和Merge异步完成服务组合的示例。

它获取一个用户对象,然后并发获取Social和PersonalizedCatalog数据,然后对PersonalizedCatalog中的每个视频同时获取一个书签、评级和元数据,将它们压缩到一起,并将所有响应合并到一个渐进流输出中,作为服务器发送的事件。

return getUser(userId).flatMap(user -> {
    Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user)
            .flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
                    video -> {
                        Observable<Bookmark> bookmark = getBookmark(video);
                        Observable<Rating> rating = getRatings(video);
                        Observable<VideoMetadata> metadata = getVideoMetadata(video);
                        return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m));
                    }));

    Observable<Map<String, Object>> social = getSocial(user).map(s -> {
        return s.getDataAsMap();
    });

    return Observable.merge(catalog, social);
}).flatMap(data -> {
    String json = SimpleJson.mapToJson(data);
    return response.writeStringAndFlush("data: " + json + "\n");
});

热门问答

腾讯云GPU服务器不能联外网吗?

小爱同学

腾讯云 · 技术支持 (已认证)

推荐
腾讯云GPU服务器可连外网,GPU 云服务器提供和标准CVM 云服务器一致的方便快捷的管理方式。 图片.png GPU云服务器作为CVM云服务器的一类特殊实例,购买、 操作、维护等方式与CVM云服务器一致 图片.png GPU 云服务器(GPU Cloud Computin...... 展开详请

win服务器怎么给文件夹配置755权限?

推荐
下面以腾讯云win服务器(Windows Server 2016 数据中心版 64位中文版)为文件夹配置755权限为例 1.右击【属性】 图片.png 2 .选择【安全】- 【编辑】 图片.png 3. 可对当前文件进行755权限配置 图片.png 要修改某个文件的权...... 展开详请

腾讯云sdk 兼容JDK6?

推荐

如果你说的是https://cloud.tencent.com/document/sdk/Java的话,jdk最低版本是1.7,不支持1.6

android 离线推送 为什么setOfflinePushListener不回调?

嗨喽你好摩羯座
推荐
您好,使用云通信 IM SDK 的通知栏提醒,建议参考:https://cloud.tencent.com/document/product/269/9234 中的描述来操作,通知栏提醒的内容由类 TIMOfflinePushNotification 来定义,可以通过这个类对外...... 展开详请

为什么cmq的topic配置订阅者为queue,向topic发送消息无法到达queue?

是的, 向topic发送消息应该会立即投递到订阅者。您可以检查您配置的队列名称是否正确且是真实存在的队列。如还不能解决您的问题,您可以点击控制台右上角的“工单”,进行问题进一步的排查,腾讯云会有专业的售后24小时为您服务。

脏字过滤只支持 TIMTextElem 消息,对自定义消息无效,请问有接口可以主动检查吗?

学生路人
推荐
您好,脏字检查的内容包括单聊和群组消息(只检查文本消息 TIMTextElem,不支持对自定义消息 TIMCustomElem 的过滤)、群名片、群组资料(群名称、群简介、群公告)用户资料和好友关系链中 bytes 类型的数据(如昵称、好友备注和好友分组等)。目前没有这样的接口喔...... 展开详请

所属标签

扫码关注云+社区