专栏首页Python无止境分布式场景下使用APScheduler

分布式场景下使用APScheduler

简介

APScheduler是一个定时任务框架,其主要功能就是方便控制不同类型的定时任务,本身并没有考虑分布式多实例情况下的一些问题,本篇文章就来简单谈谈APScheduler在简单分布式场景下的使用。

分布式带来的问题

比如有个服务A,服务A中有使用APScheduler添加任务的逻辑,每次添加的任务会在随后固定的某个时间点被APScheduler调用执行。

在单节点情况下,这并没有什么问题,但随着业务加大,你可能要开启多个服务A来做负载时,此时APScheduler就会出现重复执行任务的问题。

为了方便说明,这里使用MongoDB作为APScheduler的jobstore,使用线程池作为它的执行器。(如果你不明白我在说啥,建议看看此前APScheduler的文章)

scheduler = BlockingScheduler(
    jobstores={"default": mongostore},
    executors={"default": ThreadPoolExecutor(10)},
    job_defaults={"coalesce": True, "max_instances": 3},
    timezone='Asia/Shanghai',
)

如果开启了多个服务A,服务A中都使用了相同配置的scheduler,此时就会出现任务重复执行的问题。

为何会有这个问题?一起来阅读一下相关源码,一探究竟。

因为使用了BlockingScheduler作为调度器,所以直接看到该类的代码

# apscheduler/schedulers/blocking.py

class BlockingScheduler(BaseScheduler):
    """
    A scheduler that runs in the foreground
    (:meth:`~apscheduler.schedulers.base.BaseScheduler.start` will block).
    """
    _event = None

    # ... 省略部分代码

    def _main_loop(self):
        wait_seconds = TIMEOUT_MAX
        while self.state != STATE_STOPPED:
            # 等待事件通知,wait_seconds为等待事件通知的超时时间
            # wait()方法会阻塞线程,直到事件标志状态为true。
            self._event.wait(wait_seconds)
            # clear()方法将事件标志状态设置为false
            self._event.clear()
            wait_seconds = self._process_jobs()

_main_loop方法会构成主循环,其具体的执行逻辑在 _process_jobs方法中, _process_jobs方法部分代码如下。

# apscheduler/schedulers/base.py/BaseScheduler

def _process_jobs(self):
        """
        Iterates through jobs in every jobstore, starts jobs that are due and figures out how long
        to wait for the next round.

        If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least
        ``jobstore_retry_interval`` seconds.

        """
        if self.state == STATE_PAUSED:
            self._logger.debug('Scheduler is paused -- not processing jobs')
            return None

        self._logger.debug('Looking for jobs to run')
        now = datetime.now(self.timezone) # 当前时间
        next_wakeup_time = None
        events = []

        with self._jobstores_lock:
            # 从_jobstores中获取当前要处理的任务
            for jobstore_alias, jobstore in self._jobstores.items():
                try:
                    # 以当前时间为基准,判断是否到了执行时间
                    due_jobs = jobstore.get_due_jobs(now)
                except Exception as e:
                    # Schedule a wakeup at least in jobstore_retry_interval seconds
                    # 在 jobstore 重试间隔时间(秒)内唤醒
                    self._logger.warning('Error getting due jobs from job store %r: %s',
                                         jobstore_alias, e)
                    # 唤醒时间
                    retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
                    if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
                        next_wakeup_time = retry_wakeup_time

                    continue
        # ... 省略部分代码

_process_jobs方法通过 due_jobs=jobstore.get_due_jobs(now)获取jobstore中的任务对象,通过前面的配置可知,mongodb是这里的jobstore。

看到mongodb对应jobstore的代码。

# apscheduler/jobstores/mongodb.py/MongoDBJobStore

def get_due_jobs(self, now):
        timestamp = datetime_to_utc_timestamp(now)
        return self._get_jobs({'next_run_time': {'$lte': timestamp}})

getduejobs方法主要调用 _get_jobs方法去获取任务对象,要关注的重点是,它使用了lte以及时间戳作为参数,简单用过mongodb的朋友都知道lte其实就是小于等于的意思,简单而言,只要小于或等于timestamp这个时间戳的任务都会被获取。

都看到这了,顺便看一下 _get_jobs方法的代码吧。

def _reconstitute_job(self, job_state):
        # 反序列化,获取任务对象参数
        job_state = pickle.loads(job_state) 
        job = Job.__new__(Job) 
        job.__setstate__(job_state)
        job._scheduler = self._scheduler
        job._jobstore_alias = self._alias
        return job

    def _get_jobs(self, conditions):
        jobs = []
        failed_job_ids = []
        for document in self.collection.find(conditions, ['_id', 'job_state'],
                                             sort=[('next_run_time', ASCENDING)]):
            try:
                jobs.append(self._reconstitute_job(document['job_state']))
            except BaseException:
                self._logger.exception('Unable to restore job "%s" -- removing it',
                                       document['_id'])
                failed_job_ids.append(document['_id'])

        # Remove all the jobs we failed to restore
        if failed_job_ids:
            self.collection.remove({'_id': {'$in': failed_job_ids}})

        return jobs # 返回所有小于等于某一时间戳的任务对象

