首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何通过input()控制线程?

如何通过input()控制线程?
EN

Stack Overflow用户
提问于 2018-09-14 12:04:15
回答 1查看 163关注 0票数 1

我希望运行与主代码并行的进程的代码,但也希望通过命令提示符访问其参数或启动/停止进程。

我的机器是64位的win7。我想说的是:

代码语言:javascript
复制
from multiprocessing import Process

class dllapi():
    ...

def apiloop(params, args):
    apiclient = dllapi(**args)
    while True:
        apiclient.cycle()
        params = [....]

def mainloop(args):
    p = Process(target = apiloop, args=(params, args, ))
    while True:
        cmd = input()
        if cmd == 'kill':
            p.terminate()
        if cmd == 'stop':
            pass # no idea
        if cmd == 'resume':
            pass # no idea
        if cmd == 'report':
            print (params)

我希望让它变得简单。我确实尝试将apiloop作为线程,但input()可能会冻结程序,并停止apiloop的工作,直到我按下enter……

为了共享来自apiloop进程的参数,我确实尝试了队列和管道,但在我看来,队列需要.join等待,直到apiloop完成并且管道有缓冲区限制。

(实际上我可以让apiclient.cycle每隔1秒运行一次,但我希望让apiclient保持活动状态)

我想知道是否值得深入研究多进程(例如,我也会尝试管理器...)或者有更适合我的情况的其他方法。先谢谢你...

*更新: 201809170953*

manager的一些进展如下:

代码语言:javascript
复制
from multiprocessing import Process, Manager

class dllapi():
    ...
class webclientapi():
    ...

def apiloop(args, cmd, params):
    apiclient = dllapi(**args)
    status = True
    while True:
        # command from main
        if cmd == 'stop':
            status = False
        elif cmd == 'start':
            status = True
        cmd = None
        # stop or run
        if status == True:
            apiclient.cycle()
        # update parameters
        params['status'] = status

def uploadloop(cmds, params):
    uploadclient = webclientapi()
    status = True
    while True:
        # command from main
        if cmd == 'stop':
            status = False
        elif cmd == 'start':
            status = True
        cmd = None
        # stop or run
        if status == True:
            # upload 'status' from apiclient to somewhere
            uploadclient.cycle(params['status'])

