前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Android RxJava+Retrofit完美封装(缓存,请求,生命周期管理)

Android RxJava+Retrofit完美封装(缓存,请求,生命周期管理)

原创
作者头像
java爱好者
修改2019-06-28 19:03:11
3K0
修改2019-06-28 19:03:11
举报
文章被收录于专栏:java架构1+1java架构1+1

前言

Retrofit 和RxJava已经出来很久了,很多前辈写了很多不错的文章,在此不得不感谢这些前辈无私奉献的开源精神,能让我们站在巨人的肩膀上望得更远。对于 RxJava 不是很了解的同学推荐你们看扔物线大神的这篇文章给 Android 开发者的 RxJava 详解一遍看不懂就看第二遍。Retrofit的使用可以 加QQ群:668041364

本文内容是基于Retrofit + RxJava做的一些巧妙的封装。参考了很多文章加入了一些自己的理解,请多指教。源码地址:加QQ群架构师交流群668041364

先放出build.gradle

 compile 'io.reactivex:rxjava:1.1.0'
 compile 'io.reactivex:rxandroid:1.1.0'
 compile 'com.squareup.retrofit2:retrofit:2.0.0-beta4'
 compile 'com.squareup.retrofit2:converter-gson:2.0.0-beta4'
 compile 'com.squareup.retrofit2:adapter-rxjava:2.0.0-beta4'

本文是基于RxJava1.1.0和Retrofit 2.0.0-beta4来进行的。

初始化 Retrofit

新建类Api,此类就是初始化Retrofit,提供一个静态方法初始化Retrofit非常简单.

 private static ApiService SERVICE;
 /**
 * 请求超时时间
 */
 private static final int DEFAULT_TIMEOUT = 10000;
 public static ApiService getDefault() {
 if (SERVICE == null) {
 //手动创建一个OkHttpClient并设置超时时间
 OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
 httpClientBuilder.connectTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
 /**
 * 对所有请求添加请求头
 */
 httpClientBuilder.addInterceptor(new Interceptor() {
 @Override
 public okhttp3.Response intercept(Chain chain) throws IOException {
 Request request = chain.request();
 okhttp3.Response originalResponse = chain.proceed(request);
 return originalResponse.newBuilder().header("key1", "value1").addHeader("key2", "value2").build();
 }
 });
 SERVICE = new Retrofit.Builder()
 .client(httpClientBuilder.build())
 .addConverterFactory(GsonConverterFactory.create())
 .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
 .baseUrl(Url.BASE_URL)
 .build().create(ApiService.class);
 }
 return SERVICE;
 }

提供一个静态方法初始化Retrofit,手动创建了OkHttpClient设置了请求的超时时间。并在OkHttp的拦截器中增加了请求头。注意这里是为所有的请求添加了请求头,你可以单独的给请求增加请求头,例如

 @Headers("apikey:b86c2269fe6588bbe3b41924bb2f2da2")
 @GET("/student/login")
 Observable<HttpResult> login(@Query("phone") String phone, @Query("password") String psw);

和Retrofit初始化不同的地方就在我们添加了这两句话

.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())

service的定义也从这样

@GET("/student/login")
Call<HttpResult> getTopMovie(@Query("start") int start, @Query("count") int count);

变成了

@GET("/student/login")
Observable<HttpResult> login(@Query("phone") String phone, @Query("password") String psw);

返回值变成了Observable,这个Observable不就是RxJava的可观察者(即被观察者)么。

封装服务器请求以及返回数据

用户在使用任何一个网络框架都只关系请求的返回和错误信息,所以对请求的返回和请求要做一个细致的封装。

我们一般请求的返回都是像下面这样

{
 "code":"200",
 "message":"Return Successd!",
 "data":{
 "name":"张三"
 "age":3
 }
}

如果你们的服务器返回不是这样的格式那你就只有坐下来请他喝茶,跟他好好说(把他头摁进显示器)了。大不了就献出你的菊花吧!

对于这样的数据我们肯定要对code做出一些判断,不同的code对应不同的错误信息。所以我们新建一个HttpResult类,对应上面的数据结构。

