前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >带你认识 flask 后台作业

带你认识 flask 后台作业

作者头像
公众号---人生代码
发布2019-12-10 18:56:52
2.8K0
发布2019-12-10 18:56:52
举报
文章被收录于专栏:人生代码人生代码人生代码

01

任务类别简介

任务进程为后台作业提供了一个便捷的解决方案。Worker过程独立于应用程序运行,甚至可以位于不同的系统上。应用程序和worker之间的通信是通过消息完成的。通过与物理相互作用来监视其进度。下图展示了一个典型的实现:

Python中最流行的任务类别是Celery。这是一个相当复杂的重叠,它有很多选项并支持多个消息示例。另一个流行的Python任务位置是Redis Queue(RQ),它牺牲了一些替代,,仅支持Redis消息本身,但作为交换,它的建立要比Celery简单长度

Celery和RQ都非常适合在Flask应用程序中支持后台任务,所以我可以选择更简单的RQ。不过,用Celery实现相同的功能其实也不难。如果您对Celery更有吸引力,可以阅读我的博客中的将Celery与Flask文章一起使用

02

使用RQ

RQ是一个标准的Python三方重叠,用pip安装:

(venv) $ pip install rq
(venv) $ pip freeze > requirements.txt

正如我前面提到的,应用和RQ worker之间的通信将在Redis消息中执行,因此你需要运行Redis服务器。有很多途径来安装和运行Redis服务器,可以下载其内核并执行编译和安装。如果你使用的是Windows中,微软在此处维护了Redis的的安装程序。在Linux的上,你可以通过操作系统的软件包管理器安装Redis的。Mac OS X的用户可以运行brew install redis,使用然后redis-server命令手动启动服务

除了确保服务正在运行并可以识别RQ访问之外,你不需要与Redis进行其他交互

03

创建任务

一个任务,不过是一个Python函数而已。以下是一个示例任务,我将其引入一个新的app / tasks.py模块:

app / tasks.py:示例后台任务

import time

def example(seconds):
    print('Starting task')
    for i in range(seconds):
        print(i)
        time.sleep(1)
    print('Task completed')

该任务将秒数作为参数,然后在该时间量内等待,并每秒打印一次计数器

04

运行 RQ 工人

任务准备就绪,可以通过rq worker来启动一个worker进程了:

(venv) $ rq worker microblog-tasks
18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.1
18:55:06 Cleaning registries for queue: microblog-tasks
18:55:06
18:55:06 *** Listening on microblog-tasks...

microblog-tasks如果您想启动多个worker来扩展量子,您只需要运行rq worker来生成更多连接到同一个模型的进程,就可以使用Worker进程现在连接到了Redis,并在称为的上面上查看可能的分配给它的任何作业。在生产环境中,您可能希望至少运行可用的CPU数量的工人。。然后,,当作业出现在特定位置时,任何可用的worker进程都可以获取它

05

执行任务

现在打开第二个终端窗口并激活虚拟环境。我将使用shell会话来启动worker中的example()任务:

>>> from redis import Redis
>>> import rq
>>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://'))
>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.get_id()
'c651de7f-21a8-4068-afd5-8b982a6f6d32'

如果采用的是Redis服务器运行在不同的主机或端口号上,则使用RQ的Queue类表示从应用程序端看到的任务类型。Redis则需要使用其他URL。

队列的enqueue()方法用于将作业添加到队列中。第一个参数是要执行的任务的名称,可直接传入函数对象或导入字符串。我发现传入字符串更加方便,因为不需要在应用程序对enqueue()预期的任何剩余参数将被传递给worker中运行的函数。

enqueue()只要进行了调用,运行着RQ worker的终端窗口上就会出现一些活动。你会看到example()函数正在运行,并且连续打印一次计数器。同时,你的其他终端不会被分开,你可以继续在shell在上面的示例中,我调用job.get_id()方法来获取分配给任务的唯一标识符。你可以尝试使用另一个有趣表达式来检查worker上的函数是否已完成:

