前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava之timer和interval操作符源码解析

RxJava之timer和interval操作符源码解析

作者头像
103style
发布2022-12-19 13:22:01
6730
发布2022-12-19 13:22:01
举报

转载请以链接形式标明出处: 本文出自:103style的博客

timer 操作符

timer 操作符实际上返回的是一个 ObservableTimer对象。两个参数的方法默认在 Schedulers.computation()中工作。

代码语言:javascript
复制
 public static Observable<Long> timer(long delay, TimeUnit unit) {
     return timer(delay, unit, Schedulers.computation());
 }
 public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) {
     return RxJavaPlugins.onAssembly(new ObservableTimer(Math.max(delay, 0L), unit, scheduler));
 }

ObservableTimer 源码:

  • 构建了 TimerObserver 对象。
  • 执行 观察者 的 onSubscribe 方法。
  • 通过scheduler.scheduleDirect(ios, delay, unit) 返回一个 Disposable 对象。
  • 将返回的 Disposable 对象传给 TimerObserver 对象的 setResource 方法
代码语言:javascript
复制
public final class ObservableTimer extends Observable<Long> {
    final Scheduler scheduler;
    final long delay;
    final TimeUnit unit;
    public ObservableTimer(long delay, TimeUnit unit, Scheduler scheduler) {
        this.delay = delay;
        this.unit = unit;
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(Observer<? super Long> observer) {
        TimerObserver ios = new TimerObserver(observer);
        observer.onSubscribe(ios);
        Disposable d = scheduler.scheduleDirect(ios, delay, unit);
        ios.setResource(d);
    }
    ...
}

TimerObserver对象源码:

代码语言:javascript
复制
static final class TimerObserver extends AtomicReference<Disposable>
implements Disposable, Runnable {

    final Observer<? super Long> downstream;

    TimerObserver(Observer<? super Long> downstream) {
        this.downstream = downstream;
    }
    ...
    @Override
    public void run() {
        if (!isDisposed()) {
            downstream.onNext(0L);
            lazySet(EmptyDisposable.INSTANCE);
            downstream.onComplete();
        }
    }

    public void setResource(Disposable d) {
        DisposableHelper.trySet(this, d);
    }
}

首先看 TimerObserversetResource(Disposable d)方法 里的 DisposableHelper.trySet(this, d);

代码语言:javascript
复制
public static boolean trySet(AtomicReference<Disposable> field, Disposable d) {
    if (!field.compareAndSet(null, d)) {
        if (field.get() == DISPOSED) {
            d.dispose();
        }
        return false;
    }
    return true;
}
  • d 不为 null,直接 return true;否则判断 是否为 DISPOSED 状态,是的话调用传进来的 Disposable 对象(也就是之前 Scheduler 构建的 DisposeTask 对象)的 dispose 方法。

scheduler.scheduleDirect(ios, delay, unit) 方法:

代码语言:javascript
复制
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

首先创建了一个 Worker,因为默认是 Schedulers.computation()中工作,查看源码可知 实际调用的是 ComputationSchedulercreateWorker 方法 。 Schedulers

代码语言:javascript
复制
...
static final class ComputationHolder {
    static final Scheduler DEFAULT = new ComputationScheduler();
}
...
static {
    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    ...
}

static final class ComputationTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return ComputationHolder.DEFAULT;
    }
}

RxJavaPlugins

代码语言:javascript
复制
public static Scheduler initComputationScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
    ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
    Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitComputationHandler;
    if (f == null) {
        return callRequireNonNull(defaultScheduler);
    }
    return applyRequireNonNull(f, defaultScheduler); // JIT will skip this
}

static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
    try {
        return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
    } catch (Throwable ex) {
        throw ExceptionHelper.wrapOrThrow(ex);
    }
}

f 默认为 null,所以返回的是 callRequireNonNull(defaultScheduler),然后实际调用的是 ComputationTaskcall 方法。返回的即为 ComputationScheduler 对象。

ComputationSchedulercreateWorker 方法 。

代码语言:javascript
复制
public ComputationScheduler() {
    this(THREAD_FACTORY);
}

public ComputationScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
    start();
}

public Worker createWorker() {
    return new EventLoopWorker(pool.get().getEventLoop());
}