def mainloop(args):

    manager = Manager()
    mpcmds = {}
    mpparams = {}
    mps = {}

    mpcmds   ['apiloop'] = manager.Value('u', 'start')
    mpparams ['apiloop'] = manager.dict()
    mps      ['apiloop'] = Process(target = apiloop, args=(args, mpcmds['apiloop'], mpparams['apiloop'])

    mpcmds   ['uploadloop'] = manager.Value('u', 'start')
    # mpparams ['uploadloop'] is directly from mpparams ['apiloop']
    mps      ['uploadloop'] = Process(target = uploadloop, args=(mpcmds['uploadloop'], mpparams['apiloop'])

    for key, mp in mps.items():
        mp.daemon = True
        mp.start()

    while True:
        cmd = input().split(' ')
        # kill daemon process with exit()
        if cmd[0] == 'bye':
            exit()
        # kill individual process
        if cmd[0] == 'kill':
            mps[cmd[1]].terminate()
        # stop individual process via command
        if cmd[0] == 'stop':
            mpcmds[cmd[1]] = 'stop'
        # stop individual process via command
        if cmd[0] == 'start':
            mpcmds[cmd[1]] = 'start'
        # report individual process info via command
        if cmd[0] == 'report':
            print (mpparams ['apiloop'])

希望这能帮到什么人。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-09-17 01:18:01

我将向您展示如何仅使用线程来解决一般问题,因为这是您首先尝试的方法,并且您的示例没有引出子进程的需求。

在下面的示例中,您的dllapi类被命名为Zoo,并且它是threading.Thread的子类化,添加了一些方法以允许执行控制。它在初始化时需要一些data,它的cycle-method只是重复迭代这些数据,并计算它看到特定项的次数。

代码语言:javascript
复制
import time
import logging
from queue import Queue
from threading import Thread

from itertools import count, cycle


class Zoo(Thread):

    _ids = count(1)

    def __init__(self, cmd_queue, data, *args,
             log_level=logging.DEBUG, **kwargs):

        super().__init__()
        self.name = f'{self.__class__.__name__.lower()}-{next(self._ids)}'
        self.data = data
        self.log_level = log_level
        self.args = args
        self.kwargs = kwargs

        self.logger = self._init_logging()
        self.cmd_queue = cmd_queue

        self.data_size = len(data)
        self.actual_item = None
        self.iter_cnt = 0
        self.cnt = count(1)
        self.cyc = cycle(self.data)

    def cycle(self):
        item = next(self.cyc)
        if next(self.cnt) % self.data_size == 0:  # new iteration round
            self.iter_cnt += 1
        self.actual_item = f'{item}_{self.iter_cnt}'

    def run(self):
        """
        Run is the main-function in the new thread. Here we overwrite run
        inherited from threading.Thread.
        """
        while True:
            if self.cmd_queue.empty():
                self.cycle()
                time.sleep(1)  # optional heartbeat
            else:
                self._get_cmd()
                self.cmd_queue.task_done()  # unblocks prompter

    def stop(self):
        self.logger.info(f'stopping with actual item: {self.actual_item}')
        # do clean up
        raise SystemExit

    def pause(self):
        self.logger.info(f'pausing with actual item: {self.actual_item}')
        self.cmd_queue.task_done()  # unblocks producer joining the queue
        self._get_cmd()  # just wait blockingly until next command

    def resume(self):
        self.logger.info(f'resuming with actual item: {self.actual_item}')

    def report(self):
        self.logger.info(f'reporting with actual item: {self.actual_item}')
        print(f'completed {self.iter_cnt} iterations over data')

    def _init_logging(self):
        fmt = '[%(asctime)s %(levelname)-8s %(threadName)s' \
          ' %(funcName)s()] --- %(message)s'
        logging.basicConfig(format=fmt, level=self.log_level)
        return logging.getLogger()

    def _get_cmd(self):
        cmd = self.cmd_queue.get()
        try:
            self.__class__.__dict__[cmd](self)
        except KeyError:
            print(f'Command `{cmd}` is unknown.')

input是一个阻塞函数。你需要在一个单独的线程中外包它,这样它就不会阻塞你的主线程。在下面的示例中,input包装在Prompter中,这是一个子类化threading.Thread的类。Prompter将输入传递到命令队列中。此命令队列由Zoo读取。

代码语言:javascript
复制
class Prompter(Thread):
    """Prompt user for command input.
    Runs in a separate thread so the main-thread does not block.
    """
    def __init__(self, cmd_queue):
        super().__init__()
        self.cmd_queue = cmd_queue

    def run(self):
        while True:
            cmd = input('prompt> ')
            self.cmd_queue.put(cmd)
            self.cmd_queue.join()  # blocks until consumer calls task_done()


if __name__ == '__main__':

    data = ['ape', 'bear', 'cat', 'dog', 'elephant', 'frog']

    cmd_queue = Queue()
    prompter = Prompter(cmd_queue=cmd_queue)
    prompter.daemon = True

    zoo = Zoo(cmd_queue=cmd_queue, data=data)

    prompter.start()
    zoo.start()

终端中的会话示例:

代码语言:javascript
复制
$python control_thread_over_prompt.py
prompt> report
[2018-09-16 17:59:16,856 INFO     zoo-1 report()] --- reporting with actual item: dog_0
completed 0 iterations over data
prompt> pause
[2018-09-16 17:59:26,864 INFO     zoo-1 pause()] --- pausing with actual item: bear_2
prompt> resume
[2018-09-16 17:59:33,291 INFO     zoo-1 resume()] --- resuming with actual item: bear_2
prompt> report
[2018-09-16 17:59:38,296 INFO     zoo-1 report()] --- reporting with actual item: ape_3
completed 3 iterations over data
prompt> stop
[2018-09-16 17:59:42,301 INFO     zoo-1 stop()] --- stopping with actual item: elephant_3
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52324820

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档