>>> job.is_finished
False

如果你像我在上面的示例中那样传递了23,那么函数将运行约23秒。在那之后,job.is_finished表达式将True转化为。就是这么简单,炫酷否?

一旦函数完成,worker又回到等待作业的状态,所以如果你想进行更多的实验,你可以用不同的参数重复执行enqueue()调用。),但最终会被删除。这很重要,任务类别不保留已执行作业的历史记录

06

报告任务进度

通常,对于长期运行的任务,您需要将一些进度信息提供给应用程序,从而可以将其显示给用户。RQ通过使用作业对象的meta属性来支持这一点。让我重新编写example()任务来编写进度报告:

app / tasks.py::带进度的示例后台任务

import time
from rq import get_current_job

def example(seconds):
    job = get_current_job()
    print('Starting task')
    for i in range(seconds):
        job.meta['progress'] = 100.0 * i / seconds
        job.save_meta()
        print(i)
        time.sleep(1)
    job.meta['progress'] = 100
    job.save_meta()
    print('Task completed')

这个新版本的example()使用RQ的get_current_job()函数来获取一个作业实例,该实例与提交任务时返回给应用程序的实例类似。作业对象的meta属性是一个字典,任务可以编写任何想要的与应用程序通信的自定义数据。在此示例中,我编写了progress,表示完成任务的百分比。每次进程更新时,我都调用job.save_meta()指示RQ将数据写入Redis,应用程序可以在其中找到它。

在应用程序方面(目前只是一个Python shell),我可以运行此任务,然后监视进度,如下所示:

>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.meta
{}
>>> job.refresh()
>>> job.meta
{'progress': 13.043478260869565}
>>> job.refresh()
>>> job.meta
{'progress': 69.56521739130434}
>>> job.refresh()
>>> job.meta
{'progress': 100}
>>> job.is_finished
True

如您所见,在另一侧,meta属性可以被重新读。需要调用refresh()方法来从Redis更新内容

07

任务的数据库表示

对于Web应用程序,情况会变得更复杂一些,因为一旦任务传递请求的处理而启动,该请求随即结束,而该任务因为我希望应用程序跟踪每个用户正在运行的任务,所以我需要使用数据库表来维护状态。你可以在下面看到新的Task模型实现:

app / models.py:任务模型

# ...
import redis
import rq

class User(UserMixin, db.Model):
    # ...
    tasks = db.relationship('Task', backref='user', lazy='dynamic')

# ...

class Task(db.Model):
    id = db.Column(db.String(36), primary_key=True)
    name = db.Column(db.String(128), index=True)
    description = db.Column(db.String(128))
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    complete = db.Column(db.Boolean, default=False)

    def get_rq_job(self):
        try:
            rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis)
        except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):
            return None
        return rq_job

    def get_progress(self):
        job = self.get_rq_job()
        return job.meta.get('progress', 0) if job is not None else 100

模型这个状语从句:以前的模型有一个有趣的区别的英文id主键字段的英文字符串类型,而不是整数类型。这是因为对于这个模型,我不会依赖数据库自己的主键生成,而是使用由RQ生成的作业标识符。

该模型将存储符合任务命名规范的名称(会传递给RQ),适用于向用户显示的任务描述,该任务的所属用户的关系以及任务是否已完成的布尔值。complete字段的目的是将正在运行的任务与已完成的任务分开,因为运行中的任务需要特殊处理才能显示最新进度。

get_rq_job()辅助方法可以用给定的任务ID加载RQ Job实例。的英文这通过Job.fetch()完成的,它会从Redis的存在中的数据中加载Job实例。get_progress()方法建立在get_rq_job()的基础之上,并返回任务的进度百分比。该方法做一些有趣的假设,如果模型中的作业ID不存在于RQ变量中,则表示作业已完成和数据已过期并已从该中删除,因此在这种情况下返回的百分比为100。同时,如果job存在,但'meta'属性中找到进度相关的信息,那么可以安全地进行该作业计划运行,但还没有启动,所以在这种情况下进度是0。

