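I'm currently trying to implement the new ViewModels from the Architecture Components with API requests from Retrofit and OkHttp. Everything works, but I can't figure out how to pass an error response from Retrofit to LiveDataReactiveStreams.fromPublisher and then upstream to the observer in the fragment. This is what I have so far: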
public class ShowListViewModel extends AndroidViewModel {
private final ClientAdapter clientAdapter;
private LiveData<List<Show>> shows;
public ShowListViewModel(Application application) {
    super(application);
    clientAdapter = new ClientAdapter(getApplication().getApplicationContext());
    loadShows();
}
public LiveData<List<Show>> getShows() {
    if (shows == null) {
        shows = new MutableLiveData<>();
    }
    return shows;
}
void loadShows() {
    shows = LiveDataReactiveStreams.fromPublisher(Observable.fromIterable(ShowsUtil.loadsIds())
            .subscribeOn(Schedulers.io())
            .flatMap(clientAdapter::getShowWithNextEpisode)
            .observeOn(Schedulers.computation())
            .toSortedList(new ShowsUtil.ShowComparator())
            .observeOn(AndroidSchedulers.mainThread())
            .toFlowable());
}
}
In the fragment, I set up the ViewModel in onCreate with the following:
ShowListViewModel model = ViewModelProviders.of(this).get(ShowListViewModel.class);
    model.getShows().observe(this, shows -> {
        if (shows == null || shows.isEmpty()) {
            //This is where we may have empty list etc....
        } else {
            //process results from shows list here
        }
    });
Everything works as expected, but if we are offline, Retrofit throws a RuntimeException and the app crashes. I think the problem is here:
LiveDataReactiveStreams.fromPublisher(Observable.fromIterable(ShowsUtil.loadsIds())
            .subscribeOn(Schedulers.io())
            .flatMap(clientAdapter::getShowWithNextEpisode)
            .observeOn(Schedulers.computation())
            .toSortedList(new ShowsUtil.ShowComparator())
            .observeOn(AndroidSchedulers.mainThread())
            .toFlowable());
Normally we would subscribe with RxJava 2 and catch the Retrofit error there, but LiveDataReactiveStreams.fromPublisher subscribes to the Flowable for us. So how do we pass this error through to here:
model.getShows().observe(this, shows -> { //process error in fragment});
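Posted on 2018-08-03 17:51:00
Instead of exposing only the list of shows through the LiveData object, you need to wrap the shows and the error in a class that can hold an error.
Using your example, you could do the following: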
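To make the observer side of this concrete, here is a minimal plain-Java sketch of what the fragment's observer could do once it receives a wrapped value. `ShowsResult`, `ObserverSketch`, and `render` are hypothetical names used only for illustration, and shows are reduced to strings so the sketch runs without Android or RxJava.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the wrapper idea: holds either a list of shows or an error.
final class ShowsResult {
    final List<String> shows;   // null when the load failed
    final Throwable error;      // null when the load succeeded

    ShowsResult(List<String> shows, Throwable error) {
        this.shows = shows;
        this.error = error;
    }
}

final class ObserverSketch {
    // Roughly what the body of the fragment's observer lambda might do:
    // check for an error first, then for an empty list, then use the data.
    static String render(ShowsResult result) {
        if (result.error != null) {
            return "error: " + result.error.getMessage();
        }
        if (result.shows == null || result.shows.isEmpty()) {
            return "no shows";
        }
        return "shows: " + String.join(", ", result.shows);
    }

    public static void main(String[] args) {
        System.out.println(render(new ShowsResult(Arrays.asList("A", "B"), null)));
        System.out.println(render(new ShowsResult(null, new RuntimeException("offline"))));
    }
}
```

In a real fragment the returned strings would be replaced by UI updates; the point is only that the error arrives through the same callback as the data.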
    LiveDataReactiveStreams.fromPublisher(Observable.fromIterable(ShowsUtil.loadsIds())
            .subscribeOn(Schedulers.io())
            .flatMap(clientAdapter::getShowWithNextEpisode)
            .observeOn(Schedulers.computation())
            .toSortedList(new ShowsUtil.ShowComparator())
            .observeOn(AndroidSchedulers.mainThread())
            .map(Result::success)
            .onErrorReturn(Result::error)
            .toFlowable());
where Result is a wrapper class that holds either the error or the result:
final class Result<T> {
    private final T result;
    private final Throwable error;
    private Result(@Nullable T result, @Nullable Throwable error) {
        this.result = result;
        this.error = error;
    }
    @NonNull
    public static <T> Result<T> success(@NonNull T result) {
        return new Result<>(result, null);
    }
    @NonNull
    public static <T> Result<T> error(@NonNull Throwable error) {
        return new Result<>(null, error);
    }
    @Nullable
    public T getResult() {
        return result;
    }
    @Nullable
    public Throwable getError() {
        return error;
    }
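}
Posted on 2018-07-27 08:56:51
I used a solution with LiveDataReactiveStreams.fromPublisher together with a generic wrapper class, Result, based on the following class.
The idea is similar to https://stackoverflow.com/a/51669953/4266287
This pattern can be used whether or not you use LiveData. The main idea is to be able to handle errors without ending the stream. It works similarly to RxRelay in that even your errors are emitted through onNext, and the actual meaning comes from the data, whether it is a result or an error.
If this is going to be part of the backbone of your project, you can build some additional framework to make all of this transparent. For example, you can create your own class, something like a Resource<T>-aware stream, with an observe method that mimics the way you call RxJava's observe but "unwraps" the data and calls the right callback: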
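As a rough illustration of "errors are emitted through onNext", the following plain-Java sketch wraps each load attempt so that a failure becomes one more item instead of terminating the sequence. It uses a simple loop as a stand-in for a reactive stream; `Outcome`, `loadAll`, and `fakeLoad` are hypothetical names and not part of RxJava, RxRelay, or LiveData.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class ErrorsAsItemsSketch {
    // Hypothetical wrapper: exactly one of value / error is non-null.
    static final class Outcome<T> {
        final T value;
        final Throwable error;

        private Outcome(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static <T> Outcome<T> success(T value) {
            return new Outcome<>(value, null);
        }

        static <T> Outcome<T> failure(Throwable error) {
            return new Outcome<>(null, error);
        }
    }

    // Applies the loader to every id. A thrown exception is captured as an item,
    // so the ids after the failing one are still processed.
    static <T> List<Outcome<T>> loadAll(List<Integer> ids, Function<Integer, T> loader) {
        List<Outcome<T>> emitted = new ArrayList<>();
        for (Integer id : ids) {
            try {
                emitted.add(Outcome.success(loader.apply(id)));
            } catch (RuntimeException e) {
                emitted.add(Outcome.failure(e));
            }
        }
        return emitted;
    }

    // Hypothetical loader that simulates a network failure for id 2.
    static String fakeLoad(int id) {
        if (id == 2) {
            throw new IllegalStateException("offline");
        }
        return "show-" + id;
    }

    public static void main(String[] args) {
        for (Outcome<String> o : loadAll(Arrays.asList(1, 2, 3), ErrorsAsItemsSketch::fakeLoad)) {
            System.out.println(o.error != null ? "error: " + o.error.getMessage() : "value: " + o.value);
        }
    }
}
```

The consumer sees three items, the second of which carries the error, and decides per item how to react; nothing downstream is torn down by the failure.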
fun subscribe(
        onNext: (T) -> Unit,
        onError: ((Throwable) -> Unit)? = null
) {
    val handleOnNext: (Resource<T>) -> Unit = { resource: Resource<T> ->
        when (resource.status) {
            Status.SUCCESS -> resource.data?.let(onNext)
            Status.ERROR -> resource.error?.let { onError?.invoke(it) ?: throw it }
        }
    }
    publishSubject
        .subscribeOn(subscribeOn)
        .observeOn(observeOn)
        .run { subscribe(handleOnNext) }
        .addTo(compositeDisposable)
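}
Posted on 2020-05-28 13:56:11
In my case, I copied LiveDataReactiveStreams.java into my 'util' package as 'MyLiveDataReactiveStreams.java'. The modified version replaces the RuntimeException with a GreenRobot EventBus post. Then, in my app, I can subscribe to that event and handle the error appropriately. For this solution I had to add '@SuppressLint("RestrictedApi")' in 3 places. I'm not sure whether Google allows this for Play Store apps. This repo contains a complete example: https://github.com/LeeHounshell/Dogs. The relevant code is below: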
    // add GreenRobot to your app/build.gradle
    def greenrobot_version = '3.2.0'
    def timberkt_version = '1.5.1'
    implementation "org.greenrobot:eventbus:$greenrobot_version"
    implementation "com.github.ajalt:timberkt:$timberkt_version"
//--------------------------------------------------
// next put this in your Activity or Fragment to handle the error
override fun onStart() {
    super.onStart()
    EventBus.getDefault().register(this)
}
override fun onStop() {
    super.onStop()
    EventBus.getDefault().unregister(this)
}
@Subscribe(threadMode = ThreadMode.MAIN)
fun onRxErrorEvent(rxError_event: RxErrorEvent) {
    // do your custom error handling here
    Toast.makeText(activity, rxError_event.errorDescription, Toast.LENGTH_LONG).show()
}
//--------------------------------------------------
// create a new class in 'util' to post the error
import com.github.ajalt.timberkt.Timber
import org.greenrobot.eventbus.EventBus
class RxErrorEvent(val description: String) {
    private val _tag = "LEE: <" + RxErrorEvent::class.java.simpleName + ">"
    lateinit var errorDescription: String
    init {
        errorDescription = description
    }
    fun post() {
        Timber.tag(_tag).e("post $errorDescription")
        EventBus.getDefault().post(this)
    }
}
//--------------------------------------------------
// and use MyLiveDataReactiveStreams (below) instead of LiveDataReactiveStreams
/*
 *  This is a modified version of androidx.lifecycle.LiveDataReactiveStreams
 *  The original LiveDataReactiveStreams object can't handle RxJava error conditions.
 *  Now errors are emitted as RxErrorEvent objects using the GreenRobot EventBus.
 */
import android.annotation.SuppressLint;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.arch.core.executor.ArchTaskExecutor;
import androidx.lifecycle.LifecycleOwner;
import androidx.lifecycle.LiveData;
import androidx.lifecycle.Observer;
import com.github.ajalt.timberkt.Timber;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Adapts {@link LiveData} input and output to the ReactiveStreams spec.
 */
@SuppressWarnings("WeakerAccess")
public final class MyLiveDataReactiveStreams {
    private final static String _tag = "LEE: <" + MyLiveDataReactiveStreams.class.getSimpleName() + ">";
    private MyLiveDataReactiveStreams() {
    }
    /**
     * Adapts the given {@link LiveData} stream to a ReactiveStreams {@link Publisher}.
     *
     * <p>
     * By using a good publisher implementation such as RxJava 2.x Flowables, most consumers will
     * be able to let the library deal with backpressure using operators and not need to worry about
     * ever manually calling {@link Subscription#request}.
     *
     * <p>
     * On subscription to the publisher, the observer will attach to the given {@link LiveData}.
     * Once {@link Subscription#request} is called on the subscription object, an observer will be
     * connected to the data stream. Calling request(Long.MAX_VALUE) is equivalent to creating an
     * unbounded stream with no backpressure. If request with a finite count reaches 0, the observer
     * will buffer the latest item and emit it to the subscriber when data is again requested. Any
     * other items emitted during the time there was no backpressure requested will be dropped.
     */
    @NonNull
    public static <T> Publisher<T> toPublisher(
            @NonNull LifecycleOwner lifecycle, @NonNull LiveData<T> liveData) {
        return new MyLiveDataReactiveStreams.LiveDataPublisher<>(lifecycle, liveData);
    }
    private static final class LiveDataPublisher<T> implements Publisher<T> {
        final LifecycleOwner mLifecycle;
        final LiveData<T> mLiveData;
        LiveDataPublisher(LifecycleOwner lifecycle, LiveData<T> liveData) {
            this.mLifecycle = lifecycle;
            this.mLiveData = liveData;
        }
        @Override
        public void subscribe(Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new MyLiveDataReactiveStreams.LiveDataPublisher.LiveDataSubscription<T>(subscriber, mLifecycle, mLiveData));
        }
        static final class LiveDataSubscription<T> implements Subscription, Observer<T> {
            final Subscriber<? super T> mSubscriber;
            final LifecycleOwner mLifecycle;
            final LiveData<T> mLiveData;
            volatile boolean mCanceled;
            // used on main thread only
            boolean mObserving;
            long mRequested;
            // used on main thread only
            @Nullable
            T mLatest;
            LiveDataSubscription(final Subscriber<? super T> subscriber,
                                 final LifecycleOwner lifecycle, final LiveData<T> liveData) {
                this.mSubscriber = subscriber;
                this.mLifecycle = lifecycle;
                this.mLiveData = liveData;
            }
            @Override
            public void onChanged(@Nullable T t) {
                if (mCanceled) {
                    return;
                }
                if (mRequested > 0) {
                    mLatest = null;
                    mSubscriber.onNext(t);
                    if (mRequested != Long.MAX_VALUE) {
                        mRequested--;
                    }
                } else {
                    mLatest = t;
                }
            }
            @SuppressLint("RestrictedApi")
            @Override
            public void request(final long n) {
                if (mCanceled) {
                    return;
                }
                ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                    @Override
                    public void run() {
                        if (mCanceled) {
                            return;
                        }
                        if (n <= 0L) {
                            mCanceled = true;
                            if (mObserving) {
                                mLiveData.removeObserver(MyLiveDataReactiveStreams.LiveDataPublisher.LiveDataSubscription.this);
                                mObserving = false;
                            }
                            mLatest = null;
                            mSubscriber.onError(
                                    new IllegalArgumentException("Non-positive request"));
                            return;
                        }
                        // Prevent overflowage.
                        mRequested = mRequested + n >= mRequested
                                ? mRequested + n : Long.MAX_VALUE;
                        if (!mObserving) {
                            mObserving = true;
                            mLiveData.observe(mLifecycle, MyLiveDataReactiveStreams.LiveDataPublisher.LiveDataSubscription.this);
                        } else if (mLatest != null) {
                            onChanged(mLatest);
                            mLatest = null;
                        }
                    }
                });
            }
            @SuppressLint("RestrictedApi")
            @Override
            public void cancel() {
                if (mCanceled) {
                    return;
                }
                mCanceled = true;
                ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                    @Override
                    public void run() {
                        if (mObserving) {
                            mLiveData.removeObserver(MyLiveDataReactiveStreams.LiveDataPublisher.LiveDataSubscription.this);
                            mObserving = false;
                        }
                        mLatest = null;
                    }
                });
            }
        }
    }
    /**
     * Creates an observable {@link LiveData} stream from a ReactiveStreams {@link Publisher}.
     *
     * <p>
     * When the LiveData becomes active, it subscribes to the emissions from the Publisher.
     *
     * <p>
     * When the LiveData becomes inactive, the subscription is cleared.
     * LiveData holds the last value emitted by the Publisher when the LiveData was active.
     * <p>
     * Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
     * added, it will automatically notify with the last value held in LiveData,
     * which might not be the last value emitted by the Publisher.
     * <p>
     * Note that LiveData does NOT handle errors and it expects that errors are treated as states
     * in the data that's held. In case of an error being emitted by the publisher, an error will
     * be propagated to the main thread and the app will crash.
     *
     * @param <T> The type of data hold by this instance.
     */
    @NonNull
    public static <T> LiveData<T> fromPublisher(@NonNull Publisher<T> publisher) {
        return new MyLiveDataReactiveStreams.PublisherLiveData<>(publisher);
    }
    /**
     * Defines a {@link LiveData} object that wraps a {@link Publisher}.
     *
     * <p>
     * When the LiveData becomes active, it subscribes to the emissions from the Publisher.
     *
     * <p>
     * When the LiveData becomes inactive, the subscription is cleared.
     * LiveData holds the last value emitted by the Publisher when the LiveData was active.
     * <p>
     * Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
     * added, it will automatically notify with the last value held in LiveData,
     * which might not be the last value emitted by the Publisher.
     *
     * <p>
     * Note that LiveData does NOT handle errors and it expects that errors are treated as states
     * in the data that's held. In case of an error being emitted by the publisher, an error will
     * be propagated to the main thread and the app will crash.
     *
     * @param <T> The type of data hold by this instance.
     */
    private static class PublisherLiveData<T> extends LiveData<T> {
        private final Publisher<T> mPublisher;
        final AtomicReference<MyLiveDataReactiveStreams.PublisherLiveData.LiveDataSubscriber> mSubscriber;
        PublisherLiveData(@NonNull Publisher<T> publisher) {
            mPublisher = publisher;
            mSubscriber = new AtomicReference<>();
        }
        @Override
        protected void onActive() {
            super.onActive();
            MyLiveDataReactiveStreams.PublisherLiveData.LiveDataSubscriber s = new MyLiveDataReactiveStreams.PublisherLiveData.LiveDataSubscriber();
            mSubscriber.set(s);
            mPublisher.subscribe(s);
        }
        @Override
        protected void onInactive() {
            super.onInactive();
            MyLiveDataReactiveStreams.PublisherLiveData.LiveDataSubscriber s = mSubscriber.getAndSet(null);
            if (s != null) {
                s.cancelSubscription();
            }
        }
        final class LiveDataSubscriber extends AtomicReference<Subscription>
                implements Subscriber<T> {
            @Override
            public void onSubscribe(Subscription s) {
                if (compareAndSet(null, s)) {
                    s.request(Long.MAX_VALUE);
                } else {
                    s.cancel();
                }
            }
            @Override
            public void onNext(T item) {
                postValue(item);
            }
            @SuppressLint("RestrictedApi")
            @Override
            public void onError(final Throwable ex) {
                mSubscriber.compareAndSet(this, null);
                ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                    @Override
                    public void run() {
                        // NOTE: Errors are handled upstream
                        Timber.tag(_tag).e("LiveData does not handle errors. Errors from publishers are handled upstream via EventBus. error: " + ex);
                        RxErrorEvent rx_event = new RxErrorEvent(ex.toString());
                        rx_event.post();
                    }
                });
            }
            @Override
            public void onComplete() {
                mSubscriber.compareAndSet(this, null);
            }
            public void cancelSubscription() {
                Subscription s = get();
                if (s != null) {
                    s.cancel();
                }
            }
        }
    }
}
https://stackoverflow.com/questions/46304042