前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >框架设计|自己撸一个RxJava 可好?(上)

框架设计|自己撸一个RxJava 可好?(上)

作者头像
开发者技术前线
发布2020-11-23 15:21:43
3590
发布2020-11-23 15:21:43
举报
文章被收录于专栏:开发者技术前线

RxJava 拥有繁多的 API 和复杂的逻辑链,学习复杂的知识,一般从整体再到具体,为了学习 RxJava 的原理,参考其源码,自己动手实现一个简化的 RxJava,旨在理解调用链。

阅读本文,建议先下载代码 LittleRx,毕竟在IDE里阅读代码比在网页上要清晰得多,也可以看下打印的日志

RxJava最主要的4个类:Observable、OnSubscribe、Operator、Subscriber

1、创建一个Observable,然后订阅

代码语言:javascript
复制
Observable
        .create(new OnSubscribe<Integer>() {           
           @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .subscribe(new Subscriber<Integer>() {            
            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });public class Observable<T> {    
           private OnSubscribe<T> onSubscribe;   
           private Observable(OnSubscribe<T> onSubscribe) {        
           this.onSubscribe = onSubscribe;
    }    
    public final void subscribe(Subscriber<? super T> subscriber) {
         onSubscribe.call(subscriber);
    }   
     public static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {        
       return new Observable<>(onSubscribe);
    }
}

这里可以看出

subscribe(subscriber)-->onSubscribe.call(subscriber),

所以没有订阅动作就不会触发 OnSubscribe.call()。

2、map 和 lift

代码语言:javascript
复制
Observable
        .create(new OnSubscribe<Integer>() {
            @Override            
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .map(new Func1<Integer, String>() {
            @Override            
            public String call(Integer integer) {                
                return "map" + integer;
            }
        })
        .subscribe(new Subscriber<String>() {
            @Override            
            public void onNext(String s) {
                System.out.println(s);
            }
        });

非链式的写法

代码语言:javascript
复制
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = Observable.create(onSubscribe);
Func1<Integer, String> func = new Func1<>();
Observable<String> observable2 = observable.map(func);
Subscriber<String> subscriber = new Subscriber<>();
observable2.subscribe(subscriber);

create() 跟之前一样,那么 map() 做了什么

代码语言:javascript
复制
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {    
     return lift(new OperatorMap<T, R>(func));
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {    
    return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

lift() 根据上一个 observable 的 onSubscribe 创建一个新的 OnSubscribeLift 返回一个新的 observable2,上面我们说过 subscribe(subscriber)-->onSubscribe.call(subscriber),所以我们接着看 OnSubscribeLift

代码语言:javascript
复制
public class OnSubscribeLift<T, R> implements OnSubscribe<R> {    
   final OnSubscribe<T> parent;    
   final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {        
      this.parent = parent;        
      this.operator = operator;
    }    
    // 先不用关心具体实现,下面讲到再看
      @Override
      public void call(Subscriber<? super R> r) {
           Subscriber<? super T> st = operator.call(r); 
          // 这个 operator 就是 OperatorMap
          parent.call(st); 
        // parent 就是第一个 observable 的 onSubscribe
    }
}

再看下 OperatorMap

代码语言:javascript
复制
public final class OperatorMap<T, R> implements Operator<R, T> {    
    final Func1<? super T, ? extends R> transformer;    
    public OperatorMap(Func1<? super T, ? extends R> transformer) {        t
         his.transformer = transformer;
    }    
    // 先不用关心具体实现,下面讲到再看
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {        
         return new MapSubscriber<T, R>(o, transformer);
    }    
    private class MapSubscriber<T, R> extends Subscriber<T> {        
    private Subscriber<? super R> actual;        
    private Func1<? super T, ? extends R> transformer;        
    public MapSubscriber(Subscriber<? super R> o, Func1<? super T, ? extends R> transformer) {            
            this.actual = o;            
            this.transformer = transformer;
        }        
        // 先不用关心具体实现,下面讲到再看
        @Override
        public void onNext(T t) {
            R r = transformer.call(t);
            actual.onNext(r);
        }
    }
}

我们把 map() 和 lift() 都去掉,使用最基本的类来实现

代码语言:javascript
复制
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
Func1<Integer, String> func = new Func1<>();

OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);

OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorMap);

Observable<String> observable2 = new Observable<>(onSubscribe2);

Subscriber<String> subscriber = new Subscriber<>();
observable2.subscribe(subscriber);

到这里,清楚了如何把第一个 Observable<Integer> 转成 Observable<String>,包括 OnSubscribe<Integer> onSubscribe 和 OnSubscribeLift<Integer, String> onSubscribe2 的关系

那么最终的 subscribe() 如何调用到第一个 observable.call(Subscriber<Integer>) 里面的 Subscriber<Integer>.onNext(Integer) 又如何调用到最终的订阅者 subscriber<String>().onNext(String)

1) observable2.subscribe(subscriber) --> 2) onSubscribe2.call(subscriber) 即 OnSubscribeLift.call(subscriber) --> 3) Subscriber<Integer> st = operatorMap.call(subscriber) 即 4) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func) 5) parent.call(st) 即 onSubscribe.call(st) --> 6) st.onNext(1) 即 MapSubscriber.onNext(1) --> 7) String string = func.call(1) 8) subscriber.onNext(string)

