前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >每日一题:如何实现异步任务处理来解决耗时操作问题

每日一题:如何实现异步任务处理来解决耗时操作问题

作者头像
用户7685359
发布2020-08-21 19:52:24
1K0
发布2020-08-21 19:52:24
举报
文章被收录于专栏:FluentStudyFluentStudy

今天说一个在实际项目中特别实用的解决并发耗时问题的办法:异步任务处理。这里采用 redis list 结构来实现。涉及知识点: 1、redis list 结构 2、阻塞、非阻塞、同步、异步的概念 3、如何实现一个异步处理任务

实战

同步、异步、阻塞、非阻塞

首先来说同步和异步,这两个概念是针对通信双方消息传送的响应来说的,如果 A 请求 B,B 马上响应 A,这是同步,而如果 A 请求 B ,B 说好的,我已经接受任务,然后把任务交由 C,而 C 是一个专门负责处理任务的,这种模型就是异步。

而阻塞和非阻塞,通常对应于程序请求响应,比如我们常见的 IO 处理,一个进程在处理 IO 操作时,需要等待 IO 操作完成,如果等着,就是阻塞,如果这个时候它不等着,而是转身先去做别的事情,走到 IO 操作完成再回来处理,这种就是非阻塞。

在网上看到一个比较好的关于这两组概念的区别,即同步和异步是描述行为的,即我是如果去处理当有类似阻塞这种情况的请求,比如 IO 操作,比如耗时操作,而阻塞和非阻塞描述的是一种状态。

redis 列表介绍

首先在面试中经常会问一个问题:redis 支持哪几种数据结构?

网上一般的回答是五种,分别是 string、list、set、sorted set、hash,在官方文档还有其他三种,Bitmaps、HyperLogLogs,还有 stream,参考链接如下:

代码语言:javascript
复制
https://redis.io/topics/data-types-intro

其中 stream 类型是在 5.0 中引入的。这里不详细介绍每种的用法,感兴趣的读者可以自行查看官方文档。因为要用到 list 这种数据结构,所以简单介绍下。

redis 的 list 结构,也可以称为是队列,它和 Python 中的 list 相似,它可以按照插入的顺序存储数据,但不同的是 Python 中的 list 底层是数组实现的,而 redis 中的 list 底层是链表实现的,所以 redis 中的 list 无论是在头部还是在尾部插入元素,时间复杂度都是常量级别的。通常的操作命令:

代码语言:javascript
复制
> rpush mylist 1 2 3 4 5 "foo bar"
(integer) 9
> lrange mylist 0 -1
1) "first"
2) "A"
3) "B"
4) "1"
5) "2"
6) "3"
7) "4"
8) "5"
9) "foo bar"

当然,与之对应的还有 lpush, lpop 等,所有操作指令可以在以下链接进行查看:

代码语言:javascript
复制
https://redis.io/commands#list

数据结构存在的意义是根据不同的场景可以达到更高的效率,那 list 结构有什么作用呢?通常有以下两个作用:

1、存储最近的数据,比如用一个列表存储用户访问的记录,每次访问时插入,而如果需要取最近访问的 10 条,只需要使用 lrange(key, 0, 9) 来获取即可

2、存储任务,即作为一个要处理的任务列表。也就是我们这篇文章里要介绍的异步任务,它去存储需要异步处理的任务列表

此外,除了普通的列表,还有两种特殊列表的存在,一种是限制列表,即限制列表的长度,比如我们只需要记录用户最近访问的30条记录,这样做的好处是保证数据不会无限增长,从业务需求上来说也是合理的,因为比较老的数据已经没有太大的价值(当然如果非要保存这些旧的数据,也可以考虑持久化)。我们可以使用 ltrim 指令来完成,示例如下:

代码语言:javascript
复制
> rpush mylist 1 2 3 4 5
(integer) 5
> ltrim mylist 0 2
OK
> lrange mylist 0 -1
1) "1"
2) "2"
3) "3"

另外一种是阻塞列表,即使用 brpop 或者 blpop 。我们都知道 redis 是单线程的,通常是一个命令执行完,才会去执行下一个命令,那试想这样一种情况,如果一个 redis 列表一直为空,我们一直循环去判断它有没有数据,这样是不是很浪费资源,做了很多无用功,所以通常我们需要在读数据时,判断如果没有数据就阻塞几秒,然后再去读数据。这里可以说得有些难理解,后面我们讲了异步实现代码之后,就好理解了。我们先来看一下 brpop 使用示例:

代码语言:javascript
复制
# 如果没有数据,停 5s
> brpop tasks 5
1) "tasks"
2) "do_something"

异步任务处理代码实现

实现的原理其实也很简单,当客户端发起请求,服务端接收到请求,可能需要完成很多复杂的逻辑,有些甚至是包括一些耗时的操作,这个时候我们可以将这个耗时的操作当作一个任务,把它塞到 redis 的 list 中,而在另一边开启一个常驻脚本,即 while True 的循环,一直去从 list 中读取数据(管理这种常驻脚本,也可以使用 pm2,或者其他工具,这里不做介绍,简单地直接 run 一个 python 脚本)。

这里我们模拟服务端接收到请求后,往 redis list 里塞数据,实现代码如下:

代码语言:javascript
复制
# -*- coding: utf-8 -*-
"""
@Time    : 2019/7/6 下午10:51
@Author  : yrr
@File    : task_client.py
@Desc    : push task to redis list
"""

# need install redis, pip install redis
import redis
import json


def main():
    conn = redis.Redis(host='localhost', port=6379, db=0)
    # Redis Lists are simply lists of strings, sorted by insertion order.
    # so we need json.dumps
    conn.lpush("async_task_list", json.dumps({
        # 参数可自定义规则
        'name': 'test',
        'data': {'name': 'demon'}
    }))


if __name__ == '__main__':
    main()

而异步任务脚本实现如下:

代码语言:javascript
复制
# -*- coding: utf-8 -*-
"""
@Time    : 2019/7/6 下午10:57
@Author  : yrr
@File    : task_async.py
@Desc    : task async handle
"""
import redis
import json


def handle_test(data):
    print("has get the data by redis list...")
    print("the name is %s" % data['name'])


TASKS_FUNC_MAP = {'test': handle_test}


def main():
    conn = redis.Redis(host='localhost', port=6379, db=0)
    while True:
        # 需要保证异常不影响正常异步任务
        # 所以需要用 try 来控制
        try:
            # 这里需要用到阻塞式列表,因为一直 for 循环会消耗cpu资源
            # 可以设置阻塞时间
            # 这也是一个面试考点
            task = conn.brpop('async_task_list', 2)
            if not task:
                continue
            _, data = task
            print("curent handle data: %s" % (data,))
            data = json.loads(data)
            fname = TASKS_FUNC_MAP.get(data['name'])
            if not fname:
                # 通常这里需要报错,暂时处理为打印错误信息
                print('not func handle, please check')
                continue
            fname(data)
        except Exception as e:
            # 这里涉及到错误任务的重试机制,可以根据业务需求来做
            # 这里暂时不处理,只打印出异步信息
            print(e)


if __name__ == '__main__':
    main()

实现效果如下图:

你点的每个赞,我都认真当成了喜欢

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-07-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FluentStudy 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 同步、异步、阻塞、非阻塞
  • redis 列表介绍
  • 异步任务处理代码实现
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档