添加依赖
compile 'io.reactivex.rxjava2:rxjava:2.0.0-RC5'
compile 'io.reactivex.rxjava2:rxandroid:2.0.0-RC1'Observable和Observer通过subscribe()方法实现订阅关系;
Rxjava中是自动发送事件的,
一旦订阅就开始发送;

Observable,
Emitter是发射器的意思,
在subscribe方法中不断调用发射器的方法;
总共有onNext()、onComplete()、onError()三个方法;
用法参考Rxjava1:


Observer,
总共有onNext()、onComplete()、onError()、onSubscribe()四个方法;
其中,onNext()、onComplete()、onError()三个方法分别对应着第一步中Observable的onNext()、onComplete()、onError()三个方法,
只要Observable发出(调用)对应的方法,
Observer对应的方法就会被调用;onError()和onComplete是互斥的,一次只能调用一个;observable.subscribe(observer);dependencies {
implementation fileTree(dir: 'libs', include: ['*.jar'])
implementation 'com.android.support:appcompat-v7:28.0.0'
implementation 'io.reactivex.rxjava2:rxjava:2.0.0-RC5'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.0-RC1'
implementation 'com.squareup.retrofit2:retrofit:2.1.0'
implementation 'com.squareup.retrofit2:converter-gson:2.1.0'
// compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
// compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'
}activity_main.xml:
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout
xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:tools="http://schemas.android.com/tools"
android:id="@+id/activity_main"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:paddingBottom="@dimen/activity_vertical_margin"
android:paddingLeft="@dimen/activity_horizontal_margin"
android:paddingRight="@dimen/activity_horizontal_margin"
android:paddingTop="@dimen/activity_vertical_margin"
android:orientation="vertical"
tools:context="com.cniao5.cniao5rxjava2demo.MainActivity">
<Button
android:layout_height="wrap_content"
android:layout_width="match_parent"
android:text="test"
android:onClick="click"/>
<TextView
android:id="@+id/text"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_marginTop="10dp"
/>
</LinearLayout>MainActivity,java:
第一步,通过create()创建Observable(模拟对象:程序员),
通过onNext()发送数据:
public Observable<String> getObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("奏笛");
e.onNext("泡吧");
e.onComplete();
}
});
}第二步,创建Observer(模拟对象:程序员女朋友),
创建的方法是直接new:
public Observer<String> getObserver(){
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("MainActivity","onSubscribe");
}
@Override
public void onNext(String value) {
Log.d("MainActivity","onNext");
}
@Override
public void onError(Throwable e) {
Log.d("MainActivity","onError");
}
@Override
public void onComplete() {
Log.d("MainActivity","onComplete");
}
};
}第三步,在click()中,
getObservable和observer,
实现订阅:
public void click(View view) {
Observable<String> observale = getObservable();
Observer<String> observer = getObserver();
observale.subscribe(observer);
}
以上便完成了一个最基本的使用; 运行效果:

点击按钮后打印日志:

由此可以应证,
Rxjava中是自动发送事件的, 一旦Observable 被 observer 订阅了(observale.subscribe(observer);),Observable就开始发送; 由Observable通过自身ObservableOnSubscribe中的subscribe()中的onNext()等方法自动发出信息,observer接收到信息后执行对应的onNext()等方法; 在订阅之后,Observer中,onSubscribe()是每次接收数据之前必须要调用的方法;onNext()则是对应Observable调用的次数去调用相应的次数;onComplete()、onError()对应完成/异常状态时候调用;
@Override
public void onSubscribe(Disposable d) {
Log.d("MainActivity","onSubscribe");
}Observer构造方法中的onSubscribe()方法;Disposable d,Disposable是一次性的意思;
其主要有以下两个方法:

用法示例
(用于监听Observable发送的数据,
如果Observable发送的数据等于某个值,
就断绝订阅关系):

更改Observable代码:
public Observable<String> getObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("奏笛");
e.onNext("泡吧");
e.onNext("酗酒");
e.onComplete();
}
});
}运行示例,点击按钮:

