RxJava2 实战知识梳理(6) - 基于错误类型的重试请求

作者:泽毛

地址:http://www.jianshu.com/p/d135f19e045c

声明:本文是泽毛原创,已获其授权发布,未经原作者允许请勿转载

一、前言

1.1 应用背景

在网络请求时,有时候会出现需要进行重试的情况,重试的时候,有以下几点需要注意:

  • 限制重试的次数
  • 根据错误类型,判断是否要重试
  • 根据错误类型,等待特定的时间之后再去重试

我们先来看一下目前的一些网络框架是怎么做的?通过分析Volley的源码,可以从BasicNetwork的代码中看到,它是将网络请求的代码都放在一个无限的while(true)循环当中,如果发生了异常,会在其中的catch语句中进行处理,如果需要继续重试,那么就吞掉这个异常,并将重试次数加1,这样就会进入下一次的while(true)循环去访问网络;如果不需要重试,那么就抛出这个异常,退出这个无限循环。也就是实现了前面两点需求。

下面我们就来演示如何通过RxJava2来轻松实现上面的三点需求,通过这篇文章,我们将学习retryWhen操作符的具体用法,retryWhenrepeatWhen经常被大家用来比较,如果对repeatWhen感兴趣的同学可以阅读上一篇文章 RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作。

1.2 示例代码

在下面的例子中,我们一共发起了五次请求,也就是subscribe中的代码,其中前四次请求都调用onError方法通知下游请求失败,同时带上了自定义的错误信息wait_shortwait_long,第五次才返回正确的数据。

当我们收到错误之后,会根据错误的类型确定重试的时间,同时,我们还保存了当前重试的次数,避免无限次的重试请求。如果需要重试,那么通过Timer操作符延时指定的时间,否则返回Observable.error(Throwable)放弃重试。

public class RetryActivity extends AppCompatActivity {

    private static final String TAG = RetryActivity.class.getSimpleName();
    private static final String MSG_WAIT_SHORT = "wait_short";
    private static final String MSG_WAIT_LONG = "wait_long";

    private static final String[] MSG_ARRAY = new String[] {
            MSG_WAIT_SHORT,
            MSG_WAIT_SHORT,
            MSG_WAIT_LONG,
            MSG_WAIT_LONG
    };

