首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >领域:使用清洁架构和RxJava2

领域:使用清洁架构和RxJava2
EN

Stack Overflow用户
提问于 2017-04-03 18:40:17
回答 1查看 2.5K关注 0票数 3

有一点上下文,我尝试将一些干净的体系结构应用到我的一个项目中,而且我在存储库的(领域)磁盘实现方面遇到了问题。我有一个存储库,它根据某些条件(缓存)从不同的DataStores中提取一些数据。这就是理论,当所有这些都与UseCases和RxJava2混合时,问题就出现了。

首先,我从领域中获取对象列表,然后手动创建一个可以观察到的对象。subscribe ,但是(如预期的那样)在不同的线程上执行,因此领域最终导致…崩溃。 (第二代码块)

这是我用来创建可观察性(来自抽象类DiskStoreBase)的代码:

代码语言:javascript
复制
Observable<List<T>> createListFrom(final List<T> list) {
    return Observable.create(new ObservableOnSubscribe<List<T>>() {
        @Override
        public void subscribe(ObservableEmitter<List<T>> emitter) throws Exception {
            if (list != null) {
                emitter.onNext(list);
                emitter.onComplete();
            } else {
                emitter.onError(new ExceptionCacheNotFound());
            }
        }
    });
}

如何处理这种情况?

更多的DiskStoreForZone代码

代码语言:javascript
复制
@Override
public Observable<List<ResponseZone>> entityList() {
    Realm realm = Realm.getDefaultInstance();
    List<ResponseZone> result = realm.where(ResponseZone.class).findAll();
    return createListFrom(result);
}

确切的坠机:

代码语言:javascript
复制
E/REALM_JNI: jni: ThrowingException 8, Realm accessed from incorrect thread.
E/REALM_JNI: Exception has been thrown: Realm accessed from incorrect thread.
EN

Stack Overflow用户

发布于 2017-04-04 08:50:18

它不能工作,因为尽管使用Rx,数据层并不是反应性的。

领域本质上是一个反应性数据源,其托管对象本质上也是可变(按领域更新)和(只能在打开领域的同一线程上访问)。

要使代码正常工作,您需要从领域中复制数据。

代码语言:javascript
复制
@Override
public Single<List<ResponseZone>> entityList() {
    return Single.fromCallable(() -> {
       try(Realm realm = Realm.getDefaultInstance()) {
           return realm.copyFromRealm(realm.where(ResponseZone.class).findAll());
       }
    });
}

我冒昧地将您的Single表示为一个Single,考虑到它不是一个可观察的,它不监听更改,只有1事件,这就是列表本身。因此,通过ObservableEmitter发送它并不是真正有意义的,因为它不会发出事件。

因此,这就是为什么我说:您的数据层不是反应性的。你没有在听变化。您只是直接获取数据,并且从未收到任何更改的通知;尽管使用了Rx。

我画了几幅画来说明我的观点。(蓝色表示副作用)

在这种情况下,可以调用一次性操作从多个数据源(缓存、本地、远程)检索数据。一旦获得数据,就不会监听更改;从技术上讲,如果您在一个地方和另一个地方编辑数据,唯一的更新方法是“强制缓存手动检索新数据”;为此,必须知道您在其他地方修改了数据。为此,您需要一种直接调用回调或发送消息/事件-更改通知的方法。

因此,在某种程度上,您必须创建一个缓存无效通知事件。如果你听的话,解决方案可能又是被动的。只是你是手工做的。

考虑到领域已经是一个反应性数据源(类似于SQLBrite for SQLite),它能够提供更改通知,通过这些通知您可以“使缓存失效”。

实际上,如果您的本地数据源是唯一的数据源,而且网络中的任何写入都是您所听的更改,那么您的“缓存”可以被写入为replay(1).publish().refCount() (为新订阅者重播最新数据,如果计算了新数据,则用新的数据替换数据),即RxReplayingShare

从处理程序线程的活套创建。,您可以在后台线程上侦听领域中的更改,创建一个返回最新非托管副本的反应性数据源,您可以在线程之间传递这些副本(尽管如果选择此路由,则直接映射到不可变域模型优于copyFromRealm() --路由是干净的体系结构)。

代码语言:javascript
复制
return io.reactivex.Observable.create(new ObservableOnSubscribe<List<ResponseZone>>() {
    @Override
    public void subscribe(ObservableEmitter<List<ResponseZone>> emitter)
            throws Exception {
        final Realm observableRealm = Realm.getDefaultInstance();
        final RealmResults<ResponseZone> results = observableRealm.where(ResponseZone.class).findAllAsync();
        final RealmChangeListener<RealmResults<ResponseZone>> listener = results -> {
            if(!emitter.isDisposed()) {
                if(results.isValid() && results.isLoaded()) {
                    emitter.onNext(observableRealm.copyFromRealm(results));
                }
            }
        };

        emitter.setDisposable(Disposables.fromRunnable(() -> {
            if(results.isValid()) {
                results.removeChangeListener(listener);
            }
            observableRealm.close();
        }));
        results.addChangeListener(listener);
        // initial value will be handled by async query
    }
}).subscribeOn(looperScheduler).unsubscribeOn(looperScheduler);

其中,looper调度程序获得如下所示

代码语言:javascript
复制
    handlerThread = new HandlerThread("LOOPER_SCHEDULER");
    handlerThread.start();
    synchronized(handlerThread) {
        looperScheduler  = AndroidSchedulers.from(handlerThread.getLooper());
    }

这就是使用领域创建反应性清洁体系结构的方法。

加:

只有当您打算在领域上实际执行清洁体系结构时,才需要LooperScheduler。这是因为默认情况下,王国鼓励您使用您的数据对象作为域模型,并且作为一种优势,可以提供延迟加载的线程本地视图,这些视图在更新时会发生变化;但是说您应该使用不可变的域模型(独立于您的数据层)。因此,如果您想要在任何时候在后台线程上复制域的反应性干净架构,那么您将需要一个looper调度程序(或者在后台线程上观察,但是在Schedulers.io()上从刷新的领域进行复制)。

对于领域,通常您希望使用RealmObjects作为域模型,并依赖于惰性评估。在这种情况下,您不使用copyFromRealm(),也不将RealmResults映射到其他东西;但是可以将其公开为FlowableLiveData

您可以阅读有关此这里的相关内容。

票数 13
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43192015

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档