前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >框架设计|自己撸一个RxJava 可好?(下)

框架设计|自己撸一个RxJava 可好?(下)

作者头像
开发者技术前线
发布2020-11-23 15:22:20
3490
发布2020-11-23 15:22:20
举报

RxJava所谓博大精深,假如一天学习一个操作符,至少也要学习一个半月,如果理解原理,至少6个月不过分!

为了学习 RxJava 的原理,参考其源码,自己动手实现一个简化的 RxJava,代码 LittleRx(https://github.com/iceGeneral/LittleRx)

本文接上一篇 框架设计|自己撸一个RxJava 可好?(上)

上一篇主要实现了操作符 create、map、lift、subscribeOnIO、observeOn,后来我又在代码补充了 flatMap、just、from、merge,在第一篇文章的基础上,后来补充的这几个操作符就很容易理解了,所以这里就不介绍了 本文讲参考 RxJava 源码,如何实现 zip

Demo

代码语言:javascript
复制
Observable<Integer> o1 = Observable.create(new OnSubscribe<Integer>() {
    @Override    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(8);
        subscriber.onNext(9);
    }
});
Observable<String> o2 = Observable.create(new OnSubscribe<String>() {
    @Override    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("A");
        subscriber.onNext("B");
        subscriber.onNext("C");
    }
});
Observable.zip(o1, o2, new Func2<Integer, String, String>() {
    @Override    
    public String call(Integer integer, String s) {        return integer + s;
    }
}).subscribe(new Subscriber<String>() {
    @Override    public void onNext(String s) {
        System.out.println(s);
    }
});

很显然,我们希望看到的结果是:8A 和 9B,而 C 是被忽略的

Observable

代码语言:javascript
复制
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, 
final Func2<? super T1, ? super T2, ? extends R> zipFunction) {    
    return just(new Observable<?>[]{o1, o2}).lift(new OperatorZip<R>(zipFunction));
}
public static <T> Observable<T> just(final T value) {    return create(new OnSubscribe<T>() {        
     @Override
     public void call(Subscriber<? super T> subscriber) {
            subscriber.onNext(value);
        }
    });
}

这里我们看看非链式的写法

代码语言:javascript
复制
final Observable[] observables = new Observable[]{o1, o2}; 
OnSubscribe onSubscribe = new OnSubscribe() {
    @Override    
    public void call(Subscriber subscriber) {
        subscriber.onNext(observables);
    }
}

Observable o3 = new Observable(onSubscribe);

Func2 func = new Func2()... // integer + sOperatorZip operatorZip = new OperatorZip(func);// o3.lift() 即OnSubscribeLift onSubscribeLift = new OnSubscribeLift(onSubscribe, operatorZip);

Observable zipObservable = new Observable(onSubscribeLift);

这样就初始化完了,接下来就是看 subscribe(subscriber) 调用过程 1) zipObservable.subscribe(subscriber) --> 2) onSubscribeLift.call(subscriber) --> 3) Subscriber subscriber2= operatorZip.call(subscriber) --> // 重点就是这一步 4) onSubscribe.call(subscriber2) 5) subscriber2.onNext(observables)

先看 operatorZip.call(subscriber),然后再看 subscriber2.onNext(observables)

代码语言:javascript
复制
public final class OperatorZip<R> implements Operator<R, Observable<?>[]> {    
   final FuncN<? extends R> zipFunction;

    public OperatorZip(Func2 f) {
        zipFunction = FuncN.fromFunc(f); // 并不重要,见文末
    }    
    @Override
    public Subscriber<? super Observable<?>[]> call(Subscriber<? super R> child) {        
     final Zip<R> zipper = new Zip<R>(child, zipFunction);        
     final ZipSubscriber subscriber = new ZipSubscriber(child, zipper);        
         return subscriber;
    }    
    private final class ZipSubscriber extends Subscriber<Observable[]> {        
     final Subscriber<? super R> child;        
     final Zip<R> zipper;