    private TextView mTvRetryWhen;
    private CompositeDisposable mCompositeDisposable;
    private int mMsgIndex;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_retry);
        mTvRetryWhen = (TextView) findViewById(R.id.tv_retry_when);
        mTvRetryWhen.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                startRetryRequest();
            }
        });
        mCompositeDisposable = new CompositeDisposable();
    }

    private void startRetryRequest() {
        Observable<String> observable = Observable.create(
new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) 
                        throws Exception {
                int msgLen = MSG_ARRAY.length;
                doWork();
                
                if (mMsgIndex < msgLen) { 
                    e.onError(new Throwable(MSG_ARRAY[mMsgIndex]));
                    mMsgIndex++;
                } else { 
                    e.onNext("Work Success");
                    e.onComplete();
                }
            }

        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {

            private int mRetryCount;

            @Override
            public ObservableSource<?> apply(Observable<Throwable> 
                           throwableObservable) throws Exception {
                return throwableObservable.flatMap(new Function<Throwable, 
                           ObservableSource<?>>() {

                    @Override
                    public ObservableSource<?> apply(Throwable throwable) 
                           throws Exception {
                        String errorMsg = throwable.getMessage();
                        long waitTime = 0;
                        switch (errorMsg) {
                            case MSG_WAIT_SHORT:
                                waitTime = 2000;
                                break;
                            case MSG_WAIT_LONG:
                                waitTime = 4000;
                                break;
                            default:
                                break;
                        }
                        Log.d(TAG, "发生错误,尝试等待时间=" + waitTime + "
                                ,当前重试次数=" + mRetryCount);
                        mRetryCount++;
                        return waitTime > 0 && mRetryCount <= 4 
                           ? Observable.timer(waitTime, TimeUnit.MILLISECONDS) : Observable.error(throwable);
                    }

                });
            }

        });
        DisposableObserver<String> disposableObserver = 
                           new DisposableObserver<String>() {

            @Override
            public void onNext(String value) {
                Log.d(TAG, "DisposableObserver onNext=" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "DisposableObserver onError=" + e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "DisposableObserver onComplete");
            }
        };
        observable.subscribeOn(Schedulers.io()).observeOn(
                           AndroidSchedulers.mainThread())
                           .subscribe(disposableObserver);
        mCompositeDisposable.add(disposableObserver);
    }

    private void doWork() {
        long workTime = (long) (Math.random() * 500) + 500;
        try {
            Log.d(TAG, "doWork start,  threadId=" 
                           + Thread.currentThread().getId());
            Thread.sleep(workTime);
            Log.d(TAG, "doWork finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上述代码的运行结果为,红框中的间隔就是每次等待重试的时间:

二、示例解析

2.1 retryWhen 介绍

retryWhen的原理图如下所示:

retryWhen提供了重订阅的功能,对于retryWhen来说,它的重订阅触发有两点要素:

  • 上游通知retryWhen本次订阅流已经完成,询问其是否需要重订阅,该询问是以onError事件触发的。
  • retryWhen根据onError的类型,决定是否需要重订阅,它通过返回一个ObservableSource<?>来通知,如果该ObservableSource返回onComplete/onError,那么不会触发重订阅;如果发送onNext,那么会触发重订阅。

实现retryWhen的关键在于如何定义它的Function参数:

  • Function的输入是一个Observable<Throwable>,输出是一个泛型ObservableSource<?>。如果我们接收Observable<Throwable>发送的消息,那么就可以得到上游发送的错误类型,并根据该类型进行响应的处理。
  • 如果输出的Observable发送了onComplete或者onError则表示不需要重订阅,结束整个流程;否则触发重订阅的操作。也就是说,它 仅仅是作为一个是否要触发重订阅的通知onNext发送的是什么数据并不重要。
  • 对于每一次订阅的数据流 Function 函数只会回调一次,并且是在onError(Throwable throwable)的时候触发,它不会收到任何的onNext事件。
  • Function函数中,必须对输入的 Observable<Object>进行处理,这里我们使用的是flatMap操作符接收上游的数据,对于flatMap的解释,大家可以参考 RxJava2 实战知识梳理(4) - 结合 Retrofit 请求新闻资讯 。

2.2 retryWhen 和 repeatWhen 对比

在 RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作 中我们已经对repeatWhen进行了介绍,让我们再来看一下它的原理图:

可以看到,retryWhen

和repeatWhen

最大的不同就是:retryWhen

是收到onError

后触发是否要重订阅的询问,而repeatWhen

是通过

onComplete触发。

2.3 根据 Throwable 的类型选择响应的重试策略

由于上游可以通过onError(Throwable throwable)中的异常通知retryWhen,那么我们就可以根据异常的类型来决定重试的策略。

就像我们在上面例子中做的那样,我们通过flatMap操作符获取到异常的类型,然后根据异常的类型选择动态地决定延迟重试的时间,再用Timer操作符实现延迟重试;当然,对于一些异常,我们可以直接选择不重试,即直接返回Observable.empty或者Observable.error(Throwable throwable)

原文发布于微信公众号 - Android先生(cyg_24kshign)

原文发表时间:2017-09-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏三流程序员的挣扎

RxJava Single Completable Maybe 使用和源码阅读

Single 用于只发射一次数据就结束了,所以无需通过 onComplete 通知观察者,要么 onSuccess 要么 onError。

24920
来自专栏开发之途

Android IPC机制(2)-AIDL

18540
来自专栏刘晓杰

retrofit的使用

370140
来自专栏difcareer的技术笔记

Android架构分析【三】mvp-cleanClean Architecture代码分析总结

乍一看这张图,就像洋葱,一层包一层,其核心理念就是向内依赖。也就是说:外层依赖内层,内层不能依赖外层,层层向内依赖。越往内,越稳定。在这个核心理念下,定义...

9920
来自专栏非著名程序员

Android 内存泄露简介、典型情景及检测解决

什么是内存泄露? Android虚拟机的垃圾回收采用的是根搜索算法。GC会从根节点(GC Roots)开始对heap进行遍历。到最后,部分没有直接或者间接引用到...

22580
来自专栏双十二技术哥

Android AsyncLayoutInflater 源码解析

我们已经学习了 Layout 相关的方方面面,本文就来学习下一个相对新颖的知识点:AsyncLayoutInflater;说它相对新颖是因为它是Android ...

18320
来自专栏向治洪

Volley解析之表单提交篇

要实现表单的提交,就要知道表单提交的数据格式是怎么样,这里我从某知名网站抓了一条数据,先来分析别人提交表单的数据格式。  数据包: Connection: ...

24250
来自专栏Fish

Android判断网络状况

啊,调bug的时候发现在没有网络的时候程序会崩,因此决定加个网络判断的。就是这个代码啦~然后到了要用的时候,new一个类对象调用这个方法就可以了。 packag...

24390
来自专栏Android开发指南

3.数据存储

32670
来自专栏学海无涯

Android开发之自动填充短信验证码

笔者发现在很多应用中,都有自动获取验证码的功能:点击获取验证码按钮,收到短信,当前应用不需要退出程序就可以获取到短信中的验证码,并自动填充。觉得这种用户体验很赞...

55380

扫码关注云+社区

领取腾讯云代金券