public class HttpResult<T> {
 private int code;
 private String message;
 private T data;
 public T getData() {
 return data;
 }
 public void setData(T data) {
 this.data = data;
 }
 public int getCode() {
 return code;
 }
 public void setCode(int code) {
 this.code = code;
 }
 public String getMessage() {
 return message;
 }
 public void setMessage(String message) {
 this.message = message;
 }
 
}

这算是所有实体的一个基类,data可以为任何数据类型。

我们要对所以返回结果进行预处理,新建一个RxHelper,预处理无非就是对code进行判断和解析,不同的错误返回不同的错误信息,这还不简单。Rxjava的map操作符不是轻松解决

Api.getDefault().login("name","psw")

.map(new HttpResultFunc<UserEntity>());

.subscribeOn(Schedulers.io())

.unsubscribeOn(Schedulers.io())

.subscribeOn(AndroidSchedulers.mainThread())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(subscriber);

private class HttpResultFunc<T> implements Func1<HttpResult<T>, T> {

@Override

public T call(HttpResult<T> httpResult) {

Log.e("error", httpResult.getData().toString() + "");

if (httpResult.getCode() != 0) {

throw new ApiException(httpResult.getCode());

}

return httpResult.getData();

}

}

哟,这不是轻松愉快 so seay么!对code进行了判断,code为0就做对应更新UI或者其他后续操作,不等于0就抛出异常,在ApiException中队code做处理,根据message字段进行提示用户

 private static String getApiExceptionMessage(int code){
 switch (code) {
 case USER_NOT_EXIST:
 message = "该用户不存在";
 break;
 case WRONG_PASSWORD:
 message = "密码错误";
 break;
 default:
 message = "未知错误";
 }
 return message;
 }

撒花!!!

然而。。。RxJava永远比你想象的强大。RxJava中那么多操作符看到我身体不适,有个操作符compose。因为我们在每一个请求中都会处理code以及一些重用一些操作符,比如用observeOn和subscribeOn来切换线程。RxJava提供了一种解决方案:Transformer(转换器),一般情况下就是通过使用操作符Observable.compose()来实现。具体可以参考避免打断链式结构:使用.compose( )操作符

新建一个RxHelper对结果进行预处理,代码

public class RxHelper {
 /**
 * 对结果进行预处理
 *
 * @param <T>
 * @return
 */
 public static <T> Observable.Transformer<HttpResult<T>, T> handleResult() {
 return new Observable.Transformer<HttpResult<T>, T>() {
 @Override
 public Observable<T> call(Observable<HttpResult<T>> tObservable) {
 return tObservable.flatMap(new Func1<HttpResult<T>, Observable<T>>() {
 @Override
 public Observable<T> call(HttpResult<T> result) {
 LogUtils.e(result.getCode()+"");
 if (result.getCode() == 0) {
 return createData(result.getData());
 } else {
 return Observable.error(new ApiException(result.getCode()));
 }
 }
 }).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread());
 }
 };
 }
 /**
 * 创建成功的数据
 *
 * @param data
 * @param <T>
 * @return
 */
 private static <T> Observable<T> createData(final T data) {
 return Observable.create(new Observable.OnSubscribe<T>() {
 @Override
 public void call(Subscriber<? super T> subscriber) {
 try {
 subscriber.onNext(data);
 subscriber.onCompleted();
 } catch (Exception e) {
 subscriber.onError(e);
 }
 }
 });
 }
}

Transformer实际上就是一个Func1<Observable<T>, Observable<R>>,换言之就是:可以通过它将一种类型的Observable转换成另一种类型的Observable,和调用一系列的内联操作符是一模一样的。这里我们首先使用flatMap操作符把Obserable<HttpResult<T>>,转换成为Observable<T>在内部对code进行了预处理。如果成功则把结果Observable<T>发射给订阅者。反之则把code交给ApiException并返回一个异常,ApiException中我们对code进行相应的处理并返回对应的错误信息

