Column: 码匠的流水账 (original article)

A Look at Flink's slot.idle.timeout Configuration

This article takes a look at Flink's slot.idle.timeout configuration option.

JobManagerOptions

flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java

@PublicEvolving
public class JobManagerOptions {
    //......

    /**
     * The timeout in milliseconds for a idle slot in Slot Pool.
     */
    public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT =
        key("slot.idle.timeout")
            // default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery
            .defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
            .withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");

    //......
}
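  • slot.idle.timeout defaults to HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue(), i.e. 50000L milliseconds (50 seconds). As the inline comment explains, the default matches heartbeat.timeout so that sticky slot allocation is not lost on timeouts during local recovery. Note that the option copies heartbeat.timeout's default value, not its configured value, so changing heartbeat.timeout alone does not change slot.idle.timeout.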
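To make the "copied default" behavior concrete, here is a minimal Java sketch that models it with hypothetical stand-in classes. LongOption, getLong and the Map-based configuration are illustrative only and are not Flink's ConfigOption/Configuration API; the sketch simply assumes, as the excerpt above shows, that the default is the value HEARTBEAT_TIMEOUT.defaultValue() returns when the option is declared.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model (not Flink's API) of how slot.idle.timeout gets its default:
 * the default is a copy of heartbeat.timeout's *default value*, captured when the
 * option is declared, not a fallback to whatever heartbeat.timeout is configured to.
 */
public class SlotIdleTimeoutDefaultDemo {

    /** Hypothetical stand-in for a Long-typed ConfigOption: a key plus a default. */
    public static final class LongOption {
        final String key;
        final long defaultValue;

        LongOption(String key, long defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    // 50000 ms: the heartbeat.timeout default stated in the article (Flink 1.7.2)
    public static final LongOption HEARTBEAT_TIMEOUT = new LongOption("heartbeat.timeout", 50_000L);

    // Mirrors key("slot.idle.timeout").defaultValue(HEARTBEAT_TIMEOUT.defaultValue())
    public static final LongOption SLOT_IDLE_TIMEOUT =
        new LongOption("slot.idle.timeout", HEARTBEAT_TIMEOUT.defaultValue);

    /** Returns the configured value for the option's key, or its default if absent. */
    public static long getLong(Map<String, String> conf, LongOption option) {
        String raw = conf.get(option.key);
        return raw == null ? option.defaultValue : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(getLong(conf, SLOT_IDLE_TIMEOUT)); // 50000

        // Raising heartbeat.timeout does not move slot.idle.timeout
        conf.put("heartbeat.timeout", "120000");
        System.out.println(getLong(conf, SLOT_IDLE_TIMEOUT)); // 50000

        // slot.idle.timeout has to be set explicitly
        conf.put("slot.idle.timeout", "120000");
        System.out.println(getLong(conf, SLOT_IDLE_TIMEOUT)); // 120000
    }
}
```

In practice this means that if heartbeat.timeout is raised, slot.idle.timeout probably needs to be raised alongside it to keep the two aligned.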

SlotPool

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java

public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions {

    /** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level. */
    private static final int STATUS_LOG_INTERVAL_MS = 60_000;

    private final JobID jobId;

    private final SchedulingStrategy schedulingStrategy;

    private final ProviderAndOwner providerAndOwner;

    /** All registered TaskManagers, slots will be accepted and used only if the resource is registered. */
    private final HashSet<ResourceID> registeredTaskManagers;

    /** The book-keeping of all allocated slots. */
    private final AllocatedSlots allocatedSlots;

    /** The book-keeping of all available slots. */
    private final AvailableSlots availableSlots;

    /** All pending requests waiting for slots. */
    private final DualKeyMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;

    /** The requests that are waiting for the resource manager to be connected. */
    private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager;

    /** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */
    private final Time rpcTimeout;

    /** Timeout for releasing idle slots. */
    private final Time idleSlotTimeout;

    private final Clock clock;

    /** Managers for the different slot sharing groups. */
    protected final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers;

    /** the fencing token of the job manager. */
    private JobMasterId jobMasterId;

    /** The gateway to communicate with resource manager. */
    private ResourceManagerGateway resourceManagerGateway;

    private String jobManagerAddress;

    //......

    /**
     * Start the slot pool to accept RPC calls.
     *
     * @param jobMasterId The necessary leader id for running the job.
     * @param newJobManagerAddress for the slot requests which are sent to the resource manager
     */
    public void start(JobMasterId jobMasterId, String newJobManagerAddress) throws Exception {
        this.jobMasterId = checkNotNull(jobMasterId);
        this.jobManagerAddress = checkNotNull(newJobManagerAddress);

        // TODO - start should not throw an exception
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException("This should never happen", e);
        }

        scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);

        if (log.isDebugEnabled()) {
            scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Check the available slots, release the slot that is idle for a long time.
     */
    private void checkIdleSlot() {

        // The timestamp in SlotAndTimestamp is relative
        final long currentRelativeTimeMillis = clock.relativeTimeMillis();

        final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size());

        for (SlotAndTimestamp slotAndTimestamp : availableSlots.availableSlots.values()) {
            if (currentRelativeTimeMillis - slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) {
                expiredSlots.add(slotAndTimestamp.slot);
            }
        }

        final FlinkException cause = new FlinkException("Releasing idle slot.");

        for (AllocatedSlot expiredSlot : expiredSlots) {
            final AllocationID allocationID = expiredSlot.getAllocationId();
            if (availableSlots.tryRemove(allocationID) != null) {

                log.info("Releasing idle slot [{}].", allocationID);
                final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
                    allocationID,
                    cause,
                    rpcTimeout);

                freeSlotFuture.whenCompleteAsync(
                    (Acknowledge ignored, Throwable throwable) -> {
                        if (throwable != null) {
                            if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
                                log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
                                    "Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(),
                                    throwable);
                                tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
                            } else {
                                log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
                                    "longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
                            }
                        }
                    },
                    getMainThreadExecutor());
            }
        }

        scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
    }

    //......
}
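  • In its start method, SlotPool calls scheduleRunAsync to run checkIdleSlot once idleSlotTimeout has elapsed. checkIdleSlot walks through every SlotAndTimestamp in availableSlots and adds the slot to expiredSlots if the difference between the current relative time and slotAndTimestamp.timestamp exceeds idleSlotTimeout. It then calls availableSlots.tryRemove on each expired slot and, if the removal succeeds, releases the slot through TaskManagerGateway.freeSlot. If freeSlot fails while the owning TaskExecutor is still registered, the slot is passed to tryFulfillSlotRequestOrMakeAvailable so it can serve a different slot request; if the TaskExecutor is no longer registered, the slot is discarded. Finally, checkIdleSlot calls scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout) again to schedule the next check.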
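The core of the check (collect expired slots first, then remove and release them) can be modeled in plain Java. This is a simplified sketch, not Flink code: IdleSlotSweeper, makeAvailable and checkIdleSlots are hypothetical names, allocation ids are plain strings, an injected LongSupplier stands in for clock.relativeTimeMillis(), and the TaskManagerGateway.freeSlot call (with its failure handling) is reduced to returning the released ids.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical, simplified model of SlotPool#checkIdleSlot (not Flink code). */
public class IdleSlotSweeper {

    // allocation id -> relative time at which the slot became available
    private final Map<String, Long> availableSince = new LinkedHashMap<>();
    private final LongSupplier relativeClockMillis;
    private final long idleSlotTimeoutMillis;

    public IdleSlotSweeper(LongSupplier relativeClockMillis, long idleSlotTimeoutMillis) {
        this.relativeClockMillis = relativeClockMillis;
        this.idleSlotTimeoutMillis = idleSlotTimeoutMillis;
    }

    /** Records a slot as available "now", like the timestamp kept in SlotAndTimestamp. */
    public void makeAvailable(String allocationId) {
        availableSince.put(allocationId, relativeClockMillis.getAsLong());
    }

    /** Removes and returns every slot that has been idle for strictly longer than the timeout. */
    public List<String> checkIdleSlots() {
        final long now = relativeClockMillis.getAsLong();

        // Pass 1: collect expired slots without mutating the map while iterating over it
        final List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : availableSince.entrySet()) {
            if (now - entry.getValue() > idleSlotTimeoutMillis) {
                expired.add(entry.getKey());
            }
        }

        // Pass 2: release only slots that are still present (cf. availableSlots.tryRemove)
        final List<String> released = new ArrayList<>();
        for (String allocationId : expired) {
            if (availableSince.remove(allocationId) != null) {
                released.add(allocationId); // the real code calls TaskManagerGateway#freeSlot here
            }
        }
        return released;
    }

    public static void main(String[] args) {
        final long[] fakeNow = {0L};
        IdleSlotSweeper sweeper = new IdleSlotSweeper(() -> fakeNow[0], 50_000L);

        sweeper.makeAvailable("slot-a"); // available at t = 0
        fakeNow[0] = 30_000L;
        sweeper.makeAvailable("slot-b"); // available at t = 30s

        fakeNow[0] = 50_000L;
        System.out.println(sweeper.checkIdleSlots()); // [] (slot-a idle exactly 50s, not more)
        fakeNow[0] = 60_000L;
        System.out.println(sweeper.checkIdleSlots()); // [slot-a]
        fakeNow[0] = 90_000L;
        System.out.println(sweeper.checkIdleSlots()); // [slot-b]
    }
}
```

The fake clock makes the boundary visible: because the comparison is a strict `>`, a slot that has been idle for exactly idleSlotTimeout is kept until a later check.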

RpcEndpoint

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java

public abstract class RpcEndpoint implements RpcGateway {
    //......

    /**
     * Execute the runnable in the main thread of the underlying RPC endpoint, with
     * a delay of the given number of milliseconds.
     *
     * @param runnable Runnable to be executed
     * @param delay    The delay after which the runnable will be executed
     */
    protected void scheduleRunAsync(Runnable runnable, Time delay) {
        scheduleRunAsync(runnable, delay.getSize(), delay.getUnit());
    }

    /**
     * Execute the runnable in the main thread of the underlying RPC endpoint, with
     * a delay of the given number of milliseconds.
     *
     * @param runnable Runnable to be executed
     * @param delay    The delay after which the runnable will be executed
     */
    protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
        rpcServer.scheduleRunAsync(runnable, unit.toMillis(delay));
    }

    //......
}
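  • RpcEndpoint provides scheduleRunAsync: the Time overload delegates to the (long, TimeUnit) overload, which converts the delay to milliseconds and ultimately calls rpcServer.scheduleRunAsync, so the runnable is executed in the endpoint's main thread after the delay.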
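Because checkIdleSlot re-arms itself only at the end of each run, the checks behave like a fixed-delay schedule rather than a fixed-rate one. Below is a hypothetical Java sketch of that pattern, using a single-threaded ScheduledExecutorService as a stand-in for the endpoint's main thread. SelfReschedulingCheck and its maxRuns stop condition are illustrative assumptions; the real SlotPool keeps rescheduling for as long as it runs. One consequence, based on my reading of the code above rather than on measurement: since checks run only every idleSlotTimeout and use a strict `>` comparison, a slot that becomes available right after a check can stay idle for up to roughly twice idleSlotTimeout before it is released.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch (not Flink code) of the self-rescheduling pattern behind
 * SlotPool#checkIdleSlot. A single-threaded scheduler plays the endpoint's main thread.
 */
public class SelfReschedulingCheck {

    private final ScheduledExecutorService mainThread = Executors.newSingleThreadScheduledExecutor();
    private final long delayMillis;
    private final AtomicInteger runs = new AtomicInteger();
    private final CountDownLatch done; // illustrative stop condition only

    public SelfReschedulingCheck(long delayMillis, int maxRuns) {
        this.delayMillis = delayMillis;
        this.done = new CountDownLatch(maxRuns);
    }

    /** Analogue of scheduleRunAsync(runnable, delay, unit): run once on the main thread after a delay. */
    private void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
        mainThread.schedule(runnable, delay, unit);
    }

    /** Like SlotPool.start: schedule the first check one delay from now. */
    public void start() {
        scheduleRunAsync(this::check, delayMillis, TimeUnit.MILLISECONDS);
    }

    private void check() {
        runs.incrementAndGet();
        done.countDown();
        // ... the idle-slot sweep would run here ...
        if (done.getCount() > 0) {
            // Re-arm only after this run has finished: fixed delay, not fixed rate
            scheduleRunAsync(this::check, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Waits until maxRuns checks have run (or the timeout passes), stops the scheduler, returns the run count. */
    public int awaitRuns(long timeout, TimeUnit unit) {
        try {
            done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            mainThread.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        SelfReschedulingCheck check = new SelfReschedulingCheck(10, 3);
        check.start();
        System.out.println(check.awaitRuns(5, TimeUnit.SECONDS)); // 3
    }
}
```

Rescheduling from inside the task, instead of using scheduleAtFixedRate, guarantees that two checks never overlap and that every check runs on the same thread that owns the slot bookkeeping.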

Summary

  • slot.idle.timeout defaults to HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue(), i.e. 50000L milliseconds.
  • SlotPool.start schedules checkIdleSlot with a delay of idleSlotTimeout. Each run collects the available slots whose idle time (current relative time minus slotAndTimestamp.timestamp) exceeds idleSlotTimeout, removes them via availableSlots.tryRemove, releases them through TaskManagerGateway.freeSlot, and then reschedules itself with scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout).
  • RpcEndpoint provides scheduleRunAsync, which ultimately delegates to rpcServer.scheduleRunAsync.


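Original content statement: this article was published on the Tencent Cloud+ Community (云+社区) with the author's authorization and may not be reproduced without permission.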
