首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在wxPython TextCtrl中捕获在单独进程中运行的shell脚本的输出?

如何在wxPython TextCtrl中捕获在单独进程中运行的shell脚本的输出?
EN

Stack Overflow用户
提问于 2011-04-29 22:58:21
回答 2查看 2.2K关注 0票数 1

长时间运行的shell脚本会生成stdout和stderr,我希望在GUI中的textctrl中显示它们。这可以使用线程化并将GUI线程与shell脚本的线程分开。然而,当我实现多进程时,我遇到了一个障碍。下面是我的-stripped下行代码:

代码语言:javascript
运行
复制
#!/usr/bin/env python

import wx
import sys, subprocess
from multiprocessing import Process, Queue
from Queue import Empty

class MyFrame(wx.Frame):
    def __init__(self, *args, **kwds):
        wx.Frame.__init__(self, *args, **kwds)

        self.button = wx.Button(self, -1 , "Run")
        self.output = wx.TextCtrl(self, -1, '', style=wx.TE_MULTILINE|\
                                                      wx.TE_READONLY|wx.HSCROLL)

        self.Bind(wx.EVT_BUTTON, self.OnButton, self.button)

        sizer = wx.BoxSizer(wx.VERTICAL)
        sizer.Add(self.output, -1, wx.EXPAND, 0)
        sizer.Add(self.button)
        self.SetSizerAndFit(sizer)
        self.Centre()

    def OnButton(self, event):
        numtasks = 4 # total number of tasks to run
        numprocs = 2 # number of processors = number of parallel tasks
        work_queue = Queue()
        for i in xrange(numtasks):
             work_queue.put(i)
        processes = [Process(target=self.doWork, args=(work_queue, ))
                     for i in range(numprocs)]
        for p in processes:
            p.daemon = True
            p.start()

    def doWork(self, work_queue):
        while True:
            try:
                x = work_queue.get(block=False)
                self.runScript(x)
            except Empty:
                print "Queue Empty"
                break

    def runScript(self, taskno):
        print '## Now Running: ', taskno
        command = ['./script.sh']  
        proc = subprocess.Popen(command, shell=True, 
                                         stdout=subprocess.PIPE, 
                                         stderr=subprocess.STDOUT)
        while True:
            stdout = proc.stdout.readline()
            if not stdout:
                break
            #sys.stdout.flush() #no need for flush, apparently it is embedded in the multiprocessing module
            self.output.AppendText(stdout.rstrip()) #this is the part that doesn't work.

if __name__ == "__main__":
    app = wx.App(0)
    frame = MyFrame(None, title="shell2textctrl", size=(500,500))
    frame.Show(True)
    app.MainLoop()

我根据别人的建议尝试了很多解决方案。其中包括:使用wx.CallAfter或wx.lib.delayedresult使图形用户界面响应;几个线程配方(例如,wxApplication开发指南、线程池等...)并让多进程()从单独的线程运行;重新定义sys.stdout.write以写入文本have;等等。他们都没有成功。有没有人能提供一个解决方案和可用的代码?你可以在这里使用我忘记了谁写的这个脚本:

代码语言:javascript
运行
复制
#!/bin/tcsh -f
@ i = 1
while ($i <= 20)
 echo $i
 @ i += 1
 sleep 0.2
end
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2011-05-03 11:29:21

我找到了一个解决方案,它在Roger Stuckey的Simpler wxPython Multiprocessing Example框架之上实现了一个修改后的输出队列。下面的代码甚至比他的代码更简单;它也可以稍微清理一下。希望这对其他人有帮助。不过,我仍然有一种恼人的感觉,那就是应该有一种更直接的方法来做这件事。

代码语言:javascript
运行
复制
import getopt, math, random, sys, time, types, wx, subprocess

from multiprocessing import Process, Queue, cpu_count, current_process, freeze_support
from Queue import Empty