至此 Observable.create().map().subscribe() 的调用链就分析完了 很多操作符本质都是 lift(),以此类推,lift() 2次

3、subscribeOn

Scheduler 内部比较繁杂,我们简化下,把 subscribeOn(Scheduler) 简化成 subscribeOnIO()

代码语言:javascript
复制
Observable
        .create(new OnSubscribe<Integer>() {
            @Override            
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<Integer>() {
            @Override            
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });

如何实现 subscribeOnIO() 让第一个 observable 的 onSubscribe 运行在子线程

代码语言:javascript
复制
public final Observable<T> subscribeOnIO() {    
    return create(new OnSubscribeOnIO<T>(this));
}
public final class OnSubscribeOnIO<T> implements OnSubscribe<T> {    
    private static ExecutorService threadPool = Executors.newCachedThreadPool();    
    final Observable<T> source;    
    public OnSubscribeOnIO(Observable<T> source) {        
         this.source = source;
    }    
    @Override
    public void call(final Subscriber<? super T> subscriber) {
        Runnable runnable = new Runnable() {            
             @Override
            public void run() {
                source.subscribe(subscriber);
                 // --> onSubscribe.call(subscriber) --> subscriber.onNext(1)
            }
        };
        threadPool.submit(runnable);
    }
}

从上面看出 subscribeOnIO() 新建了一个线程并加入 CachedThreadPool,在子线程里订阅上一个 Observable,后续的调用都在这个线程里完成

再考虑下复杂点的,加入 map()

代码语言:javascript
复制
Observable
        .create(new OnSubscribe<Integer>() {
            @Override            
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println(Thread.currentThread());
                subscriber.onNext(1);
            }
        })
        .map(new Func1<Integer, String>() {
            @Override            
            public String call(Integer integer) {
                System.out.println(Thread.currentThread());                
                return "map" + integer;
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<String>() {
            @Override            
            public void onNext(String s) {
                System.out.println(Thread.currentThread());
                System.out.println(s);
            }
        });

非链式的写法

代码语言:javascript
复制
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorMap);
Observable<String> observable2 = new Observable<>(onSubscribe2);

OnSubscribeOnIO<String> onSubscribe3 = new OnSubscribeOnIO(observable2);
Observable<String> observable3 = new Observable<>(onSubscribe3);

Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);

1) observable3.subscribe(subscriber) --> 2) onSubscribe3.call(subscriber) 即 OnSubscribeOnIO.call(subscriber) --> 3) 子线程 new Runnable(){} --> observable2.subscribe(subscriber) 4) onSubscribe2.call(subscriber) 即 OnSubscribeLift.call(subscriber) --> 5) Subscriber<Integer> st = operatorMap.call(subscriber) 即 6) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func) 7) parent.call(st) 即 onSubscribe.call(st) --> 8) st.onNext(1) 即 MapSubscriber.onNext(1) --> 9) String string = func.call(1) 10) subscriber.onNext(string)

那要是把 map() 与 subscribeOnIO() 换下位置呢

代码语言:javascript
复制
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);

OnSubscribeOnIO<Integer> onSubscribe2 = new OnSubscribeOnIO(observable);
Observable<Integer> observable2 = new Observable<>(onSubscribe2);

Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe3 = new OnSubscribeLift<>(onSubscrib2, operatorMap);
Observable<String> observable3 = new Observable<>(onSubscribe3);
Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);

1) observable3.subscribe(subscriber) --> 2) onSubscribe3.call(subscriber) 即 OnSubscribeLift.call(subscriber) --> 3) Subscriber<Integer> st = operatorMap.call(subscriber) 即 4) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func) 5) parent.call(st) 即 onSubscribe2.call(st) 即 OnSubscribeOnIO.call(st) --> 6) 子线程 new Runnable(){} --> observable.subscribe(st) --> 7) onSubscribe.call(st) --> 7) st.onNext(1) 即 MapSubscriber.onNext(1) --> 8) String string = func.call(1) 9) subscriber.onNext(string)

