前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kubernetes 是怎么实现定时任务的

Kubernetes 是怎么实现定时任务的

作者头像
CS实验室
发布2021-03-22 12:40:02
7390
发布2021-03-22 12:40:02
举报
文章被收录于专栏:CS实验室CS实验室CS实验室

Kubernetes 是怎么实现定时任务的

Kubernetes 的各个组件都有一定的定时任务,比如日志的处理、任务的查询、缓存的使用等。Kubernetes 中的定时任务都是通过 wait 包实现的,比如在 Kubelet 中启动探针的检查:

// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
    // Start syncing readiness.
    go wait.Forever(m.updateReadiness, 0)
    // Start syncing startup.
    go wait.Forever(m.updateStartup, 0)
}

Golang 的定时任务

在讲 Kubernetes 的 wait 库之前,先看下 Golang 应该怎么实现一个定时任务。

Golang 中的 time 库包含了很多和时间相关的工具,其中包括了 Ticker 和 Timer 两个定时器。

Ticker 只要完成定义,从计时开始,不需要其他操作,每间隔固定时间便会触发。

package main

import (
    "fmt"
    "time"
)

func main() {

    d := time.Duration(time.Second * 5)
    ticker := time.NewTicker(d)

    defer ticker.Stop()

    for {
        <-ticker.C

        fmt.Println("Hello World")
    }

}

而对于 Timer,在超时之后需要重置才能继续触发。

package main

import (
    "fmt"
    "time"
)

func main() {

    d := time.Duration(time.Second * 5)
    timer := time.NewTimer(d)

    defer timer.Stop()

    for {
        <-timer.C

        fmt.Println("Hello World")
        timer.Reset(d)
    }

}

需要注意的,无论哪种计时器, .C 都是一个 chanTime 类型且容量为 1 的单向 Channel,当有超过 1 个数据的时候便会被阻塞,以此保证不会被触发多次。

func NewTimer(d Duration) *Timer {
    c := make(chan Time, 1)
    t := &Timer{
        C: c,
        r: runtimeTimer{
            when: when(d),
            f:    sendTime,
            arg:  c,
        },
    }
    startTimer(&t.r)
    return t
}

Kubernetes 的 wait 库

常用 API

wait 库中实现了多种常用的 API,以提供定时执行函数的能力。

定期执行一个函数,永不停止
// Forever calls f every period for ever.
//
// Forever is syntactic sugar on top of Until.
func Forever(f func(), period time.Duration)

该函数支持一个函数参数和一个间隔时间,该函数会定期执行,不会停止。

定期执行一个函数,可以接受停止信号
// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{})

该函数支持提供一个函数、间隔时间和发生 stop 信号的 channel,和 Forever 类似,不过可以通过向 stopCh 发布消息来停止。

定期检查先决条件
// Poll tries a condition func until it returns true, an error, or the timeout
// is reached.
//
// Poll always waits the interval before the run of 'condition'.
// 'condition' will always be invoked at least once.
//
// Some intervals may be missed if the condition takes too long or the time
// window is too short.
//
// If you want to Poll something forever, see PollInfinite.
func Poll(interval, timeout time.Duration, condition ConditionFunc) error

该函数将以 interval 为间隔,定期检查 condition 是否检查成功。

定期检查先决条件,直到检查成功或停止
// PollUntil tries a condition func until it returns true, an error or stopCh is
// closed.
//
// PollUntil always waits interval before the first run of 'condition'.
// 'condition' will always be invoked at least once.
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error

核心代码

wait 库的定时任务 API 是基于 JitterUntil 实现的。

// JitterUntil loops until stop channel is closed, running f every period.
//
// If jitterFactor is positive, the period is jittered before every run of f.
// If jitterFactor is not positive, the period is unchanged and not jittered.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
//
// Close stopCh to stop. f may not be invoked if stop channel is already
// closed. Pass NeverStop to if you don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
    var t *time.Timer
    var sawTimeout bool

    for {
        select {
        case <-stopCh:
            return
        default:
        }

        jitteredPeriod := period
        if jitterFactor > 0.0 {
            jitteredPeriod = Jitter(period, jitterFactor)
        }

        if !sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }

        func() {
            defer runtime.HandleCrash()
            f()
        }()

        if sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }

        // NOTE: b/c there is no priority selection in golang
        // it is possible for this to race, meaning we could
        // trigger t.C and stopCh, and t.C select falls through.
        // In order to mitigate we re-check stopCh at the beginning
        // of every loop to prevent extra executions of f().
        select {
        case <-stopCh:
            return
        case <-t.C:
            sawTimeout = true
        }
    }
}

JitterUntil 的 5 个参数:

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 CS实验室 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Golang 的定时任务
  • Kubernetes 的 wait 库
    • 常用 API
      • 定期执行一个函数,永不停止
      • 定期执行一个函数,可以接受停止信号
      • 定期检查先决条件
      • 定期检查先决条件,直到检查成功或停止
    • 核心代码
    相关产品与服务
    容器服务
    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档