A Look at Flink's ScheduledExecutor

code4it
Published 2019-03-13 09:36:13
Collected in the column 码匠的流水账

This article takes a look at Flink's ScheduledExecutor.

Executor

java.base/java/util/concurrent/Executor.java

public interface Executor {
​
    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
  • The JDK's Executor interface defines a single method, execute, which takes a Runnable.
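
The Javadoc above leaves the choice of thread to the implementation. As a minimal sketch of that freedom (the class ExecutorSketch and its members are made up for illustration and belong to neither the JDK nor Flink), here are two tiny Executor implementations, one running the command in the calling thread and one starting a new thread:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the JDK or Flink.
class ExecutorSketch {

    // Runs the command synchronously in the calling thread.
    static final Executor DIRECT = Runnable::run;

    // Starts a new, named thread for every command.
    static final Executor NEW_THREAD =
        command -> new Thread(command, "sketch-worker").start();

    // Submits one task and reports the name of the thread that ran it.
    static String threadUsedBy(Executor executor) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> name = new AtomicReference<>();
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(); // wait until the task has actually run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println("direct     -> " + threadUsedBy(DIRECT));
        System.out.println("new thread -> " + threadUsedBy(NEW_THREAD));
    }
}
```

Both variants satisfy the same interface, which is why callers of execute should not assume anything about the thread their command ends up on.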

ScheduledExecutor

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java

public interface ScheduledExecutor extends Executor {
​
    /**
     * Executes the given command after the given delay.
     *
     * @param command the task to execute in the future
     * @param delay the time from now to delay the execution
     * @param unit the time unit of the delay parameter
     * @return a ScheduledFuture representing the completion of the scheduled task
     */
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
​
    /**
     * Executes the given callable after the given delay. The result of the callable is returned
     * as a {@link ScheduledFuture}.
     *
     * @param callable the callable to execute
     * @param delay the time from now to delay the execution
     * @param unit the time unit of the delay parameter
     * @param <V> result type of the callable
     * @return a ScheduledFuture which holds the future value of the given callable
     */
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
​
    /**
     * Executes the given command periodically. The first execution is started after the
     * {@code initialDelay}, the second execution is started after {@code initialDelay + period},
     * the third after {@code initialDelay + 2*period} and so on.
     * The task is executed until either an execution fails, or the returned {@link ScheduledFuture}
     * is cancelled.
     *
     * @param command the task to be executed periodically
     * @param initialDelay the time from now until the first execution is triggered
     * @param period the time after which the next execution is triggered
     * @param unit the time unit of the delay and period parameter
     * @return a ScheduledFuture representing the periodic task. This future never completes
     * unless an execution of the given task fails or if the future is cancelled
     */
    ScheduledFuture<?> scheduleAtFixedRate(
        Runnable command,
        long initialDelay,
        long period,
        TimeUnit unit);
​
    /**
     * Executed the given command repeatedly with the given delay between the end of an execution
     * and the start of the next execution.
     * The task is executed repeatedly until either an exception occurs or if the returned
     * {@link ScheduledFuture} is cancelled.
     *
     * @param command the task to execute repeatedly
     * @param initialDelay the time from now until the first execution is triggered
     * @param delay the time between the end of the current and the start of the next execution
     * @param unit the time unit of the initial delay and the delay parameter
     * @return a ScheduledFuture representing the repeatedly executed task. This future never
     * completes unless the execution of the given task fails or if the future is cancelled
     */
    ScheduledFuture<?> scheduleWithFixedDelay(
        Runnable command,
        long initialDelay,
        long delay,
        TimeUnit unit);
}
  • ScheduledExecutor extends Executor and defines schedule, scheduleAtFixedRate, and scheduleWithFixedDelay. schedule is overloaded to accept either a Runnable or a Callable, and all of these methods return a ScheduledFuture. The two implementations covered here are ScheduledExecutorServiceAdapter and ActorSystemScheduledExecutorAdapter.
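
To get a feel for these methods without pulling in Flink, the sketch below uses the JDK's ScheduledExecutorService, which declares methods with the same names and parameter lists. The class ScheduleMethodsSketch and its helper methods are hypothetical names chosen for this example; it shows a one-shot Callable whose result arrives through the ScheduledFuture, and a periodic task that only ends when it is cancelled.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class built on the JDK scheduler, not on Flink.
class ScheduleMethodsSketch {

    // One-shot Callable: the returned ScheduledFuture carries the result.
    static int delayedAnswer() {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        try {
            Callable<Integer> answer = () -> 42;
            ScheduledFuture<Integer> future = ses.schedule(answer, 10, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ses.shutdownNow();
        }
    }

    // Periodic Runnable: the future never completes by itself, so we cancel it.
    static boolean runThreeTimesThenCancel() {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);
        try {
            ScheduledFuture<?> periodic =
                ses.scheduleAtFixedRate(threeRuns::countDown, 0, 5, TimeUnit.MILLISECONDS);
            threeRuns.await();      // wait for at least three executions
            periodic.cancel(false); // stop further executions
            return periodic.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ses.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("delayed result: " + delayedAnswer());
        System.out.println("periodic task cancelled: " + runThreeTimesThenCancel());
    }
}
```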

ScheduledExecutorServiceAdapter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java

public class ScheduledExecutorServiceAdapter implements ScheduledExecutor {
​
    private final ScheduledExecutorService scheduledExecutorService;
​
    public ScheduledExecutorServiceAdapter(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
    }
​
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return scheduledExecutorService.schedule(command, delay, unit);
    }
​
    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return scheduledExecutorService.schedule(callable, delay, unit);
    }
​
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
    }
​
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }
​
    @Override
    public void execute(Runnable command) {
        scheduledExecutorService.execute(command);
    }
}
  • ScheduledExecutorServiceAdapter implements ScheduledExecutor by delegating to the JDK's ScheduledExecutorService: schedule, scheduleAtFixedRate, scheduleWithFixedDelay, and execute each forward to the method of the same name on the wrapped scheduledExecutorService.
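
The delegation pattern can be tried out in isolation. The sketch below is not Flink code: MiniScheduledExecutor is a hypothetical, cut-down stand-in for Flink's ScheduledExecutor (only execute and the Runnable variant of schedule), and MiniAdapter plays the role of the adapter.

```java
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the nested types only imitate the Flink ones.
class AdapterSketch {

    // Assumption: a reduced stand-in for Flink's ScheduledExecutor interface.
    interface MiniScheduledExecutor extends Executor {
        ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    }

    // Forwards every call to the wrapped JDK service, adding no logic of its own.
    static final class MiniAdapter implements MiniScheduledExecutor {
        private final ScheduledExecutorService delegate;

        MiniAdapter(ScheduledExecutorService delegate) {
            this.delegate = Objects.requireNonNull(delegate);
        }

        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return delegate.schedule(command, delay, unit);
        }

        @Override
        public void execute(Runnable command) {
            delegate.execute(command);
        }
    }

    // Runs one immediate and one delayed task through the adapter.
    static String runThroughAdapter() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        MiniScheduledExecutor executor = new MiniAdapter(pool);
        StringBuffer log = new StringBuffer();
        try {
            executor.execute(() -> log.append("execute;"));
            executor.schedule(() -> log.append("schedule;"), 10, TimeUnit.MILLISECONDS).get();
            return log.toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // the adapter exposes no shutdown, so the owner of the pool does it
        }
    }

    public static void main(String[] args) {
        System.out.println(runThroughAdapter());
    }
}
```

One consequence of the design is visible in the finally block: the interface shown above has no lifecycle methods, so whoever created the underlying ScheduledExecutorService remains responsible for shutting it down.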

ActorSystemScheduledExecutorAdapter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java

public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {
​
    private final ActorSystem actorSystem;
​
    public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) {
        this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService");
    }
​
    @Override
    @Nonnull
    public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L);
​
        Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);
​
        scheduledFutureTask.setCancellable(cancellable);
​
        return scheduledFutureTask;
    }
​
    @Override
    @Nonnull
    public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L);
​
        Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);
​
        scheduledFutureTask.setCancellable(cancellable);
​
        return scheduledFutureTask;
    }
​
    @Override
    @Nonnull
    public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
            command,
            triggerTime(unit.toNanos(initialDelay)),
            unit.toNanos(period));
​
        Cancellable cancellable = actorSystem.scheduler().schedule(
            new FiniteDuration(initialDelay, unit),
            new FiniteDuration(period, unit),
            scheduledFutureTask,
            actorSystem.dispatcher());
​
        scheduledFutureTask.setCancellable(cancellable);
​
        return scheduledFutureTask;
    }
​
    @Override
    @Nonnull
    public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
            command,
            triggerTime(unit.toNanos(initialDelay)),
            unit.toNanos(-delay));
​
        Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit);
​
        scheduledFutureTask.setCancellable(cancellable);
​
        return scheduledFutureTask;
    }
​
    @Override
    public void execute(@Nonnull Runnable command) {
        actorSystem.dispatcher().execute(command);
    }
​
    private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
        return actorSystem.scheduler().scheduleOnce(
            new FiniteDuration(delay, unit),
            runnable,
            actorSystem.dispatcher());
    }
​
    private long now() {
        return System.nanoTime();
    }
​
    private long triggerTime(long delay) {
        return now() + delay;
    }
​
    private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
​
        private long time;
​
        private final long period;
​
        private volatile Cancellable cancellable;
​
        ScheduledFutureTask(Callable<V> callable, long time, long period) {
            super(callable);
            this.time = time;
            this.period = period;
        }
​
        ScheduledFutureTask(Runnable runnable, long time, long period) {
            super(runnable, null);
            this.time = time;
            this.period = period;
        }
​
        public void setCancellable(Cancellable newCancellable) {
            this.cancellable = newCancellable;
        }
​
        @Override
        public void run() {
            if (!isPeriodic()) {
                super.run();
            } else if (runAndReset()){
                if (period > 0L) {
                    time += period;
                } else {
                    cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS);
​
                    // check whether we have been cancelled concurrently
                    if (isCancelled()) {
                        cancellable.cancel();
                    } else {
                        time = triggerTime(-period);
                    }
                }
            }
        }
​
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean result = super.cancel(mayInterruptIfRunning);
​
            return result && cancellable.cancel();
        }
​
        @Override
        public long getDelay(@Nonnull  TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }
​
        @Override
        public int compareTo(@Nonnull Delayed o) {
            if (o == this) {
                return 0;
            }
​
            long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
            return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0;
        }
​
        @Override
        public boolean isPeriodic() {
            return period != 0L;
        }
    }
}
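  • ActorSystemScheduledExecutorAdapter implements ScheduledExecutor on top of an Akka ActorSystem; its execute method calls actorSystem.dispatcher().execute.
  • schedule and scheduleWithFixedDelay both go through internalSchedule, which calls actorSystem.scheduler().scheduleOnce; they differ only in the ScheduledFutureTask they build. schedule creates the task with a period of 0, while scheduleWithFixedDelay creates it with a period of unit.toNanos(-delay). The run method of ScheduledFutureTask branches on the period: a period of 0 marks a one-shot task, which simply calls super.run(); for a periodic task, once runAndReset() succeeds, a negative period makes it call internalSchedule again with -period, which is what produces fixed-delay scheduling.
  • scheduleAtFixedRate uses actorSystem.scheduler().schedule instead, and its ScheduledFutureTask gets unit.toNanos(period) as its period rather than the negated unit.toNanos(-delay) used by scheduleWithFixedDelay. With a positive period, run only advances time by period, because the repetition itself is handled by the Akka scheduler.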
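
The fixed-delay trick (a one-shot scheduler plus a task that re-arms itself after each run) can be sketched without Akka. In the example below, the JDK's ScheduledExecutorService.schedule stands in for scheduleOnce, and ReArmingTask is a hypothetical, heavily simplified counterpart of ScheduledFutureTask: it keeps the sign convention for period but leaves out FutureTask, runAndReset, and the time bookkeeping.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; a simplified imitation, not the Flink implementation.
class FixedDelaySketch {

    // period == 0: one-shot; period < 0: fixed delay of -period nanoseconds.
    static final class ReArmingTask implements Runnable {
        private final Runnable command;
        private final long periodNanos;
        private final ScheduledExecutorService scheduler; // stand-in for scheduleOnce
        private volatile boolean cancelled;

        ReArmingTask(Runnable command, long periodNanos, ScheduledExecutorService scheduler) {
            this.command = command;
            this.periodNanos = periodNanos;
            this.scheduler = scheduler;
        }

        @Override
        public void run() {
            if (cancelled) {
                return;
            }
            command.run();
            // Re-arm only after the command has finished, so the delay is
            // measured from the end of one run to the start of the next.
            if (periodNanos < 0 && !cancelled) {
                scheduler.schedule(this, -periodNanos, TimeUnit.NANOSECONDS);
            }
        }

        void cancel() {
            cancelled = true;
        }
    }

    // Runs a counting command with a fixed delay and cancels it after `wanted` runs.
    static int runFixedDelay(int wanted, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<ReArmingTask> self = new AtomicReference<>();
        Runnable command = () -> {
            if (runs.incrementAndGet() == wanted) {
                self.get().cancel(); // stops the task from re-arming itself
                done.countDown();
            }
        };
        ReArmingTask task =
            new ReArmingTask(command, -TimeUnit.MILLISECONDS.toNanos(delayMillis), scheduler);
        self.set(task);
        scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs: " + runFixedDelay(3, 5));
    }
}
```

Encoding the mode in the sign of a single long avoids a separate flag: the task can tell one-shot, fixed-rate, and fixed-delay apart from period alone.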

Summary

  • ScheduledExecutor extends Executor and defines schedule, scheduleAtFixedRate, and scheduleWithFixedDelay. schedule is overloaded to accept either a Runnable or a Callable, and all of these methods return a ScheduledFuture. The two implementations covered here are ScheduledExecutorServiceAdapter and ActorSystemScheduledExecutorAdapter.
  • ScheduledExecutorServiceAdapter implements ScheduledExecutor by delegating to the JDK's ScheduledExecutorService: schedule, scheduleAtFixedRate, scheduleWithFixedDelay, and execute each forward to the method of the same name on the wrapped scheduledExecutorService.
  • ActorSystemScheduledExecutorAdapter implements ScheduledExecutor on top of an Akka ActorSystem; its execute method calls actorSystem.dispatcher().execute. schedule and scheduleWithFixedDelay both go through internalSchedule, which calls actorSystem.scheduler().scheduleOnce, and differ only in their ScheduledFutureTask: schedule uses a period of 0, scheduleWithFixedDelay a period of unit.toNanos(-delay). In run, a period of 0 means a one-shot execution, and a negative period makes the task call internalSchedule again after each successful run, which produces fixed-delay scheduling. scheduleAtFixedRate uses actorSystem.scheduler().schedule, and its ScheduledFutureTask takes unit.toNanos(period) as its period rather than the negated unit.toNanos(-delay) used by scheduleWithFixedDelay.


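Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.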

In case of infringement, contact cloudcommunity@tencent.com to have it removed.