public class ApiException extends RuntimeException{
 public static final int USER_NOT_EXIST = 100;
 public static final int WRONG_PASSWORD = 101;
 private static String message;
 public ApiException(int resultCode) {
 this(getApiExceptionMessage(resultCode));
 }
 public ApiException(String detailMessage) {
 super(detailMessage);
 }
 @Override
 public String getMessage() {
 return message;
 }
 /**
 * 由于服务器传递过来的错误信息直接给用户看的话,用户未必能够理解
 * 需要根据错误码对错误信息进行一个转换,在显示给用户
 * @param code
 * @return
 */
 private static String getApiExceptionMessage(int code){
 switch (code) {
 case USER_NOT_EXIST:
 message = "该用户不存在";
 break;
 case WRONG_PASSWORD:
 message = "密码错误";
 break;
 default:
 message = "未知错误";
 }
 return message;
 }
}

最后调用了频繁使用的subscribeOn()和observeOn()以及unsubscribeOn()。

处理ProgressDialog

在Rxjava中我们什么时候来显示Dialog呢。一开始觉得是放在Subscriber<T>的onStart中。onStart可以用作流程开始前的初始化。然而 onStart()由于在 subscribe()发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe()被调用时的线程。所以onStart并不能保证永远在主线程运行。

怎么办呢?

千万不要小看了RxJava,与 onStart()相对应的有一个方法 doOnSubscribe(),它和 onStart()同样是在subscribe()调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe()执行在 subscribe()发生的线程;而如果在 doOnSubscribe()之后有 subscribeOn()的话,它将执行在离它最近的subscribeOn()所指定的线程。可以看到在RxHelper中看到我们调用了两次subscribeOn,最后一个调用也就是离doOnSubscribe()最近的一次subscribeOn是指定的AndroidSchedulers.mainThread()也就是主线程。这样我们就就能保证它永远都在主线运行了。这里不得不感概RxJava的强大。

这里我们自定义一个类ProgressSubscriber继承Subscriber<T>

public abstract class ProgressSubscriber<T> extends Subscriber<T> implements ProgressCancelListener{
 private SimpleLoadDialog dialogHandler;
 public ProgressSubscriber(Context context) {
 dialogHandler = new SimpleLoadDialog(context,this,true);
 }
 @Override
 public void onCompleted() {
 dismissProgressDialog();
 }
 /**
 * 显示Dialog
 */
 public void showProgressDialog(){
 if (dialogHandler != null) {
 dialogHandler.obtainMessage(SimpleLoadDialog.SHOW_PROGRESS_DIALOG).sendToTarget();
 }
 }
 @Override
 public void onNext(T t) {
 _onNext(t);
 }
 /**
 * 隐藏Dialog
 */
 private void dismissProgressDialog(){
 if (dialogHandler != null) {
 dialogHandler.obtainMessage(SimpleLoadDialog.DISMISS_PROGRESS_DIALOG).sendToTarget();
 dialogHandler=null;
 }
 }
 @Override
 public void onError(Throwable e) {
 e.printStackTrace();
 if (false) { //这里自行替换判断网络的代码
 _onError("网络不可用");
 } else if (e instanceof ApiException) {
 _onError(e.getMessage());
 } else {
 _onError("请求失败,请稍后再试...");
 }
 dismissProgressDialog();
 }
 @Override
 public void onCancelProgress() {
 if (!this.isUnsubscribed()) {
 this.unsubscribe();
 }
 }
 protected abstract void _onNext(T t);
 protected abstract void _onError(String message);
}

初始化ProgressSubscriber新建了一个我们自己定义的ProgressDialog并且传入一个自定义接口ProgressCancelListener。此接口是在SimpleLoadDialog消失onCancel的时候回调的。用于终止网络请求。

 load.setOnCancelListener(new DialogInterface.OnCancelListener() {
 @Override
 public void onCancel(DialogInterface dialog) {
 mProgressCancelListener.onCancelProgress();
 }
 });

ProgressSubscriber其他就很简单了,在onCompleted()和onError()的时候取消Dialog。需要的时候调用showProgressDialog即可。

处理数据缓存

服务器返回的数据我们肯定要做缓存,所以我们需要一个RetrofitCache类来做缓存处理。