pool.get()通过构造函数我们可知返回的为 NONE = new FixedSchedulerPool(0, THREAD_FACTORY); 所以 pool.get().getEventLoop() 返回的为 SHUTDOWN_WORKER = new PoolWorker(new RxThreadFactory("RxComputationShutdown"));。实际上是创建了一个 executorExecutors.newScheduledThreadPool(1, factory) ,即 factoryRxThreadFactory("RxComputationShutdown")单线程线程池对象PoolWorker对象

代码语言:javascript
复制
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
    this.cores = maxThreads;
    this.eventLoops = new PoolWorker[maxThreads];
    for (int i = 0; i < maxThreads; i++) {
        this.eventLoops[i] = new PoolWorker(threadFactory);
    }
}
public PoolWorker getEventLoop() {
    int c = cores;
    if (c == 0) {
        return SHUTDOWN_WORKER;
    }
    return eventLoops[(int)(n++ % c)];
}

所以createWorker 返回的是:poolWorkerfactoryRxThreadFactory("RxComputationShutdown")单线程线程池对象PoolWorker对象

代码语言:javascript
复制
static final class EventLoopWorker extends Scheduler.Worker {
    private final ListCompositeDisposable serial;
    private final CompositeDisposable timed;
    private final ListCompositeDisposable both;
    private final PoolWorker poolWorker;

    volatile boolean disposed;

    EventLoopWorker(PoolWorker poolWorker) {
        this.poolWorker = poolWorker;
        this.serial = new ListCompositeDisposable();
        this.timed = new CompositeDisposable();
        this.both = new ListCompositeDisposable();
        this.both.add(serial);
        this.both.add(timed);
    }
...

decoratedRun 即为 TimerObserver 对象。

代码语言:javascript
复制
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

然后构建了一个 DisposeTask 对象。

代码语言:javascript
复制
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
    final Runnable decoratedRun;
    final Worker w;
    Thread runner;

    DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
        this.decoratedRun = decoratedRun;
        this.w = w;
    }

    @Override
    public void run() {
        runner = Thread.currentThread();
        try {
            decoratedRun.run();
        } finally {
            dispose();
            runner = null;
        }
    }
    ...
}

createWorker 返回的 poolWorkerfactoryRxThreadFactory("RxComputationShutdown")单线程线程池对象PoolWorker对象,并执行 schedule 方法。 实际上是执行了 单线程线程池对象 Executors.newScheduledThreadPool(1, factory)schedule(task, delayTime, unit)方法,并将返回值 Future 对象 传给ScheduledRunnablesetFuture 方法。

代码语言:javascript
复制
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (disposed) {
        return EmptyDisposable.INSTANCE;
    }
    return scheduleActual(action, delayTime, unit, null);
}

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    ...
    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        ...
        RxJavaPlugins.onError(ex);
    }
    return sr;
}

线程池的schedule(task, delayTime, unit) 方法实际时延时 delayTime 执行 taskrun 方法。即为 执行 TimerObserver 对象的 run 方法。

代码语言:javascript
复制
public void subscribeActual(Observer<? super Long> observer) {
    TimerObserver ios = new TimerObserver(observer);
    observer.onSubscribe(ios);
    Disposable d = scheduler.scheduleDirect(ios, delay, unit);
    ios.setResource(d);
}

TimerObserver 对象的 run 方法: 即执行了 观察者onNext(0L)onComplete()

代码语言:javascript
复制
public void run() {
    if (!isDisposed()) {
        downstream.onNext(0L);
        lazySet(EmptyDisposable.INSTANCE);
        downstream.onComplete();
    }
}

interval 系列操作符
  • interval系列 包含 intervalintervalRange两个操作符,包含以下 6 个方法:
    • interval(long period, TimeUnit unit)
    • interval(long initialDelay, long period, TimeUnit unit)
    • interval(long period, TimeUnit unit, Scheduler scheduler)
    • interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
    • intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
    • intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

    分别返回的是 ObservableIntervalObservableIntervalRange 对象,默认的 SchedulerSchedulers.computation()