看得出来,不管 subscribeOnIO() 在哪,第一个 onSubscribe.call() 总是运行在子线程

4、observeOn

先看下 demo 最终写法

代码语言:javascript
复制
Handler handler = new Handler();

Observable
        .create(new OnSubscribe<Integer>() {
            @Override            
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .observeOn(handler)
        .map(new Func1<Integer, String>() {
            @Override            
            public String call(Integer integer) {                
               return "map" + integer;
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<String>() {
            @Override            
            public void onNext(String s) {
                System.out.println(s);
            }
        });

handler.loop(); 
//队列没有消息时会挂起当前线程,直到收到新的消息

同样我们也自己实现一个简单的可以切换回主线程的 observeOn(Handler)

代码语言:javascript
复制
public class Observable<T> {
    ...    public final Observable<T> observeOn(Handler handler) {        return lift(new OperatorObserveOn<T>(handler));
    }
}

OperatorObserveOn

代码语言:javascript
复制
public final class OperatorObserveOn<T> implements Operator<T, T> {    
   private Handler handler;    
   public OperatorObserveOn(Handler handler) {        
       this.handler = handler;
    }    
    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
        Subscriber<T> s = new Subscriber<T>() {       
         @Override
         public void onNext(final T t) {                
            handler.post(new Runnable() {                    
         @Override
         public void run() {
             subscriber.onNext(t);
                    }
                });
            }
        };        
        return s;
    }
}

自定义Handler

代码语言:javascript
复制
public class Handler {    
    private ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);    
    public void loop() {        
    for (; ; ) {
            Runnable runnable;            
            try {
                runnable = queue.take();
                // 没有数据则一直阻塞,直到有数据自动唤醒
            } catch (InterruptedException e) {                         
                return;
            }            
            if (runnable == null) {                
                 return;
            }
            runnable.run();
        }
    }    
     public void post(Runnable runnable) {       
      try {           
         queue.put(runnable);// 没有空间则一直阻塞,直到有空间
        } catch (InterruptedException e) {                       
           return;
        }
    }
}

非链式写法

代码语言:javascript
复制
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);

OperatorObserveOn<Integer> operatorObserveOn = new OperatorObserveOn(handler);
OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorObserveOn);

Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe3 = new OnSubscribeLift<>(onSubscribe2, operatorMap);
Observable<String> observable2 = new Observable<>(onSubscribe3);
OnSubscribeOnIO<String> onSubscribe4 = new OnSubscribeOnIO(observable2);
Observable<String> observable3 = new Observable<>(onSubscribe4);
Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);

1) observable3.subscribe(subscriber) --> 2) onSubscribe4.call(subscriber) 即 OnSubscribeOnIO.call(subscriber) --> 3) 子线程 new Runnable(){} --> observable2.subscribe(subscriber) 4) onSubscribe3.call(subscriber) 即 OnSubscribeLift.call(subscriber) --> 5) Subscriber<Integer> st = operatorMap.call(subscriber) 即 6) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func) 7) parent.call(st) 即 onSubscribe2.call(st) 即 OnSubscribeLift.call(st)--> 8) Subscriber<Integer> st2 = operatorObserveOn.call(st) --> 9) parent.call(st2) 即 onSubscribe.call(st2) --> 8) st2.onNext(1) --> // onNext()里面切换到Handler所在线程 9) st.onNext(1) --> 9) String string = func.call(1) 10) subscriber.onNext(string)

5 其他

总的来说,调用链确实有点复杂,不过也还是可以接受的,一个调用链花点时间想想还是能清楚,只是每碰到一个调用链都要花点时间才能想清楚,还没能力能在几秒内就能想清楚,只能是多想多锻炼了。比如想想上面的,如果把 observeOn(handler) 放在 map() 后面呢?

作者丨风风风筝

http://www.jianshu.com/u/27ed73415f71

☞ 持续关注,[撸框架系列]

撸RxBus和Okhttp,Retrofit看推荐!

精彩推荐

当EventBus遇上自撸RxBus的时候?

自己动手轻松撸一个OkHttp框架

抛开理论,从实践中剖析Retrofit原理

LeakCanary- 如何检测 Activity 是否泄漏

技术 - 资讯 - 感悟

END

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-07-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发者技术前线 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、创建一个Observable,然后订阅
  • 2、map 和 lift
  • 3、subscribeOn
  • 4、observeOn
  • 5 其他
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档