public class RetrofitCache {

/**

* @param cacheKey 缓存的Key

* @param fromNetwork

* @param isSave 是否缓存

* @param forceRefresh 是否强制刷新

* @param <T>

* @return

*/

public static <T> Observable<T> load(final String cacheKey,

Observable<T> fromNetwork,

boolean isSave, boolean forceRefresh) {

Observable<T> fromCache = Observable.create(new Observable.OnSubscribe<T>() {

@Override

public void call(Subscriber<? super T> subscriber) {

T cache = (T) Hawk.get(cacheKey);

if (cache != null) {

subscriber.onNext(cache);

} else {

subscriber.onCompleted();

}

}

}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

//是否缓存

if (isSave) {

/**

* 这里的fromNetwork 不需要指定Schedule,在handleRequest中已经变换了

*/

fromNetwork = fromNetwork.map(new Func1<T, T>() {

@Override

public T call(T result) {

Hawk.put(cacheKey, result);

return result;

}

});

}

//强制刷新

if (forceRefresh) {

return fromNetwork;

} else {

return Observable.concat(fromCache, fromNetwork).first();

}

}

}

几个参数注释上面已经写得很清楚了,不需要过多的解释。这里我们先取了一个Observable<T>对象fromCache,里面的操作很简单,去缓存里面找个key对应的缓存,如果有就发射数据。在fromNetwork里面做的操作仅仅是缓存数据这一操作。最后判断如果强制刷新就直接返回fromNetwork反之用Observable.concat()做一个合并。concat操作符将多个Observable结合成一个Observable并发射数据。这里又用了first()。fromCache和fromNetwork任何一步一旦发射数据后面的操作都不执行。

最后我们新建一个HttpUtil用来返回用户关心的数据,缓存,显示Dialog在这里面进行。

public class HttpUtil{
 /**
 * 构造方法私有
 */
 private HttpUtil() {
 }
 /**
 * 在访问HttpUtil时创建单例
 */
 private static class SingletonHolder {
 private static final HttpUtil INSTANCE = new HttpUtil();
 }
 /**
 * 获取单例
 */
 public static HttpUtil getInstance() {
 return SingletonHolder.INSTANCE;
 }
 //添加线程管理并订阅
 public void toSubscribe(Observable ob, final ProgressSubscriber subscriber,String cacheKey,boolean isSave, boolean forceRefresh) {
 //数据预处理
 Observable.Transformer<HttpResult<Object>, Object> result = RxHelper.handleResult();
 //重用操作符
 Observable observable = ob.compose(result)
 .doOnSubscribe(new Action0() {
 @Override
 public void call() {
 //显示Dialog和一些其他操作
 subscriber.showProgressDialog();
 }
 });
 //缓存
 RetrofitCache.load(cacheKey,observable,isSave,forceRefresh).subscribe(subscriber);
 }

Activity生命周期管理

基本的网络请求都是向服务器请求数据,客户端拿到数据后更新UI。但也不排除意外情况,比如请求回数据途中Activity已经不在了,这个时候就应该取消网络请求。

要实现上面的功能其实很简单,两部分

