前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >100行代码实现任务队列

100行代码实现任务队列

作者头像
goodspeed
发布2020-12-25 10:45:53
4030
发布2020-12-25 10:45:53
举报
文章被收录于专栏:厉害了程序员厉害了程序员

最近刚看完python多线程,为了加深印象,按照1分钟实现“延迟消息”功能的思路,实现了一个简易版的异步队列。

高效延时消息,包含两个重要的数据结构: 1.环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组) 2.任务集合,环上每一个slot是一个Set 同时,启动一个timer,这个timer每隔1s,在上述环形队列中移动一格,有一个Current Index指针来标识正在检测的slot。 Task结构中有两个很重要的属性: (1)Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行任务 (2)Task-Function:需要执行的任务指针

下边是代码(代码不止100行,但是在200行内,也算100行了。)

代码语言:javascript
复制
#! -*- coding: utf-8 -*-try:    import cPickle as pickleexcept ImportError:    import pickletry:    import simplejson as jsonexcept ImportError:    import jsonimport osimport errnoimport Queueimport randomimport loggingfrom functools import wrapsfrom threading import Timer, RLock, Threadfrom time import sleep, timefrom base64 import b64encode, b64decode# json 的数据结构# tasks = {#     index: {#         cycle_num: [(func, bargs)]#     }# }logging.basicConfig(level=logging.DEBUG,
                   format='(%(asctime)-15s) %(message)s',)
tasks_file = 'tasks.json'flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY# 为了防止任务太多需要生成过多的线程,我们使用Queue 来限制生成的线程数量WORKER_NUMS = 2q = Queue.Queue(WORKER_NUMS)lock = RLock()def check_file():
   try:
       file_handle = os.open(tasks_file, flags)    except OSError as e:        if e.errno == errno.EEXIST:  # Failed as the file already exists.
           pass
       else:            raise
   else:        with os.fdopen(file_handle, 'w') as file_obj:
           file_obj.write("{}")def set_delay_task(func_name, *args, **kwargs):
   # 使用锁来保证每次只要一个线程写入文件,防止数据出错
   with lock:        with open(tasks_file, 'r+') as json_file:
           count_down = kwargs.pop('count_down', 0)
           tasks = json.load(json_file)            # 执行时间
           exec_time = int(time()) + count_down            # 循环索引
           index = str(exec_time % 3600)            # 圈数
           cycle_num = str(exec_time / 3600 + 1)
           dargs = pickle.dumps((args, kwargs))
           bargs = b64encode(dargs)
           index_data = tasks.get(index, {})
           index_data.setdefault(cycle_num, []).append((func_name, bargs))
           tasks[index] = index_data
           json_file.seek(0)
           json.dump(tasks, json_file)
           logging.debug('Received task: %s' % func_name)def get_delay_tasks():
   with open(tasks_file, 'r+') as json_file:
       tasks = json.load(json_file)        # 执行时间
       current_time = int(time())        # 循环索引
       index = str(current_time % 3600)        # 圈数
       cycle_num = str(current_time / 3600 + 1)
       current_tasks = tasks.get(index, {}).get(cycle_num, [])
   tasks = []    for func, bargs in current_tasks:
       dargs = b64decode(bargs)
       args, kwargs = pickle.loads(dargs)
       tasks.append((func, (args, kwargs)))    return tasksdef get_method_by_name(method_name):
   possibles = globals().copy()
   possibles.update(locals())
   method = possibles.get(method_name)    return methoddef create_task(task_class, func, task_name=None, **kwargs):   def execute(self):
       args, kwargs = self.data or ((), {})        return func(*args, **kwargs)   attrs = {        'execute': execute,        'func_name': func.__name__,        '__module__': func.__module__,        '__doc__': func.__doc__
   }
   attrs.update(kwargs)   klass = type(
       task_name or func.__name__,
       (task_class,),
       attrs
   )    return klassclass Hu(object):   def __init__(self, func_name=None):
       self.func_name = func_name
       check_file()    def task(self):
       def deco(func):
           self.func_name = func.__name__
           klass = create_task(Hu, func, self.func_name)
           func.delay = klass(func_name=klass.func_name).delay            @wraps(func)
           def wrapper(*args, **kwargs):
               return func(*args, **kwargs)            return wrapper        return deco    def delay(self, *args, **kwargs):
       _args = [self.func_name]
       _args.extend(args)
       Timer(0, set_delay_task, _args, kwargs).start()        return Truedef boss():
   while True:
       current_tasks = get_delay_tasks()        for func, params in current_tasks:            # Task accepted: auth.tasks.send_msg
           logging.debug('Task accepted: %s' % func)
           q.put((func, params))
       sleep(1)def worker():
   while True:
       func, params = q.get()        print 'get task: %s\n' % func
       method = get_method_by_name(func)
       args, kwargs = params        # Task auth.tasks.send_msgsucceeded in
       start_time = time()
       method(*args, **kwargs)
       end_time = time()
       logging.debug('Task %s succeeded in %s' % (str(func), end_time - start_time))
       q.task_done()def main():
   check_file()
   print('starting at:', time())    for target in (boss, worker):
       t = Thread(target=target)
       t.start()
   print('all DONE at:', time())hu = Hu()# 使用方式如下:@hu.task()def test(num):
   sleep(2)    print 'test: %s' % numif __name__ == '__main__':    for i in range(10):
       test.delay(i, count_down=random.randint(1, 10))
   main()# output(2017-03-21 15:59:20,394) Received task: test
(2017-03-21 15:59:20,396) Received task: test
(2017-03-21 15:59:20,397) Received task: test
(2017-03-21 15:59:20,398) Received task: test
(2017-03-21 15:59:20,400) Received task: test
(2017-03-21 15:59:20,401) Received task: test
(2017-03-21 15:59:20,403) Received task: test
(2017-03-21 15:59:20,404) Received task: test
(2017-03-21 15:59:20,406) Received task: test
(2017-03-21 15:59:20,408) Received task: test
get task: test(2017-03-21 15:59:21,395) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
test: 2get task: test(2017-03-21 15:59:23,399) Task test succeeded in 2.0037419796(2017-03-21 15:59:24,404) Task accepted: test
test: 1get task: test

按照1分钟实现“延迟消息”功能的思路。队列的数据结构为

代码语言:javascript
复制
{
   index: {
       cycle_num: [(func, bargs)]
   }
}

index的值为 1-3600。每小时一个循环。 cycle_num 则是 由 (时间戳 / 3600 + 1) 计算得到的值,是圈数。

每当有任务加入,我们计算出index和cycle_num 将参数和方法名写入json文件。 读取任务时,计算当前 index和cycle_num, 取出需要执行的任务,使用多线程的形式执行。

为了防止任务太多需要生成过多的线程,我们使用Queue 来限制生成的线程数量。

加锁的主要作用是防止多线程同时操作文件读写,影响数据一致性。

当然,也可以使用redis 存储队列,因为 redis 是单线程操作,可以防止多线程操作影响数据一致性的问题。 这一部分有需要的可以自己实现。

参考:

  • python线程笔记
  • 1分钟实现“延迟消息”功能
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-03-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 四月 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档