可以发现已经没有onComplete()方法的打印信息了,
因为在onNext()中途已经断绝订阅关系了;
observer的简洁写法 public void click(View view) {
Observable<String> observale = getObservable();
observale.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
if(s.equals("奏笛")){
Log.d("MainActivity","收到奏笛!!这里类似于Observer的onNext()");
}
if(s.equals("吟诗")){
Log.d("MainActivity","收到吟诗!!这里类似于Observer的onNext()");
}
if(s.equals("酗酒")){
Log.d("MainActivity","收到酗酒!!这里类似于Observer的onNext()");
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d("MainActivity","这里类似于Observer的onError()");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d("MainActivity","这里类似于Observer的onComplete()");
}
});
}
public Observable<String> getObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("奏笛");
e.onNext("吟诗");
e.onNext("酗酒");
e.onComplete();
}
});
}运行示例,点击按钮:

更改上面代码,
Observable.just()创建Observable对象,效果也是一样的; 运行示例,点击按钮,打印日志同上:
public Observable<String> getObservable() {
Observable observable = Observable.just("奏笛","吟诗","酗酒");
return observable;
}
或者显示在TextView上:


Observable.fromArray()创建Observable对象,Observable observable = Observable.fromArray("奏笛","泡吧","吟诗");
其实用法跟just()是一样的;
just()源码如下,
里面最终也是调用fromArray()实现的:

Observable.fromCallable()创建Observable对象,特点:只能返回一个数据;


本节笔记Activity全文(注意io.reactivex包的引用):
package com.cniao5.cniao5rxjava2demo;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.TextView;
import java.util.concurrent.Callable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
public class MainActivity extends AppCompatActivity {
private TextView textView;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
textView = (TextView) findViewById(R.id.text);
}
public void click(View view) {
Observable<String> observale = getObservable();
// Observer<String> observer = getObserver();
//
// observale.subscribe(observer);
// observale.subscribe(new Consumer<String>() {
// @Override
// public void accept(String s) throws Exception {
// Log.d("MainActivity","accept="+s);
//
// textView.append(s);
// textView.append("//n");
// }
// });
observale.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
if(s.equals("奏笛")){
Log.d("MainActivity","收到奏笛!!这里类似于Observer的onNext()");
}
if(s.equals("吟诗")){
Log.d("MainActivity","收到吟诗!!这里类似于Observer的onNext()");
}
if(s.equals("酗酒")){
Log.d("MainActivity","收到酗酒!!这里类似于Observer的onNext()");
}
textView.append(s);
textView.append("\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d("MainActivity","这里类似于Observer的onError()");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d("MainActivity","这里类似于Observer的onComplete()");
}
});
}
public Observable<String> getObservable() {
// Observable observable = Observable.just("奏笛","吟诗","酗酒");
// Observable observable = Observable.fromArray("奏笛","泡吧","吟诗");
return Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "奏笛";
}
});
// return observable;
// return Observable.create(new ObservableOnSubscribe<String>() {
// @Override
// public void subscribe(ObservableEmitter<String> e) throws Exception {
// e.onNext("奏笛");
// e.onNext("吟诗");
// e.onNext("酗酒");
// e.onComplete();
//// e.onError(new);
//
// }
// });
}
public Observer<String> getObserver(){
return new Observer<String>() {
Disposable dd =null;//定义一个变量局部变量
@Override
public void onSubscribe(Disposable d) {
dd = d;//把这段订阅关系的Disposable变量拿下来
Log.d("MainActivity","onSubscribe");
}
@Override
public void onNext(String value) {
Log.d("MainActivity","onNext");
if(value.equals("酗酒")){
dd.dispose();//如果发送的数据等于某个值,就断绝关系
Log.d("MainActivity","你的小可爱已经不想理你了!!!");
}
}
@Override
public void onError(Throwable e) {
Log.d("MainActivity","onError");
}
@Override
public void onComplete() {
Log.d("MainActivity","onComplete");
}
};
}
}