        public ZipSubscriber(Subscriber<? super R> child, Zip<R> zipper) {           
         this.child = child;            
         this.zipper = zipper;
        }        
        @Override
        public void onNext(Observable[] os) {
            zipper.start(os);
        }
    }

}

截止到现在,最后一步是 zipper.start(observables),继续看 zipper 是何方神圣

代码语言:javascript
复制
static final class Zip<R> {    
   final Subscriber<? super R> child;    
   private final FuncN<? extends R> zipFunction;    
   private Subscriber[] subscribers;    
   public Zip(final Subscriber<? super R> child, FuncN<? extends R> zipFunction) {        
     this.child = child;        
     this.zipFunction = zipFunction;
    }    
    public void start(Observable[] observables) {             
        final int length = os.length;
        subscribers = new Subscriber[length];        for (int i = 0; i < observables.length; i++) {
            InnerSubscriber subscriber = new InnerSubscriber();
            subscribers[i] = subscriber;
        }        
        for (int i = 0; i < observables.length; i++) {
            observables[i].subscribe(subscribers[i]);
        }
    }    
    private void tick() {        
       final int length = subscribers.length;        
       final Object[] objs = new Object[length];         
       for (int i = 0; i < length; i++) {
            InnerSubscriber subscriber = (InnerSubscriber) subscribers[i];
            objs[i] = subscriber.queue.peek();            
            if (objs[i] == null) {                
              return;
            }
        }        
        for (int i = 0; i < length; i++) {
            InnerSubscriber subscriber = (InnerSubscriber) subscribers[i];
            subscriber.queue.poll();
        }
        child.onNext(zipFunction.call(objs));
    }    
    final class InnerSubscriber extends Subscriber {
        Queue queue = new LinkedList();        
        @Override
        public void onNext(Object o) {
            queue.offer(o);
            tick();
        }
    }

}

原理就是把多个 observable 合并成一个 observables 事件 ZipSubscriber 的 onNext(observables) 调用 zip.start(observables) 遍历 observables 执行 observable.subscribe(InnerSubscriber) InnerSubscriber 内部维护一个队列 InnerSubscriber.onNext(Object) 把 obj 存入队列,并触发 tick() tick() 会检查所有的 InnerSubscriber.queue 是否都有值 如果是,才触发上层的 subscriber 的 onNext(R value) 当然 value 得先经过我们定义的 func2 的处理,即 value = zipFunction.call(Object[]),调用了 subscriber.onNext(value) 后,再把每一个 InnerSubscriber.queue 的第一个元素移除 如果不是,则直接 return

假如 zip(o1, o2, o3, o4, o5),那么就会创建5个 InnerSubscriber,任何一个 observable.call(InnerSubscriber) --> InnerSubscriber.onNext(obj) 都会先把 obj 加入 InnerSubscriber 内部的 queue 再触发tick()

代码语言:javascript
复制
public interface FuncN<R> {    
     R call(Object... args);    
     static <T0, T1, R> FuncN<R> fromFunc(final Func2<? super T0, ? super T1, ? extends R> f) {        
      return new FuncN<R>() {            
      @SuppressWarnings("unchecked")            
      @Override
      public R call(Object... args) {                i
      f (args.length != 2) {                    
        throw new RuntimeException("Func2 expecting 2 arguments.");
                }                
         return f.call((T0) args[0], (T1) args[1]);
            }

        };
    }
}

文:风风风筝

http://www.jianshu.com/p/f8b18605ac66


iOS特殊赞助通道,支持作者恭喜发财!

☞ 持续关注,[撸框架系列]

后续不定期将推出撸RxJava, 撸RxBus,撸RxCache,RxLifecyle.

精彩推荐

撸Retrfoit:

抛开理论,从实践中动手撸一Retrofit

学Rxjava:

关于RxJava2.0你不知道的事

自己撸一个RxJava 可好?(上)

RxBus:

当EventBus遇上自撸RxBus的时候?

技术 - 资讯 - 感悟

END

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-07-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发者技术前线 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档