到这里就很清楚APScheduler会出现重复执行任务问题的原因。

启动多个服务A,相当于运行同一份代码多次,此时APSCheduler的配置都是相同的,即多个APScheduler实例连接同一个mongodb,此时mongodb中存在一个任务就有可能被APScheduler消费多次。

使用分布式锁

要解决APScheduler多实例重复执行任务的问题,最常见的解决方案就是使用分布式锁,而分布式锁中最常见的就是基于Redis构建的字段锁。

Redis字段锁很容易理解,就是通过set命令在redis中设置一个字段,如果字段存在,则是加锁状态,而字段不存在,则是解锁状态。

设计Redis锁时,需要考虑操作原子性,避免同时去获取Redis字段的情况出现,还需要考虑字段超时,避免因逻辑错误出现的长时间死锁,所以设计Redis字段锁还是需要一些tick的,这里分享一种写法,如下。

@contextmanager
def redis_lock(name, timeout=(24 + 2) * 60 * 60):
    try:
        today_string = datetime.datetime.now().strftime("%Y-%m-%d")
        key = f"servername.lock.{name}.{today_string}"
        log.info(f"<Redis Lock> {key}")
        # 原子性的锁: 不存在,创建锁,返回1,相当于获取锁;存在,创建锁失败,返回0,相当于获取锁失败;过一段时间超时,避免死锁
        # nx: 不存在,key值设置为value,返回1,存在,不操作,返回0
        # ex: 设置超时
        lock = bonus_redis.set(key, value=1, nx=True, ex=timeout)
        yield lock
    finally:
        bonus_redis.delete(key) # 释放锁

通过上面方法设置的锁与常用的锁不同。

如果程序没有获得常用的锁,则会阻塞等待锁,而这里涉及的锁并不会等待,它的作用只是保证被锁方法在特定时间段内只执行一次。

此外还要考虑的是加锁位置,因为APScheduler会获取小于某个时间戳下的所有任务,那为了避免任务被重复执行,最直观的做法就是在任务函数中加上锁,例子如下。

# 要被执行的任务函数
def tick():
    with redislock() as lock:
        if lock:
            print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    # 添加到相应的jobstore中
    scheduler.add_job(tick, 'interval', seconds=3) # 添加一个任务,3秒后运行
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

结尾

至此分布式场景下使用APScheduler的方法就介绍完了,核心思想就是确保多个APScheduler实例对同一任务只会执行一次,感谢你的阅读。

如果文章对你有所帮助,点击「在看」支持二两,叩谢豪恩。

本文分享自微信公众号 - Python猫(python_cat)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-01-17

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 花10分钟让你彻底学会定时任务框架apscheduler

    说到定时任务,你会想起 linux 自带的 crontab ,windows 自带的任务计划,都可以实现守时任务。没错,操作系统基本都会提供定时任务的实现,但是...

    Python猫
  • 如何在 Python 中用中文做数学运算?

    花下猫语:在 Python 中是否可以实现中文数字的四则运算呢?答案是肯定的。今天分享的文章,会对这个问题给出令人满意的解答。这个操作可能不会被大家用于实际的项...

    Python猫
  • 用 Python 实现简易 Web 服务器

    花下猫语:上篇文章推荐了一本神书《500 Lines or Less》(点此阅读),有几位读者留言问是否有中文版。很遗憾,还没有。不过我在文中也说了,Githu...

    Python猫
  • 码农技术炒股之路——任务管理器

            系统任务和普通任务都是通过任务管理器调度的。它们的区别是:系统任务在程序运行后即不会被修改,而普通任务则会被修改。(转载请指明出于breakso...

    方亮
  • Python学习总结5--有序列表list和tuple

        2. 用len()函数可以获得list元素的个数     3. 用索引来访问list中每一个位置的元素,记得索引是从0开始的 如...

    曼路
  • 国内外首个 AIOps 企业峰会(附AIOps白皮书)

    织云平台团队
  • 如何遍历文件夹下上亿文件而不栈溢出

    序:一个文件夹下面有很多层的小文件,如何算出这个文件夹下面有多少文件?递归遍历,简单暴力,递归在一般情况确实是比较方便的解决方案,但是当文件夹深度多深,递归的反...

    intsmaze-刘洋
  • 如何遍历文件夹下上亿文件而不栈溢出

    序:一个文件夹下面有很多层的小文件,如何算出这个文件夹下面有多少文件?递归遍历,简单暴力,递归在一般情况确实是比较方便的解决方案,但是当文件夹深度多深,递归的反...

    intsmaze-刘洋
  • 如何将数字转换成口语中的文本串

    今天突发奇想, 写一个将数字转换成中文字符串的函数. 并不是将 1234 转成 '1234' , 而是将 1234 转成 '一千二百三十四'.

    烟草的香味
  • Spring中的@Qualifier注解你会用吗

    本文小胖哥将带你来了解一下Spring中的@Qualifier注解,它解决了哪些问题,以及如何使用它。我们还将了解它与@Primary注解的不同之处。

    码农小胖哥

扫码关注云+社区

领取腾讯云代金券