首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Django中测试django-rq (Python)的最佳实践

在Django中测试django-rq (Python)的最佳实践
EN

Stack Overflow用户
提问于 2012-07-01 13:27:04
回答 7查看 6.7K关注 0票数 14

我将在我的项目中开始使用django-rq。

Django与基于Redis的Python队列库RQ集成。

使用RQ测试django应用程序的最佳实践是什么?

例如,如果我想将我的应用程序作为一个黑匣子进行测试,在用户执行一些操作之后,我想要执行当前队列中的所有作业,然后检查我的DB中的所有结果。我怎么能在我的django测试中做到呢?

EN

回答 7

Stack Overflow用户

发布于 2012-09-05 02:22:58

我刚刚找到了django-rq,它允许您在一个测试环境中旋转一个工作人员,该环境执行队列中的任何任务,然后退出。

代码语言:javascript
复制
from django.test impor TestCase
from django_rq import get_worker

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        ...                      # Stuff that init jobs.
        get_worker().work(burst=True)  # Processes all jobs then stop.
        ...                      # Asserts that the job stuff is done.
票数 9
EN

Stack Overflow用户

发布于 2013-01-26 19:07:31

我把我的rq测试分成几个部分。

  1. 测试我是否正确地将东西添加到队列中(使用模拟)。
  2. 假设如果将某些内容添加到队列中,则最终将对其进行处理。(rq的测试套件应该涵盖这一点)。
  3. 测试,给定正确的输入,我的任务就像预期的那样工作。(正常代码测试)。

正在测试的代码:

代码语言:javascript
复制
def handle(self, *args, **options):
    uid = options.get('user_id')

    # @@@ Need to exclude out users who have gotten an email within $window
    # days.
    if uid is None:
        uids = User.objects.filter(is_active=True, userprofile__waitlisted=False).values_list('id', flat=True)
    else:
        uids = [uid]

    q = rq.Queue(connection=redis.Redis())

    for user_id in uids:
        q.enqueue(mail_user, user_id)

我的测试:

代码语言:javascript
复制
class DjangoMailUsersTest(DjangoTestCase):
    def setUp(self):
        self.cmd = MailUserCommand()

    @patch('redis.Redis')
    @patch('rq.Queue')
    def test_no_userid_queues_all_userids(self, queue, _):
        u1 = UserF.create(userprofile__waitlisted=False)
        u2 = UserF.create(userprofile__waitlisted=False)
        self.cmd.handle()
        self.assertItemsEqual(queue.return_value.enqueue.mock_calls,
                              [call(ANY, u1.pk), call(ANY, u2.pk)])

    @patch('redis.Redis')
    @patch('rq.Queue')
    def test_waitlisted_people_excluded(self, queue, _):
        u1 = UserF.create(userprofile__waitlisted=False)
        UserF.create(userprofile__waitlisted=True)
        self.cmd.handle()
        self.assertItemsEqual(queue.return_value.enqueue.mock_calls, [call(ANY, u1.pk)])
票数 4
EN

Stack Overflow用户

发布于 2013-03-22 01:53:25

我学习了一块地,它让你做到了:

代码语言:javascript
复制
from django.test impor TestCase
from django_rq import get_queue

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        queue = get_queue(async=False)
        queue.enqueue(func) # func will be executed right away
        # Test for job completion

这将使测试RQ作业更容易。希望这能帮上忙!

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/11282346

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档