改进更改数据库,需要生成新的迁移,然后升级数据库:

(venv) $ flask db migrate -m "tasks"
(venv) $ flask db upgrade

新模型也可以添加到shell一部分中,盔甲在shell会话中访问它时无需导入:

microblog.py:添加任务模型到shell上下文中

from app import create_app, db, cli
from app.models import User, Post, Message, Notification, Task

app = create_app()
cli.register(app)

@app.shell_context_processor
def make_shell_context():
    return {'db': db, 'User': User, 'Post': Post, 'Message': Message,
            'Notification': Notification, 'Task': Task}

08

将RQ与 Flask 集合在一起

Redis服务的连接URL需要添加到配置中:

class Config(object):
    # ...
    REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'

与往常一样,Redis连接URL将来自环境变量,如果该变量未定义,则替换为该服务在当前主机的端口上运行并使用URL。

应用工厂函数将负责初始化Redis和RQ:

app / __ init__.py:整合RQ

# ...
from redis import Redis
import rq

# ...

def create_app(config_class=Config):
    # ...
    app.redis = Redis.from_url(app.config['REDIS_URL'])
    app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)

    # ...

app.task_queue将成为提交任务的重量。将附加到应用上会提供很大的便利,因为我可以在应用的任何地方使用current_app.task_queue来访问它。为了方便应用的任何部分提交或检查任务,我可以在User模型中创建一些辅助方法:

app / models.py:用户模型中的任务辅助方法

# ...

class User(UserMixin, db.Model):
    # ...

    def launch_task(self, name, description, *args, **kwargs):
        rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id,
                                                *args, **kwargs)
        task = Task(id=rq_job.get_id(), name=name, description=description,
                    user=self)
        db.session.add(task)
        return task

    def get_tasks_in_progress(self):
        return Task.query.filter_by(user=self, complete=False).all()

    def get_task_in_progress(self, name):
        return Task.query.filter_by(name=name, user=self,
                                    complete=False).first()

launch_task()方法是将任务提交到RQ,然后将其添加到数据库中。name参数是函数名称,如app / tasks.py中所定义的那样。提交给RQ时,该函数已app.tasks.预先添加到该名称中以构建符合规范的函数名称。description参数是对呈现给用户的任务的友好描述。对于导出用户动态的函数,我将名称设置为export_posts,将描述设置为Exporting posts...。其余参数将传递给任务函数。launch_task()函数首先调用队列的enqueue()方法来提交作业。返回的作业对象包含由RQ分配的任务ID,因此我可以使用它在我的数据库中创建相应的Task对象

请注意,launch_task()将新的任务对象添加到会话中,但不会发出提交。替代,最好在更高层次函数中的数据库会话上进行操作,因为它允许您在替代事务中组合由替代这不是一个严格的规则,并且,在本章后面的子函数中也会存在一个例外的提交

get_tasks_in_progress()方法返回该用户未完成任务的列表。稍后您会看到,我使用此方法在将有关正在运行的任务的信息渲染到用户的页面中

最后,get_task_in_progress()是上一个方法的简化版本并返回指定的任务。我阻止用户同时启动两个或多个相同类型的任务,因此在启动任务之前,可以使用此方法来确定前一个任务是否还在运行

09

利用 RQ 任务发送电子邮件

不要认为本节偏离主题,我在上面说过,当后台完成任务完成时,将使用包含所有用户动态的JSON文件向用户发送电子邮件。我在第十章中生成的电子邮件功能需要通过两种方式进行扩展。首先,我需要添加对文件附件的支持,刹车我可以附加JSON文件。串行,send_email()函数总是使用后台线程初始化发送电子邮件。当我要从后台任务发送电子邮件时(已经是初步的了),基于线程的二级后台任务没有什么意义,所以我需要同时支持同步和异步电子邮件的发送。

