前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >码农技术炒股之路——任务管理器

码农技术炒股之路——任务管理器

作者头像
方亮
发布2019-01-16 15:50:50
7810
发布2019-01-16 15:50:50
举报
文章被收录于专栏:方亮方亮

        系统任务和普通任务都是通过任务管理器调度的。它们的区别是:系统任务在程序运行后即不会被修改,而普通任务则会被修改。(转载请指明出于breaksoftware的csdn博客)

        为什么要有这样的设计?因为我希望它是一个可以不用停止服务就可以更新相关配置的系统。比如我们现在要加一个普通任务,我们只要修改下普通任务配置文件即可。再比如我们需要修改数据库中表结构,我们也不用停止服务修改代码来保证数据格式的一致性。

        我们程序需要知道配置文件是否被修改。如何去做?一种方法是借用一些系统方法监听相应配置文件的修改,一旦文件有变化,马上通知我们的主程序去处理。另一种则是采用轮询检查机制,即定期去生成差异结果。为了不让这个系统更加复杂,我选择后者。而它就是我所谓的系统任务。

        不管是系统任务还是普通任务,实现的类都要继承于job_base

代码语言:javascript
复制
from abc import ABCMeta,abstractmethod
class job_base:
    __metaclass__ = ABCMeta
    @abstractmethod
    def run(self):
        pass

        有这个限制主要是为了保证每个任务都是run方法。调度框架将执行该方法以完成任务执行。

        在《码农技术炒股之路——架构和设计》一文中,介绍了我们将基于APScheduler实现任务调度功能。首先我们需要启动BackgroundScheduler对象

代码语言:javascript
复制
from apscheduler.schedulers.background import BackgroundScheduler

@singleton
class job_center():
    def __init__(self):
        self._sched = None
        self._job_conf_path = ""
        self._job_id_handle = {}
        self._static_job_id_handle = {}
    
    def start(self):
        self._sched = BackgroundScheduler()
        self._sched.start()

        当我们需要加入任务时,则调用下面这个方法

代码语言:javascript
复制
    def add_jobs(self, jobs_info, is_static = False):
        if None == self._sched:
            LOG_WARNING("job center must call start() first")
            return

        for (job_name,job_info) in jobs_info.items():
            if is_static and job_name in self._static_job_id_handle.keys():
                continue
            job_type = job_info["type"]
            class_name = job_info["class"]
            job_handle = self._get_obj(class_name)
            if is_static:
                self._static_job_id_handle[job_name] = job_handle
            else:
                self._job_id_handle[job_name] = job_handle
            cmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"
            params = self._join_params(job_info)
            if 0 != len(params):
                cmd += " , "
                cmd += params
            cmd += ")"
            #print cmd
            eval(cmd)

        jobs_info保存的是任务配置文件中任务信息,我们看个样例

代码语言:javascript
复制
[update_share_base_info]
type=cron
class=update_stock_base_info
day_of_week=1-5
hour=9
minute=30
second=10
timezone = Asia/Shanghai

        这个配置中除了type和class,其他都是APScheduler框架中add_job方法中的参数。上面配置意思是:以上海时间,从周一到周五,早上9点30分10秒执行一次。

        add_jobs中通过class字段,获取该class对应的一个对象

代码语言:javascript
复制
    def _get_obj(self, _cls_name):  
        _packet_name = _cls_name  
        _module_home = __import__(_packet_name,globals(),locals(),[_cls_name])
        obj =  getattr(_module_home,_cls_name)  
        class_obj = obj()
        return class_obj

        这儿又要提到我之前特别强调过的单例使用方式。经过测试,《码农技术炒股之路——配置管理器、日志管理器》中单例的实现可以保证上面这个方法获取的是同一个对象,而网上其他单例模式则不行。         获取对象后,我们要组装出要执行的命令。cmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"中job_handle就是上面获取的对象,而run则是每个job都要有的方法。这也是为什么要求每个任务类都要继承于job_base的原因。

        之后调用_join_params将配置文件中其他信息组装成参数拼接出完整命令

代码语言:javascript
复制
    def _join_params(self, job_info):
        params = ""
        param = ""
        job_type = job_info["type"]
        for key in job_info.keys():
            if key in conf_keys.job_conf_info_dict[job_type]:
                if  0 != len(params):
                    params += ' , '
                value = job_info[key]
                if value.isdigit():
                    param = key + " = " + value
                else:
                    param = key + " = '" + value + "'"
                if 0 != len(param):
                    params += param
        return params

        如此我们配置中的任务就会被加入到APScheduler调度队列中。

        我们再看下如何删除一个任务

代码语言:javascript
复制
    def remove_jobs(self, jobs_info):
        if None == self._sched:
            LOG_WARNING("job center must call start() first")
            return

        for job_name in jobs_info.keys():
            self._sched.remove_job(job_name)
            self._job_id_handle.pop(job_name)

        第7行通过任务名称在APScheduler中把任务删除。第8行将任务对应的对象从列表中删除。为什么要使用_job_id_handle去保存这些任务对象呢?因为如果不在一个更大的生命周期内保存它,它就会被认为是一个局部变量,从而被释放,导致之后APScheduler再也调用不了它。         我们看个管理普通任务的系统任务代码

代码语言:javascript
复制
@singleton
class j_load_job_conf(job_base):
    def __init__(self):
        self._pre_jobs_info = {}
        self._frame_conf_inst = scheduler_frame_conf_inst()
        self._job_center = job_center()

    def run(self):
        section_name = "strategy_job"
        option_name = "conf_path"
        if False == self._frame_conf_inst.has_option(section_name, option_name):
            LOG_WARNING("no %s %s" % (section_name, option_name))
            return
        conf_path = self._frame_conf_inst.get(section_name, option_name)
        LOG_DEBUG("Load %s %s %s" % (section_name, option_name, conf_path))
        
        job_conf_parser_obj = job_conf_parser()
        jobs_info = job_conf_parser_obj.parse(conf_path)
        self._execute_jobs(jobs_info)

    def _execute_jobs(self, jobs_info):
        add_dict = {}
        remove_dict = {}
        modify_dict = {}

        frame_tools.dict_diff(jobs_info, self._pre_jobs_info, add_dict, remove_dict, modify_dict)

        add_jobs_info = dict(add_dict, **modify_dict)

        remove_jobs_info = {}
        for item in modify_dict.keys():
            remove_jobs_info[item] = self._pre_jobs_info[item]

        LOG_INFO("add jobs %s" % (json.dumps(add_jobs_info)))
        LOG_INFO("remove jobs %s" % (json.dumps(remove_jobs_info)))
        
        if 0 == len(add_jobs_info) and 0 == len(remove_jobs_info):
            return

        self._pre_jobs_info = jobs_info

        self._job_center.remove_jobs(remove_jobs_info)
        self._job_center.add_jobs(add_jobs_info)

        run方法将会定期执行。它会从固定目录读取普通任务配置文件信息。然后在_execute_jobs方法中,通过和上一次读取的任务信息对比,生成三个字典:需要删除的任务、需要新增的任务和需要修改的任务。需要修改的任务将变成先删除后新增的方式实现修改。所以最后操作的将是两个字段信息。         普通任务本文就不介绍了,之后介绍的每个抓取和离线计算业务都是普通任务。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017年05月12日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档