  • 随时监听Activity(Fragment)的生命周期并对外发射出去; 在我们的网络请求中,接收生命周期
  • 并进行判断,如果该生命周期是自己绑定的,如Destory,那么就断开数据向下传递的过程

实现以上功能需要用到Rxjava的Subject的子类PublishSubject

在你的BaseActivity中添加如下代码

public class BaseActivity extends AppCompatActivity {
 public final PublishSubject<ActivityLifeCycleEvent> lifecycleSubject = PublishSubject.create();
 @Override
 protected void onCreate(Bundle savedInstanceState) {
 lifecycleSubject.onNext(ActivityLifeCycleEvent.CREATE);
 super.onCreate(savedInstanceState);
 }
 @Override
 protected void onPause() {
 lifecycleSubject.onNext(ActivityLifeCycleEvent.PAUSE);
 super.onPause();
 }
 @Override
 protected void onStop() {
 lifecycleSubject.onNext(ActivityLifeCycleEvent.STOP);
 super.onStop();
 }
 @Override
 protected void onDestroy() {
 super.onDestroy();
 lifecycleSubject.onNext(ActivityLifeCycleEvent.DESTROY);
 }

这样的话,我们把所有生命周期事件都传给了PublishSubject了,或者说PublishSubject已经接收到了并能够对外发射各种生命周期事件的能力了。

现在我们要让网络请求的时候去监听这个PublishSubject,在收到相应的生命周期后取消网络请求,这又用到了我们神奇的compose(),我们需要修改handleResult代码如下

public static <T> Observable.Transformer<HttpResult<T>, T> handleResult(final ActivityLifeCycleEvent event,final PublishSubject<ActivityLifeCycleEvent> lifecycleSubject) {

return new Observable.Transformer<HttpResult<T>, T>() {

@Override

public Observable<T> call(Observable<HttpResult<T>> tObservable) {

Observable<ActivityLifeCycleEvent> compareLifecycleObservable =

lifecycleSubject.takeFirst(new Func1<ActivityLifeCycleEvent, Boolean>() {

@Override

public Boolean call(ActivityLifeCycleEvent activityLifeCycleEvent) {

return activityLifeCycleEvent.equals(event);

}

});

return tObservable.flatMap(new Func1<HttpResult<T>, Observable<T>>() {

@Override

public Observable<T> call(HttpResult<T> result) {

if (result.getCount() != 0) {

return createData(result.getSubjects());

} else {

return Observable.error(new ApiException(result.getCount()));

}

}

}) .takeUntil(compareLifecycleObservable).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread());

}

};

}

调用的时候增加了两个参数一个是ActivityLifeCycleEvent 其实就是一些枚举表示Activity的生命周期

public enum ActivityLifeCycleEvent {
 CREATE,
 START,
 RESUME,
 PAUSE,
 STOP,
 DESTROY
}

另外一个参数就是我们在BaseActivity添加的PublishSubject,这里用到了takeUntil()它的作用是监听我们创建的compareLifecycleObservable,compareLifecycleObservable中就是判断了如果当前生命周期和Activity一样就发射数据,一旦compareLifecycleObservable 对外发射了数据,就自动把当前的Observable(也就是网络请求的Observable)停掉。

当然有个库是专门针对这种情况的,叫RxLifecycle,不过要继承他自己的RxActivity,当然这个库不只是针对网络请求,其他所有的Rxjava都可以。有需要的可以去看看。

最后新建一个ApiService存放我们的请求

public interface ApiService {
 @GET("/student/mobileRegister")
 Observable<HttpResult<UserEntity>> login(@Query("phone") String phone, @Query("password") String psw);
}

使用

使用起来就超级简单了

/**

*

*

// ┏┓   ┏┓

//┏┛┻━━━┛┻┓

//┃       ┃

//┃   ━   ┃

//┃ ┳┛ ┗┳ ┃

//┃       ┃

//┃   ┻   ┃

//┃       ┃

//┗━┓   ┏━┛

// ┃   ┃ 神兽保佑

// ┃   ┃ 阿弥陀佛

// ┃   ┗━━━┓

// ┃       ┣┓

// ┃       ┏┛

// ┗┓┓┏━┳┓┏┛

// ┃┫┫ ┃┫┫

// ┗┻┛ ┗┻┛

//

*/

//获取豆瓣电影TOP 100

Observable ob = Api.getDefault().getTopMovie(0, 100);

HttpUtil.getInstance().toSubscribe(ob, new ProgressSubscriber<List<Subject>>(this) {

@Override

protected void _onError(String message) {

Toast.makeText(MainActivity.this, message, Toast.LENGTH_LONG).show();

}

@Override

protected void _onNext(List<Subject> list) {

}

}, "cacheKey", ActivityLifeCycleEvent.PAUSE, lifecycleSubject, false, false);

具体很多东西都可以在使用的时候具体修改,比如缓存我用的Hawk。Dialog是我自己定义的一个SimpleLoadDialog。源码已经给出请多指教!

-------------更新--------------

评论区有人提出对于Activity生命周期的管理,个人疏忽大意,特地来加上。

END!

Thanks

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档