
Python script that uses a while loop to keep picking up updated job scripts and multiprocess the tasks in a queue

Stack Overflow user
Asked on 2017-09-14 23:51:52
3 answers · 1.1K views · 0 followers · Score: 19

I am trying to write a Python script that scans a folder, collects updated SQL scripts, and then automatically pulls the data for those SQL scripts. In the code, a while loop scans for new SQL files and sends them to the data-pull function. What I am struggling with is how to build a dynamic queue with the while loop while also having multiple processes run the tasks in that queue.

The problem with the code below is that a while-loop iteration keeps working on one long job before moving on to the next iteration and collecting other jobs to fill the idle processors.

Update:

Thanks to @

  1. Caught the bug, and the error message is gone now. After changing the code, the Python script can pick up all the job scripts in one iteration and distribute them to four processors. However, it still gets hung up by one long job instead of moving on to the next iteration to scan for and submit newly added job scripts. Any idea how to restructure the code?
  2. I finally figured out the solution; see the answer below. It turns out that what I was looking for is

the_queue = Queue()

the_pool = Pool(4, worker_main, (the_queue,))

  • For anyone who stumbles on a similar idea, here is the whole architecture of this automation script, which turns a shared drive into a "server for SQL pulling" or any other job-queue "server": the Python script auto_data_pull.py shown in the answer (you need to add your own job function), plus a "batch script" containing:

start C:\Anaconda2\python.exe C:\Users\bin\auto_data_pull.pyc

Add a task triggered at computer startup that runs the "batch script". That's it. It works.

Python code:

from glob import glob
import os, time
import sys
import csv
import re
import subprocess
import pandas as pd
import pypyodbc
from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = compute(func, args)
        output.put(result)

#
# Function used to compute result
#

def compute(func, args):
    result = func(args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)


def query_sql(sql_file): #test func
    #jsl file processing and SQL querying, data table will be saved to csv.
    fo_name = os.path.splitext(sql_file)[0] + '.csv'
    fo = open(fo_name, 'w')
    print sql_file
    fo.write("sql_file {0} is done\n".format(sql_file))
    fo.close()
    return "Query is done for {0}\n".format(sql_file)


def check_files(path):
    """
    arguments -- root path to monitor
    returns   -- dictionary of {file: timestamp, ...}
    """
    sql_query_dirs = glob(path + "/*/IDABox/")

    files_dict = {}
    for sql_query_dir in sql_query_dirs:
        for root, dirs, filenames in os.walk(sql_query_dir):
            for filename in filenames:
                if filename.endswith('.jsl'):
                    full_path = os.path.join(root, filename)
                    files_dict[full_path] = os.path.getmtime(full_path)
    return files_dict


##### working in single thread
def single_thread():
    path = "Y:/"

    before = check_files(path)
    sql_queue  = [] 

    while True:
        time.sleep(3)
        after = check_files(path)
        added = [f for f in after if f not in before]
        deleted = [f for f in before if f not in after]
        overlapped = set(after) & set(before)
        updated = [f for f in overlapped if before[f] < after[f]]

        before = after

        sql_queue = added + updated
        # print sql_queue
        for sql_file in sql_queue:
            try:
                query_sql(sql_file)
            except:
                pass


##### not working in queue
def multiple_thread():

    NUMBER_OF_PROCESSES = 4
    path = "Y:/"

    sql_queue  = [] 
    before = check_files(path) # get the current dictionary of sql_files
    task_queue = Queue()
    done_queue = Queue()

    while True:         #while loop to check the changes of the files
        time.sleep(5)
        after = check_files(path)
        added = [f for f in after if f not in before]
        deleted = [f for f in before if f not in after]
        overlapped = set(after) & set(before)
        updated = [f for f in overlapped if before[f] < after[f]]

        before = after  
        sql_queue = added + updated   

        TASKS = [(query_sql, sql_file) for sql_file in sql_queue]
        # Create queues

        #submit task
        for task in TASKS:
            task_queue.put(task)

        for i in range(NUMBER_OF_PROCESSES):
            p = Process(target=worker, args=(task_queue, done_queue))
            p.start()

        # Get and print results
        print 'Unordered results:'
        for i in range(len(TASKS)):
            print '\t', done_queue.get()
        # Tell child processes to stop
        for i in range(NUMBER_OF_PROCESSES):
            task_queue.put('STOP')        

# single_thread()
if __name__ == '__main__':
    # freeze_support()
    multiple_thread()
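
Where the hang comes from: the "Get and print results" loop calls done_queue.get() once per submitted task, so a single long job stalls the whole iteration before the folder is rescanned. A minimal sketch (my illustration, not from the original post) that drains only the results that are already available and then goes straight back to scanning:

from Queue import Empty   # Python 2 module name; in Python 3 use "from queue import Empty"

# inside the outer while loop, replacing the blocking result loop
while True:
    try:
        print(done_queue.get_nowait())   # collect any finished results
    except Empty:
        break                            # nothing ready yet; go rescan the folder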

References:

Watch a directory for changes with a Python script: http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html

Multiprocessing: https://docs.python.org/2/library/multiprocessing.html


3 Answers

Stack Overflow user

Accepted answer

Posted on 2017-09-30 03:56:21

I have figured it out; thanks for the replies that inspired the idea. Now the script can run a while loop to monitor the folder for newly updated/added SQL scripts, and then distribute the data pulls across multiple worker processes. The solution comes from queue.get() and queue.put(). I assume the queue object takes care of the inter-process communication by itself.

Here is the final code --

from glob import glob
import os, time
import sys
import pypyodbc
from multiprocessing import Process, Queue, Event, Pool, current_process, freeze_support

def query_sql(sql_file): #test func
    #jsl file processing and SQL querying, data table will be saved to csv.
    fo_name = os.path.splitext(sql_file)[0] + '.csv'
    fo = open(fo_name, 'w')
    print sql_file
    fo.write("sql_file {0} is done\n".format(sql_file))
    fo.close()
    return "Query is done for {0}\n".format(sql_file)


def check_files(path):
    """
    arguments -- root path to monitor
    returns   -- dictionary of {file: timestamp, ...}
    """
    sql_query_dirs = glob(path + "/*/IDABox/")

    files_dict = {}
    try:
        for sql_query_dir in sql_query_dirs:
            for root, dirs, filenames in os.walk(sql_query_dir):
                for filename in filenames:
                    if filename.endswith('.jsl'):
                        full_path = os.path.join(root, filename)
                        files_dict[full_path] = os.path.getmtime(full_path)
    except OSError:   # a file may vanish between the walk and the getmtime call
        pass

    return files_dict


def worker_main(queue):
    # Runs once in each pool worker: loop forever, blocking on the shared
    # queue and processing file paths as the main loop puts them in.
    print os.getpid(), "working"
    while True:
        item = queue.get(True)
        query_sql(item)

def main():
    the_queue = Queue()
    the_pool = Pool(4, worker_main,(the_queue,))

    path = "Y:/"
    before = check_files(path) # get the current dictionary of sql_files
    while True:         #while loop to check the changes of the files
        time.sleep(5)
        sql_queue  = [] 
        after = check_files(path)
        added = [f for f in after if f not in before]
        deleted = [f for f in before if f not in after]
        overlapped = set(after) & set(before)
        updated = [f for f in overlapped if before[f] < after[f]]

        before = after  
        sql_queue = added + updated   
        if sql_queue:
            for jsl_file in sql_queue:
                try:
                    the_queue.put(jsl_file)
                except:
                    print "{0} failed with error {1}. \n".format(jsl_file, str(sys.exc_info()[0]))

if __name__ == "__main__":
    main()  
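
One caveat worth noting (my observation, not the poster's): worker_main never returns, so this pool can only be terminated, never joined cleanly. A minimal sketch of a sentinel-based shutdown, reusing the 'STOP' marker from the question's worker:

def worker_main(queue):
    # Consume jobs until the sentinel arrives, then return so that
    # the_pool.close() / the_pool.join() can shut the workers down.
    for item in iter(queue.get, 'STOP'):
        query_sql(item)

# at shutdown time:
#     for _ in range(4):
#         the_queue.put('STOP')
#     the_pool.close()
#     the_pool.join()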
Votes: 1

Stack Overflow user

Posted on 2017-09-20 10:00:55

Where in multiple_thread() did you define sql_file?

multiprocessing.Process(target=query_sql, args=(sql_file)).start()

You never define sql_file in the method itself; you only use the variable inside a for loop, so its scope is limited to that for loop.
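
For illustration (my sketch, not part of this answer), the call only makes sense inside the loop that defines sql_file, and args must be a tuple:

for sql_file in sql_queue:
    # note the trailing comma: args=(sql_file) is just a parenthesized string,
    # while args=(sql_file,) is a one-element tuple
    p = Process(target=query_sql, args=(sql_file,))
    p.start()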

Votes: 2

Stack Overflow user

Posted on 2017-09-25 20:23:06

Try replacing this line:

result = func(*args)

with:

result = func(args)
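
To see why (a hypothetical example, not from the answer): in the question's TASKS list each args is a single file-name string rather than a tuple, so unpacking it spreads the string into individual characters:

func, args = (query_sql, 'report.jsl')   # hypothetical task entry
func(args)     # query_sql('report.jsl') -- one string argument, as intended
func(*args)    # query_sql('r', 'e', 'p', ...) -- unpacks the string, raises TypeError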
Votes: 2
Original question: https://stackoverflow.com/questions/46223523