随着Android第三库的普及,RxJava和RxAndroid 越来越被人熟知,简洁的语法,配合Java8 Lambda表达式,使代码的结构更加清晰,通过线程调度器更容易控制和切换线程,种种优点,使用它的人也越来越多。但是使用不好,很容易导致内存泄露。Rxlifecycle 就使被用来严格控制由于发布了一个订阅后,由于没有及时取消,导致Activity/Fragment无法销毁导致的内存泄露。本文飘神带你亲自了解Rxlifecycle 。
Github:https://github.com/trello/RxLifecycle 。
该项目是为了防止RxJava中subscription导致内存泄漏而诞生的,核心思想是通过监听Activity、Fragment的生命周期,来自动断开subscription以防止内存泄漏。
Rxlifecycle使用非常方便简单,如下:
build.gradle添加
//Rxlifecycle
compile 'com.trello:rxlifecycle:1.3.1'
compile 'com.trello:rxlifecycle-components:1.3.1'
//Rxjava
compile 'io.reactivex:rxjava:1.2.6'
Components
包中包含RxActivity
、RxFragment
等等,可以用Rxlifecycle提供的,也可以自定义。
官方sample源码: 两种使用方法:
1.手动设置取消订阅的时机,例子1、例子3
2.绑定生命周期,自动取消订阅,例子2
温馨提示:全文代码可以左右滑动
public class MainActivity extends RxAppCompatActivity {
//Note:Activity需要继承RxAppCompatActivity,fragment需要继承RxFragment,等等
//可以使用的组件在components包下面
private static final String TAG = "RxLifecycle";
@Overrideprotected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Log.d(TAG, "onCreate()");
setContentView(R.layout.activity_main);
// Specifically bind this until onPause()
//Note:例子1:
Observable.interval(1, TimeUnit.SECONDS)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.i(TAG, "Unsubscribing subscription from onCreate()");
}
}) /
/Note:手动设置在activity onPause的时候取消订阅
.compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
.subscribe(new Action1<Long>() {
@Override
public void call(Long num) {
Log.i(TAG, "Started in onCreate(), running until onPause(): " + num);
}
});
}
@Override protected void onStart() {
super.onStart();
Log.d(TAG, "onStart()");
}
Note:例子2:
// Using automatic unsubscription, this should determine that the correct time to
// unsubscribe is onStop (the opposite of onStart).
Observable.interval(1, TimeUnit.SECONDS)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.i(TAG, "Unsubscribing subscription from onStart()");
}
})
//Note:bindToLifecycle的自动取消订阅示例,因为是在onStart的时候调用,所以在onStop的时候自动取消订阅
.compose(this.<Long>bindToLifecycle())
.subscribe(new Action1<Long>() {
@Override
public void call(Long num) {
Log.i(TAG, "Started in onStart(), running until in onStop(): " + num);
}
});
}
@Override protected void onResume() {
super.onResume();
Log.d(TAG, "onResume()");
}
Note:例子3:
// `this.<Long>` is necessary if you're compiling on JDK7 or below.
// If you're using JDK8+, then you can safely remove it.
Observable.interval(1, TimeUnit.SECONDS)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.i(TAG, "Unsubscribing subscription from onResume()");
}
})
//Note:手动设置在activity onDestroy的时候取消订阅
.compose(this.<Long>bindUntilEvent(ActivityEvent.DESTROY))
.subscribe(new Action1<Long>() {
@Override
public void call(Long num) {
Log.i(TAG, "Started in onResume(), running until in onDestroy(): " + num);
}
});
}
tip-1如下代码:
Observable.just("hello world!")
.compose(this.<String>bindUntilEvent(ActivityEvent.PAUSE))
.flatMap(new Func1<String, Observable<Long>>() { @Override
public Observable<Long> call(String s) { return Observable.interval(1, TimeUnit.SECONDS);
}
})
.subscribe(new Action1<Long>() { @Override
public void call(Long aLong) {
Log.i(TAG, "....oh,oh,no!!..........." + aLong);
}
});
activity
生命周期paused
的时候
Log.i(TAG, "....oh,oh,no!!..........." + aLong);
还会执行么??会会…
如果你想全部都不执行:
Observable.just("hello world!")
.flatMap(new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String s) {
return Observable.interval(1, TimeUnit.SECONDS);
}
})
//fuck....here
.compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.i(TAG, "....oh,oh,no!!..........." + aLong);
}
});
tip-2
Observable.interval(1, TimeUnit.SECONDS)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.i(TAG, "Unsubscribing subscription ......");
}
})
.doOnNext(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.i(TAG, "........fuck..........." + aLong);
}
})
.flatMap(new Func1<Long, Observable<String>>() {
@Override
public Observable<String> call(Long aLong) {
return Observable.just(aLong + "");
}
})
.compose(this.<String>bindUntilEvent(ActivityEvent.PAUSE))
.subscribe(new Action1<String>() {
@Override
public void call(String num) {
Log.i(TAG, "..........shit..........." + num);
}
});
activity
在paused
的时候,
Log.i(TAG, "........fuck..........." + aLong); Log.i(TAG, "..........shit..........." + num);
都不会执行… 而且会unsubscribe
RxLifecycle - 当Activity被destory时自动暂停网络请求
Android开发中常会有这样一个场景:
发送网络请求 -> 2. 服务器处理请求并返回数据 -> 3. client端接收数据,绘制UI。
在前两步一般都是不会出现问题的,但是在第三步,当数据返回给client端时,如果页面已经不在了,那么就无法去绘制UI,很有可能会导致意向不到的问题。因此,为了解决这个问题,一个好的思路就是当页面离开时,自动断开网络请求数据的处理过程,即数据返回后不再进行任何处理。
要达到上面这样一个功能,我们可以思考,至少需要两部分:
随时监听Activity(Fragment)的生命周期并对外发射出去; 在我们的网络请求中,接收生命周期并进行判断,如果该生命周期是自己绑定的,如Destory,那么就断开数据向下传递的过程。
可以看到,首先有一个核心功能要实现:就是既能够监听Activity生命周期事件并对外发射,又能够接收每一个生命周期事件并作出判断。为了实现这个功能,可以联想到RxJava中的Subject,既能够发射数据,又能够接收数据。
了解Subject的读者可以跳过这部分。
如何理解Subject呢?
很容易,在RxJava
里面,Observable
是数据的发射者,它会对外发射数据,然后经过map
、flatmap
等等数据处理后,最终传递给Observer
,这个数据接收者。因此,抛开中间数据处理不管,可以看出,Observable
对外发射数据,是数据流的开端;Observer
接收数据,是数据流的末端。
那么Subject
呢?看一眼源码:
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {}
首先,它extends Observable,说明Subject具备了对外发射数据的能力,即拥有了from()
、just()
等等;另外,它又implements Observer,说明又能够处理数据,具备onNext()
、onCompleted()
等等。
然后,Subject毕竟只是一个抽象类,那么我们要如何使用它呢?
这里介绍一种最简单的:PublishSubject:
PublishSubject<Object> subject = PublishSubject.create();
// myObserver will receive "one" & "two" and onCompleted events
subject.subscribe(myObserver);
subject.onNext("one");
subject.onNext("two");
subject.onCompleted();
这里做的事情很简单,先创建一个PublishSubject
-> 绑定一个myObserver
,此时subject扮演了Observable
的角色,把数据发射给myObserver ->
然后subject处理接收了两个数据one、two -> 最终这些数据都传递给myObserver
。所以,subject扮演的角色是:
数据one、two => (Observer) subject (Observable) => myObserver
简单来说,我们把数据one、two塞给subject,然后subject又发射给了myObserver。
BaseActivity
监听生命周期
那么我们先来实现生命周期监听功能,基本思路是:在BaseActivity里创建一
个PublishSubject对象,在每个生命周期发生时,把该生命周期事件传递给PublishSubject。具体实现如下(只写部分生命周期,其他类似):
class BaseActivity {
protected final PublishSubject<ActivityLifeCycleEvent> lifecycleSubject = PublishSubject.create();
@Override
protected void onCreate(Bundle savedInstanceState) {
lifecycleSubject.onNext(ActivityLifeCycleEvent.CREATE);
...
} @Override
protected void onPause() {
lifecycleSubject.onNext(ActivityLifeCycleEvent.PAUSE);
...
} @Override
protected void onStop() {
lifecycleSubject.onNext(ActivityLifeCycleEvent.STOP);
...
}
...
}
这样,我们把所有生命周期事件都传给了lifecycleSubject
了,或者lifecycleSubject
已经接收到了并能够对外发射各种生命周期事件的能力了。
改良每一个Observable
,接收生命周期并自动断开自身
通常我们的一次网络请求长这样:
networkObservable
.subscribe(new Observer( handleUI() ));
其中,networkObservable表示一个通用的网络请求,会接收网络数据并传递给Observer去绘制UI。
现在,我们希望这个networkObservable监听Activity的DESTORY事件,一旦发生了DESTORY就自动断开Observer,即使网络数据回来了也不再传递给Observer去绘制UI。
即:
networkObservable
.compose(bindUntilEvent(ActivityLifeCycleEvent.DESTORY)) .subscribe(new Observer( handleUI() ));
因此,我们需要实现:
bindUntilEvent(ActivityLifeCycleEvent.DESTORY)
这个方法,那如何实现呢?
bindUntilEvent
我们知道lifecycleSubject
能够发射生命周期事件了,那么我们可以让networkObservable
去检查lifecycleSubject
发出的生命周期,如果和自己绑定的生命周期事件一样,那就自动停掉即可。
改装networkObservable
对于networkObservable自动停掉,我们可以利用操作符
networkObservable.takeUntil(otherObservable)
它的作用是监听otherObservable
,一旦otherObservable
对外发射了数据,就自动把networkObservable
停掉;
那otherObservable
何时对外发射数据呢?当然是lifecycleSubject
发射出的生命周期事件等于绑定的生命周期事件时,开始发射。
otherObservable = lifecycleSubject.takeFirst(new Func1<ActivityLifeCycleEvent, Boolean>() {
@Override
public Boolean call(ActivityLifeCycleEvent activityLifeCycleEvent) {
return activityLifeCycleEvent.equals(bindEvent);
}
});
其中的关键是判断activityLifeCycleEvent.equals(bindEvent);
,一旦条件满足,otherObservable
就对外发射数据,然后networkObservable
就立即自动停掉。
在BaseActivity里添加lifecycleSubject
,并把每一个生命周期事件按时传递给lifecycleSubject
.在BaseActivity里添加一个bindUntilEvent
方法:
@NonNull
@Overridepublic <T> Observable.Transformer<T, T> bindUntilEvent(
@NonNull final ActivityLifeCycleEvent event) {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> sourceObservable) {
Observable<ActivityLifeCycleEvent> compareLifecycleObservable =
lifecycleSubject.takeFirst(new Func1<ActivityLifeCycleEvent, Boolean>() {
@Override
public Boolean call(ActivityLifeCycleEvent activityLifeCycleEvent) {
return activityLifeCycleEvent.equals(event);
}
}); return sourceObservable.takeUntil(compareLifecycleObservable);
}
};
}
在任意一个网络请求 networkObservable 处改良
networkObservable
.compose(bindUntilEvent(ActivityLifeCycleEvent.DESTORY)) .subscribe(new Observer( handleUI() ));
注意:
文中提到的networkObservable
是网络请求,但实际上这不限于网络请求,任何耗时操作如文件io操作等都可以利用这个方法,来监听生命周期并自动暂停。
对于Fragment中的处理方法也是类似。下期不定时的再来对RxLifeCycle的综合原理做介绍,喜欢的朋友可以来点打赏,鼓励作者出更多好文。
文:http://blog.csdn.net/jdsjlzx/article/details/51527542/一叶飘舟
RxJava 2:
技术 - 资讯 - 感悟
END
、