代码语言:javascript
复制
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) {
    return interval(initialDelay, period, unit, Schedulers.computation());
}
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
    return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) {
    return intervalRange(start, count, initialDelay, period, unit, Schedulers.computation());
}
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
    return RxJavaPlugins.onAssembly(new ObservableIntervalRange(start, end, Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}

ObservableInterval 源码:

  • 构建了 IntervalObserver 对象。
  • 因为默认Schedulers.computation() 所以 sch instanceof TrampolineScheduler不成立,除非我们手动传参 SchedulerSchedulers.trampoline()
  • 和前面的 ObservableTimer类似, 即为调用 ObservableIntervalrun 方法。只是返回的为PeriodicDirectTask对象。
  • setResourceObservableTimer类似,就不再赘述了。
代码语言:javascript
复制
public final class ObservableInterval extends Observable<Long> {
    final Scheduler scheduler;
    final long initialDelay;
    final long period;
    final TimeUnit unit;

    public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(Observer<? super Long> observer) {
        IntervalObserver is = new IntervalObserver(observer);
        observer.onSubscribe(is);

        Scheduler sch = scheduler;
        if (sch instanceof TrampolineScheduler) {
            Worker worker = sch.createWorker();
            is.setResource(worker);
            worker.schedulePeriodically(is, initialDelay, period, unit);
        } else {
            Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
            is.setResource(d);
        }
    }
    ...
}

sch.schedulePeriodicallyDirect(is, initialDelay, period, unit) 实际调用的为 schedulePeriodically方法:

  • interval 的间隔时间转化为 Nanoseconds
  • 然后设置 第一次的 响应时间为 当前时间+ 间隔时间 的 纳秒数。
  • 里面又将 PeriodicDirectTask对象 包装成 PeriodicTask 对象。
代码语言:javascript
复制
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
    Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
    if (d == EmptyDisposable.INSTANCE) {
        return d;
    }
    return periodicTask;
}

public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
    final SequentialDisposable first = new SequentialDisposable();
    final SequentialDisposable sd = new SequentialDisposable(first);
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    final long periodInNanoseconds = unit.toNanos(period);
    final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
    final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);

    Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
            periodInNanoseconds), initialDelay, unit);
    if (d == EmptyDisposable.INSTANCE) {
        return d;
    }
    first.replace(d);
    return sd;
}

PeriodicTask 对象的 run 方法

  • decoratedRun.run(); 又调用了 PeriodicDirectTask对象的 run 方法.
  • run 方法的最后 sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS)); 这里又 重复执行 这个任务,直到 IntervalObserver对象 isDisposed()true
代码语言:javascript
复制
 final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
    final Runnable decoratedRun;
    final SequentialDisposable sd;
    final long periodInNanoseconds;
    long count;
    long lastNowNanoseconds;
    long startInNanoseconds;

    PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
            long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
        this.decoratedRun = decoratedRun;
        this.sd = sd;
        this.periodInNanoseconds = periodInNanoseconds;
        lastNowNanoseconds = firstNowNanoseconds;
        startInNanoseconds = firstStartInNanoseconds;
    }

    @Override
    public void run() {
        decoratedRun.run();
        if (!sd.isDisposed()) {
            long nextTick;
            long nowNanoseconds = now(TimeUnit.NANOSECONDS);
            // If the clock moved in a direction quite a bit, rebase the repetition period
            if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
                    || nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
                nextTick = nowNanoseconds + periodInNanoseconds;
                /*
                 * Shift the start point back by the drift as if the whole thing
                 * started count periods ago.
                 */
                startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
            } else {
                nextTick = startInNanoseconds + (++count * periodInNanoseconds);
            }
            lastNowNanoseconds = nowNanoseconds;

            long delay = nextTick - nowNanoseconds;
            sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
        }
    }
    ...
}

PeriodicDirectTaskrun 方法:

  • 实际调用的即为IntervalObserverrun()
代码语言:javascript
复制
static final class PeriodicDirectTask
implements Disposable, Runnable, SchedulerRunnableIntrospection {
    final Runnable run;
    final Worker worker;
    volatile boolean disposed;

    PeriodicDirectTask(@NonNull Runnable run, @NonNull Worker worker) {
        this.run = run;
        this.worker = worker;
    }

    @Override
    public void run() {
        if (!disposed) {
            try {
                run.run();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                worker.dispose();
                throw ExceptionHelper.wrapOrThrow(ex);
            }
        }
    }
    ...
}

IntervalObserverrun()

  • 调用 观察者onNext 方法
代码语言:javascript
复制
static final class IntervalObserver
extends AtomicReference<Disposable>
implements Disposable, Runnable {
    final Observer<? super Long> downstream;
    long count;

    IntervalObserver(Observer<? super Long> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void run() {
        if (get() != DisposableHelper.DISPOSED) {
            downstream.onNext(count++);
        }
    }
}

然后直到我们直接调用 dispose() 方法结束流程。

以上

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-05-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • timer 操作符
  • interval 系列操作符
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档