前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >定时触发函数的Python实现

定时触发函数的Python实现

原创
作者头像
mariolu
发布2021-01-28 01:36:30
1.7K0
发布2021-01-28 01:36:30
举报
文章被收录于专栏:CDN及云技术分享

一、使用场景:

定时触发器在生产环境经常用到,比如说定时load一段活动配置,定时做清理存储动作,定时检查进程运行健康状态,定时上报事件日志等。

定时触发器的实现原理,一般是依赖io非阻塞复用(比如epoll的定时fd)。

二、基本设计:

  • 定时时间下一次时间点计算功能
  • 检测函数执行是否成功,以及事后回调,事后回调必须完成是否重新调度或者删除任务
  • 删除任务可由函数执行失败触发(因为一次失败的任务,下次可能还会失败),或者提供手动cancel()接口来取消任务
代码语言:javascript
复制
class TriggerFunction(object):
    def try_to_call(self, trigger_finished_cb):

        if self._delete:
            trigger_finished_cb(self)
            return
            
        def go():
            try:
                r = self._func(*self._args, **self._kwargs)
                if r == False:
                    self._delete = True
            except:
                log.error("Exception in trigger function %r" % self._func)
            finally:

                now = time.time()
                self._next_call_timestamp = now + self._interval

                trigger_finished_cb(self)

         go()
         
    def cancel(self):
        self._delete = True
        self._func = None
        self._args = None
        self._kwargs = None

事后函数在另一个调度class,里面有这个trigger_finished方法:

代码语言:javascript
复制

  def trigger_finished(trigger_func):
      if not trigger_func._delete:
          self._schedule(trigger_func)
      self._running_triggers.remove(trigger_func)

三、多线程环境下更多设计:

  • 考虑到函数可能被多次同时调用(想象一下,如果每秒定时的任务队列,如果上一次的函数执行时间过长,超过1s,那么下下一秒的任务会第二次同时进入函数),所以一次调用执行过程中必须不能被打扰,必须加一个锁保护。
  • 考虑不用锁的实现,在python里面有个叫greenlet协程设计
  • 是否是每次都准时+1个周期的隔离点调用,还是说这个定时周期不包括函数的执行时间。
  • 如果是定时间的调用,想象一下有多个定时器在同时调用,那么在同一时间可能会形成性能高峰,所以需要加入加一个随机偏差值提供给用户选择。
  • 调度函数执行如果在主线程,应该不应该阻塞主线程,而是在事件线程中操作

按照设计,更新下代码

代码语言:javascript
复制
class TriggerFunction(object):
    def try_to_call(self, trigger_finished_cb):
        /*更新部分,判断是否取到锁*/
        if self._lock:
            return
        /*更新部分*/

        if self._delete:
            trigger_finished_cb(self)
            return
            
        def go():
            try:
                /*更新部分,取到锁*/
                self._lock = True
                /*更新部分*/
                r = self._func(*self._args, **self._kwargs)
                if r == False:
                    self._delete = True
            except:
                log.error("Exception in trigger function %r" % self._func)
            finally:
                /*更新部分,释放锁*/
                self._lock = False
                /*更新部分*/

                /*更新部分,是否按时执行,计算下一个时间间隔*/
                now = time.time()
                if self._always_on_time and self._interval:
                    while self._next_call_timestamp < now:
                        self._next_call_timestamp += self._interval
                else:
                    self._next_call_timestamp = now + self._interval
                /*更新部分*/

                /*更新部分,是否随机执行*/
                if self._diffusion:
                    self._next_call_timestamp += \
                        self._interval * random.uniform(-0.2, 0.2)
                /*更新部分*/

                trigger_finished_cb(self)

        /*更新部分,是否随机执行*/
        try:
            if greenlet:
                g = greenlet.greenlet(go)
                g.switch()
            else:
                go()
        /*greelet协程无锁设计*/
         
    def cancel(self):
        self._delete = True
        self._func = None
        self._args = None
        self._kwargs = None

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、使用场景:
  • 二、基本设计:
  • 三、多线程环境下更多设计:
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档