幸运的是,Flask-Mail支持附件,所以我需要做的就是扩展send_email()函数的控件关键字参数,然后在Message对象中配置它。选择在前台发送电子邮件时,我只需要添加一个sync=True的关键字参数即可:

app / email.py:发送带附件的邮件

# ...

def send_email(subject, sender, recipients, text_body, html_body,
               attachments=None, sync=False):
    msg = Message(subject, sender=sender, recipients=recipients)
    msg.body = text_body
    msg.html = html_body
    if attachments:
        for attachment in attachments:
            msg.attach(*attachment)
    if sync:
        mail.send(msg)
    else:
        Thread(target=send_async_email,
            args=(current_app._get_current_object(), msg)).start()

消息类的attach()方法接受三个定义附件的参数:文件名,媒体类型和实际文件数据。文件名就是收件人看到的与附件关联的名称。媒体类型定义了这种附件的类型,这有助于电子邮件读者适当地渲染它。例如,如果您发送为image/png媒体类型,则电子邮件阅读器会知道该附件是一个图像,在这种情况下,它可以显示它。对于用户动态数据文件,我将使用JSON格式,该格式使用application/json媒体类型。最后一个参数包含附件内容的字符串或字节序列。

简单来说,send_email()attachments参数将成为一个元组列表,每个元组将有三个元素对应于attach()的三个参数。因此,我需要转换列表中的每个元素作为参数发送给attach()。在Python中,如果你想将列表或元组中的每个元素作为参数传递给函数,你可以使用func(*args)将这个列表或元祖解包成函数中的多个参数,而不必枯燥地一个个地传递,如func(args[0], args[1], args[2])。例如,如果如果没有,调用将会引发一个参数,即列表。你有一个列表args = [1, 'foo']func(*args)将会传递两个参数,就和你调用func(1, 'foo')一样。*args

如电子邮件的同步发送,我需要做的就是,当syncTrue的时候恢复成调用mail.send(msg)

10

任务助手

尽管我上面使用的example()任务是一个简单的独立函数,但已添加用户动态的函数却需要应用中具有的某些功能,例如访问数据库和发送电子邮件。因为这将在单独的进程中运行,所以我需要初始化Flask-SQLAlchemy和Flask-Mail,而Flask-Mail又需要Flask应用程序实例以从中获取它们的配置。因此,我将在app / tasks.py模块的顶部添加Flask应用程序实例和应用程序:

app / tasks.py:创建应用及其自身

from app import create_app

app = create_app()
app.app_context().push()

当使用flask命令时,根目录中的microblog.py模块创建应用实例,但RQ worker实际上却一无所知,所以当任务函数时,应用程序在此模块中创建,因为这是RQ worker要导入的唯一模块。你已经在好几个地方app.app_context()看到了方法,按下一个使其使应用成为“当前”的应用实例,这样一来Flask-SQLAlchemy等插件才可以使用current_app.config获取它们的配置。。根本没有,current_app表达式会返回一个错误。

然后我开始考虑如何在这个函数运行时报告进度。另外通过job.meta字典传递进度信息之外,我还想将通知推送给客户端,刹车自动动态更新完成百分比。逐步,我将使用我在第二十一章中生成的通知机制。更新将以与未读消息徽章非常类似的方式工作。当服务器渲染模板时,则包含从job.meta获得的“静态”进度信息,但一旦页面置于客户端的浏览器中,通知将使用通知来动态更新百分比。由于通知的原因,更新正在运行的任务的进度将比上一个示例中的操作稍微多一些,所以我将创建一个专用于更新进度的包装函数:

app / tasks.py:设置任务进度

from rq import get_current_job
from app import db
from app.models import Task

# ...

def _set_task_progress(progress):
    job = get_current_job()
    if job:
        job.meta['progress'] = progress
        job.save_meta()
        task = Task.query.get(job.get_id())
        task.user.add_notification('task_progress', {'task_id': job.get_id(),
                                                     'progress': progress})
        if progress >= 100:
            task.complete = True
        db.session.commit()