class MyFrame(wx.Frame):
    def __init__(self, parent, id, title):
        wx.Frame.__init__(self, parent, id, title, wx.Point(700, 500), wx.Size(300, 200))

        self.panel = wx.Panel(self, wx.ID_ANY)

        #widgets
        self.start_bt = wx.Button(self.panel, wx.ID_ANY, "Start")
        self.Bind(wx.EVT_BUTTON, self.OnButton, self.start_bt)

        self.output_tc = wx.TextCtrl(self.panel, wx.ID_ANY, style=wx.TE_MULTILINE|wx.TE_READONLY)

        # sizer
        self.sizer = wx.GridBagSizer(5, 5)
        self.sizer.Add(self.start_bt, (0, 0), flag=wx.ALIGN_CENTER|wx.LEFT|wx.TOP|wx.RIGHT, border=5)
        self.sizer.Add(self.output_tc, (1, 0), flag=wx.EXPAND|wx.LEFT|wx.RIGHT|wx.BOTTOM, border=5)
        self.sizer.AddGrowableCol(0)
        self.sizer.AddGrowableRow(1)
        self.panel.SetSizer(self.sizer)

        # Set some program flags
        self.keepgoing = True
        self.i = 0
        self.j = 0


    def OnButton(self, event):
        self.start_bt.Enable(False)

        self.numtasks = 4
        self.numproc = 2
        #self.numproc = cpu_count()
        self.output_tc.AppendText('Number of processes = %d\n' % self.numproc)

        # Create the queues
        self.taskQueue = Queue()
        self.outputQueue = Queue()

        # Create the task list
        self.Tasks = range(self.numtasks)

        # The worker processes...
        for n in range(self.numproc):
            process = Process(target=self.worker, args=(self.taskQueue, self.outputQueue))
            process.start()

        # Start processing tasks
        self.processTasks(self.update)

        if (self.keepgoing):
            self.start_bt.Enable(True)

    def processTasks(self, resfunc=None):
        self.keepgoing = True

        # Submit first set of tasks
        numprocstart = min(self.numproc, self.numtasks)
        for self.i in range(numprocstart):
            self.taskQueue.put(self.Tasks[self.i])

        self.j = -1 # done queue index
        self.i = numprocstart - 1 # task queue index
        while (self.j < self.i):
            # Get and print results
            self.j += 1
            output = None
            while output != 'STOP!':
                try:
                    output = self.outputQueue.get() 
                    if output != 'STOP!': 
                        resfunc(output)
                except Empty:
                    break

            if ((self.keepgoing) and (self.i + 1 < self.numtasks)):
                # Submit another task
                self.i += 1
                self.taskQueue.put(self.Tasks[self.i])

    def update(self, output):
        self.output_tc.AppendText('%s PID=%d Task=%d : %s\n' % output)
        wx.YieldIfNeeded()

    def worker(self, inputq, outputq):
        while True:
            try:
                tasknum = inputq.get()
                print '## Now Running: ', tasknum #this goes to terminal/console. Add it to outputq if you'd like it on the TextCtrl.
                command = ['./script.sh']
                p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
                while True:
                    r = p.stdout.readline()
                    if not r:
                        outputq.put('STOP!')
                        break
                    outputq.put(( current_process().name, current_process().pid, tasknum, r.rstrip()))
            except Empty:
                break

    # The worker must not require any existing object for execution!
    worker = classmethod(worker)

class MyApp(wx.App):
    def OnInit(self):
        self.frame = MyFrame(None, -1, 'stdout to GUI using multiprocessing')
        self.frame.Show(True)
        self.frame.Center()
        return True

if __name__ == '__main__':
    freeze_support()
    app = MyApp(0)
    app.MainLoop()
票数 1
EN

Stack Overflow用户

发布于 2011-04-30 04:38:07

我不知道这是否会对您有所帮助,但我在这里有一个使用子过程的示例:

http://www.blog.pythonlibrary.org/2010/06/05/python-running-ping-traceroute-and-more/

请参阅pingIP和tracertIP方法。如果您的shell脚本是用Python编写的,那么您可以向其中添加某种生成器,就像我在那篇文章中用来发送回GUI的生成器一样。你可能已经读过LongRunningProcess维基的文章了,但我在这里写了我自己的教程:

http://www.blog.pythonlibrary.org/2010/05/22/wxpython-and-threads/

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/5833716

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档