首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >在python中使用多线程队列的正确方式?

在python中使用多线程队列的正确方式?
EN

Stack Overflow用户
提问于 2015-05-19 01:22:08
回答 2查看 2.4K关注 0票数 16

我正在尝试使用python中的队列,它将是多线程的。我只想知道我使用的方法是正确的还是错误的。如果我做了一些多余的事情,或者如果有更好的方法,我应该使用。

我正在尝试从一个表中获取新的请求,并使用一些逻辑来调度它们,以执行一些操作,比如运行查询。

因此,在这里,我从主线程为队列派生了一个单独的线程。

代码语言:javascript
复制
if __name__=='__main__':

  request_queue = SetQueue(maxsize=-1)
  worker = Thread(target=request_queue.process_queue)
  worker.setDaemon(True)
  worker.start()


  while True:
    try:
      #Connect to the database get all the new requests to be verified
      db = Database(username_testschema, password_testschema, mother_host_testschema, mother_port_testschema, mother_sid_testschema, 0)
      #Get new requests for verification
      verify_these = db.query("SELECT JOB_ID FROM %s.table WHERE     JOB_STATUS='%s' ORDER BY JOB_ID" %
                             (username_testschema, 'INITIATED'))

      #If there are some requests to be verified, put them in the queue.
      if len(verify_these) > 0:
        for row in verify_these:
          print "verifying : %s" % row[0]
          verify_id = row[0]
          request_queue.put(verify_id)
    except Exception as e:
      logger.exception(e)
    finally:
      time.sleep(10)

现在,在Setqueue类中,我有一个process_queue函数,该函数用于处理添加到队列的每次运行中的前2个请求。

代码语言:javascript
复制
'''
Overridding the Queue class to use set as all_items instead of list to ensure unique items added and processed all the time,
'''

class SetQueue(Queue.Queue):
  def _init(self, maxsize):
    Queue.Queue._init(self, maxsize)
    self.all_items = set()

  def _put(self, item):
    if item not in self.all_items:
      Queue.Queue._put(self, item)
      self.all_items.add(item)

  '''
  The Multi threaded queue for verification process. Take the top two items, verifies them in a separate thread and sleeps for 10 sec.
  This way max two requests per run will be processed.
  '''
  def process_queue(self):
    while True:
      scheduler_obj = Scheduler()

      try:
        if self.qsize() > 0:
          for i in range(2):
            job_id = self.get()
            t = Thread(target=scheduler_obj.verify_func, args=(job_id,))
            t.start()

          for i in range(2):
            t.join(timeout=1)
            self.task_done()

      except Exception as e:
        logger.exception(
          "QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE")
      finally:
        time.sleep(10)

我想看看我的理解是否正确,是否会有任何问题。

因此,在main函数中运行的主线程连接到数据库时,会获得新的请求并将其放入队列。队列的工作线程(守护进程)继续从队列中获取新请求,并派生执行处理的非守护线程,由于联接的超时为1,因此工作线程将继续接受新请求而不会被阻塞,其子线程将继续在后台进行处理。对,是这样?

因此,如果主进程退出,这些进程在它们完成工作之前不会被终止,但工作守护进程线程将退出。怀疑:如果父进程是守护进程,子进程是非守护进程,如果父进程退出,子进程是否退出?)。

我也在这里阅读:- David beazley multiprocessing

david beazley在using a Pool as a Thread Coprocessor一节中尝试解决类似的问题。所以我应该遵循他的步骤:- 1.创建一个进程池。2.打开一个线程,就像我为request_queue 3所做的那样。

代码语言:javascript
复制
  def process_verification_queue(self):
    while True:
      try:
        if self.qsize() > 0:
          job_id = self.get()
          pool.apply_async(Scheduler.verify_func, args=(job_id,))
      except Exception as e:
        logger.exception("QUEUE EXCEPTION : Exception occured while    processing requests in the VERIFICATION QUEUE")

使用池中的进程并并行运行verify_func。这会给我带来更好的性能吗?

EN

回答 2

Stack Overflow用户

发布于 2015-06-10 11:09:46

虽然可以为队列创建一个新的独立线程,并按照您正在做的方式单独处理数据,但我认为每个独立工作线程将消息发布到它们已经“知道”的队列中是更常见的。然后,通过将消息从该队列中拉出,从其他线程处理该队列。

设计理念

我设想您的应用程序将是三个线程。主线程和两个辅助线程。1个工作线程将从数据库获取请求并将其放入队列中。另一个工作线程将处理来自队列数据

主线程将通过使用线程函数.join()来等待其他线程完成

您可以保护线程有权访问的队列,并通过使用互斥使其线程安全。我在其他语言的许多其他设计中也看到过这种模式。

推荐阅读

Brett Slatkin的"Effective Python“就是这个问题的一个很好的例子。

他没有继承队列,而是在他的类MyQueue中创建了一个包装器,并添加了一个get()和put(消息)函数。

他甚至在他的Github存储库中提供了源代码

https://github.com/bslatkin/effectivepython/blob/master/example_code/item_39.py

我与这本书或它的作者没有任何关系,但我强烈推荐它,因为我从它中学到了很多东西:)

票数 3
EN

Stack Overflow用户

发布于 2015-06-14 21:56:23

我喜欢这种对使用线程和进程之间的优势和差异的解释-- ".....But有一线希望:进程可以同时在多个执行线程上取得进展。由于父进程不与其子进程共享GIL,所有进程都可以同时执行(受硬件和操作系统的限制)……“

他对如何绕过GIL以及如何提高性能有一些很好的解释

点击此处阅读更多信息:

http://jeffknupp.com/blog/2013/06/30/pythons-hardest-problem-revisited/

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30309321

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档