任务导出可以调用_set_task_progress()来记录进度百分比。函数该首先将百分比写入job.meta字典搜索并将其保存到Redis的,然后从数据库加载相应的任务对象,使用并task.user已有的add_notification()方法将通知推送给请求该任务的用户。通知将被命名为task_progress,并且伴随关联的数据将成为具有两个关联的字典:任务标识符和进度数值。稍后我将添加JavaScript代码来处理这种新的通知类型

该函数查看进度来确认任务函数是否已完成,并在这种情况下下更新数据库中任务对象的complete属性。数据库提交调用通过add_notification()添加的任务和通知对象都立即保存到数据库。任务,确保不执行任何数据库更改,因为执行本次调用父父的更改也写入数据库

11

实现导出任务

现在所有的准备工作已经完成,可以开始编写导出函数了。这个函数的高层结构如下:

app / tasks.py:导出用户动态通用结构

def export_posts(user_id):
    try:
        # read user posts from database
        # send email with data to user
    except:
        # handle unexpected errors

请求处理器中的应用程序可以防止意外错误,因为Flask自身捕获异常,然后将其整个任务包装在try / except中。将运行在由RQ控制的单独前进中,而不是烧瓶,因此如果发生任何意外错误,任务将中止,RQ将向控制台显示错误,然后返回等待新的作业。worker的输出或将其记录到文件中,否则将永远不会发现有错误。

让我们从上面带有注释的三部分中最简单的错误处理部分开始梳理:

app / tasks.py:更新用户动态错误处理

import sys
# ...

def export_posts(user_id):
    try:
        # ...
    except:
        _set_task_progress(100)
        app.logger.error('Unhandled exception', exc_info=sys.exc_info())

发生意外错误时,我将通过将进度设置为100%来将任务标记为完成,然后使用Flask应用程序中的日志记录器对象记录错误以及如何跟踪信息(调用sys.exc_info()来获得)。记录器来记录错误的好处在于,你可以观察到你为瓶应用实现的任何日志记录机制。例如,在第七章中,我配置了要发送到管理员电子邮件地址的错误。只要使用app.logger,我也可以得到这些错误信息

接下来,我将编写实际的起始代码,它只需发出一个数据库查询并在循环中遍历结果,随之而来的累积在字典中:

app / tasks.py:从数据库读取用户动态

import time
from app.models import User, Post

# ...

def export_posts(user_id):
    try:
        user = User.query.get(user_id)
        _set_task_progress(0)
        data = []
        i = 0
        total_posts = user.posts.count()
        for post in user.posts.order_by(Post.timestamp.asc()):
            data.append({'body': post.body,
                         'timestamp': post.timestamp.isoformat() + 'Z'})
            time.sleep(5)
            i += 1
            _set_task_progress(100 * i // total_posts)

        # send email with data to user
    except:
        # ...

时间格式将采用ISO 8601标准。我使用的Python的datetime对象不存储时区,因此在以ISO格式导出时间后,我添加了'Z',它表示UTC

我维护了一个计数器i,并且在进入循环之前还需要发出一个额外的数据库查询,查询total_posts导致用户动态的总数。使用了itotal_posts,在每个循环迭代我都可以使用从0到100的数字来更新任务进度

您可能会好奇我为什么会在每个循环time.sleep(5)迭代中加入调用。最终是我想要延长增量所需的时间,刹车在用户动态不多的情况下也可以方便地查看到逐步进度的增长

下面是函数的最后部分,将会带上data附件发送邮件给用户:

app / tasks.py:发送带用户动态的邮件给用户

import json
from flask import render_template
from app.email import send_email

# ...

def export_posts(user_id):
    try:
        # ...

        send_email('[Microblog] Your blog posts',
                sender=app.config['ADMINS'][0], recipients=[user.email],
                text_body=render_template('email/export_posts.txt', user=user),
                html_body=render_template('email/export_posts.html', user=user),
                attachments=[('posts.json', 'application/json',
                              json.dumps({'posts': data}, indent=4))],
                sync=True)
    except:
        # ...

只是其实对send_email()函数的调用。附件被定义为一个元组,其中有三个元素被传递给瓶邮件的Message对象的attach()方法。元组中的第三个元素是附件内容,它是用Python中的json.dumps()函数生成的。

这里引用了一对新模板,它们以纯文本和HTML格式提供电子邮件正文的内容。这是文本模板的内容:

app / templates / email / export_posts.txt:更新用户动态文本邮件模板

Dear {{ user.username }},

Please find attached the archive of your posts that you requested.

Sincerely,

The Microblog Team

这是HTML版本的邮件模板:

app / templates / email / export_posts.html:更新用户动态HTML邮件模板

<p>Dear {{ user.username }},</p>
<p>Please find attached the archive of your posts that you requested.</p>
<p>Sincerely,</p>
<p>The Microblog Team</p>

12

应用中的导出功能

剩下的就是将这个功能连接到应用,刹车用户发起请求并通过电子邮件发送用户动态给他们

下面是新的export_posts视图函数:

app / main / routes.py:导出用户动态路由和视图函数

@bp.route('/export_posts')
@login_required
def export_posts():
    if current_user.get_task_in_progress('export_posts'):
        flash(_('An export task is currently in progress'))
    else:
        current_user.launch_task('export_posts', _('Exporting posts...'))
        db.session.commit()
    return redirect(url_for('main.user', username=current_user.username))

该功能首先检查用户是否有未完成的任务,并在这种情况下只是闪现消息。对同一用户同时执行两个发起任务是没有意义的,可以避免。我可以使用前面实现的get_task_in_progress()方法来检查这种情况

如果一个用户没有正在运行的导出任务,则调用launch_task()来启动它。第一个参数是将传递给RQ worker的函数的名称,改为为app.tasks.。第二个参数只是一个友好的文本描述,将会显示给用户。这两个值都会被写入数据库中的任务对象。该函数以重定向到用户个人主页结束

我认为最合适的地方是在用户个人主页,只有在用户查看他们自己的主页时,链接在“编辑个人资料”链接下面显示:

app / templates / user.html:用户个人主页的导出链接

...
<p>
    <a href="{{ url_for('main.edit_profile') }}">
        {{ _('Edit your profile') }}
    </a>
</p>
{% if not current_user.get_task_in_progress('export_posts') %}
<p>
    <a href="{{ url_for('main.export_posts') }}">
        {{ _('Export your posts') }}
    </a>
</p>
...
{% endif %}

此链接的渲染是有条件的,因为我不希望它在用户已经有发起任务执行时出现。

如果你想尝试一下,你可以按如下方式启动应用和RQ worker:

  • 确保Redis正在运行
  • :一个终端窗口,启动至少一个RQ worker实例。本处你可以运行命令rq worker microblog-tasks
  • 再打开另一个终端窗口,使用flask run (记得先设置 FLASK_APP变量)命令启动Flask应用

13

进度通知

为了完善这个功能,我想在后台任务运行时提醒用户任务完成的进度。在浏览Bootstrap组件选项时,我决定在导航栏的下方使用一个Alert组件。横条。我用蓝色的警报框来渲染闪现的消息。现在我要添加一个绿色的警报框来显示任务进度。样式如下:

app / templates / base.html:基础模板中的导出进度Alert组件

...
{% block content %}
    <div class="container">
        {% if current_user.is_authenticated %}
        {% with tasks = current_user.get_tasks_in_progress() %}
        {% if tasks %}
            {% for task in tasks %}
            <div class="alert alert-success" role="alert">
                {{ task.description }}
                <span id="{{ task.id }}-progress">{{ task.get_progress() }}</span>%
            </div>
            {% endfor %}
        {% endif %}
        {% endwith %}
        {% endif %}
        ...
{% endblock %}
...

外部条件在用户未登录时跳过所有与Alert相关的标记。而对于已登录的用户,我通过称为创建的get_tasks_in_progress()方法来获取当前的任务列表。在当前版本的应用中,我最多只能得到一个结果,因为我可以多个替换任务同时执行,但将来我可能要支持可以共存的其他类型的任务,所以以通用的方式渲染Alert可以节省我以后的时间。

对于每项任务,我都会在页面上渲染一个警报元素。警报的颜色由第二个CSS样式控制,本处是alert-success,而在闪现消息是alert-info,引导文档所有游戏有关警报的HTML结构的详细信息。警报文本包括存储在Task模型中的description细分,后面跟着完成百分比。

被百分比封装在具有id属性的<span>元素中。原因是我要在收到通知时用的JavaScript刷新百分比。我给任务ID附加末尾-progress来构造id属性。当有通知到达时,通过其中的任务ID,我可以很容易地使用#<task.id>-progress选择器找到正确的<span>元素来更新。

如果您此时进行尝试,则每次导航到新页面时都会看到“静态”的进度更新。您可以注意到,在启动导出任务后,您可以自由导航到应用程序的不同页面,正在运行的任务的状态始终都会展示出来

为了对span>元素的百分比的动态更新做准备,我将在JavaScript端编写一个辅助函数:

app / templates / base.html:动态更新任务进度的辅助函数

...
{% block scripts %}
    ...
    <script>
        ...
        function set_task_progress(task_id, progress) {
            $('#' + task_id + '-progress').text(progress);
        }
</script>
    ...
{% endblock %}

这个函数接受一个任务id和一个进度值,并使用jQuery为这个任务定位<span>元素,转换为新进度作为其内容写入。无需验证页面上是否存在该元素,因为如果没有找到该元素,jQuery将不会执行任何操作。

app / tasks.py中的_set_task_progress()函数每次更新进度时调用add_notification(),就会产生新的通知。而我在第二十一章明智地以完全通用的方式实现了通知功能。所以当浏览器定期向服务器发送时通知更新请求时,浏览器会获得通过add_notification()方法添加的任何通知

但是,这些JavaScript代码只能识别具有unread_message_count名称的那些通知,并忽略其余部分。我现在需要做的是扩展该函数,通过调用我上面定义的set_task_progress()函数来处理task_progress通知。以下是处理通知更新版本JavaScript代码:

app / templates / base.html:通知处理器

for (var i = 0; i < notifications.length; i++) {
    switch (notifications[i].name) {
        case 'unread_message_count':
            set_message_count(notifications[i].data);
            break;
        case 'task_progress':
            set_task_progress(
                notifications[i].data.task_id,
                notifications[i].data.progress);
            break;
    }
    since = notifications[i].timestamp;
}

现在我需要处理两个不同的通知,我决定用一个switch语句替换检查unread_message_count通知名称的if语句,该语句包含我现在需要支持的每个通知。如果你对“ C”系列语言不熟悉,就可能从未见过是switch语句,它提供了一种方便的语法,可以替代一长串的if/elseif语句。这是一个很棒的特性,因为当我需要支持更多通知时,只需简单地添加case块即可。

回顾一下,RQ任务附加到task_progress通知的数据是一个包含两个元素task_idprogress的字典,这两个元素是我调用set_task_progress()的两个参数。

如果您现在运行该应用,则绿色Alert插入的进度指示器将每10秒刷新一次(因为刷新通知的时间间隔是10秒)。

如果您要维护非英语语言文件,则需要使用Flask-Babel刷新翻译文件,然后添加新的翻译:

(venv) $ flask translate update

如果您使用的是编码翻译,那么我已经为你完成了翻译工作,因此可以从下载包中提取app / translations / es / LC_MESSAGES / messages.po文件,从而将其添加到您的项目中。

翻译文件到位后,还要编译翻译文件:

(venv) $ flask translate compile
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 CryptoCode 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档