我正在尝试编写一个python脚本,扫描文件夹并收集更新的SQL脚本,然后自动为SQL脚本拉取数据。在代码中,while循环正在扫描新的SQL文件,并发送到数据拉取函数。我很难理解如何使用while循环创建一个动态队列,而且还需要多个进程来运行队列中的任务。
下面的代码有一个问题:while循环的每次迭代都会被一个耗时较长的作业阻塞,只有等该作业完成后才能进入下一次迭代,去收集其他作业来填满空闲的处理器。
更新:
感谢@
the_queue = Queue()
the_pool = Pool(4,worker_main,(the_queue,))
auto_data_pull.py
。你需要添加你自己的工作函数。创建一个包含以下内容的“批处理脚本”:start C:\Anaconda2\python.exe C:\Users\bin\auto_data_pull.pyc。再添加一个在计算机启动时触发的计划任务来运行这个“批处理脚本”即可。它可以正常工作。
Python代码:
from glob import glob
import os, time
import sys
import CSV
import re
import subprocess
import pandas as PD
import pypyodbc
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
    """Serve tasks from *input* until the 'STOP' sentinel is received.

    arguments -- input: queue yielding (func, args) pairs or 'STOP'
                 output: queue that receives the value returned by compute
    returns -- None, once 'STOP' has been read
    """
    while True:
        task = input.get()
        if task == 'STOP':
            break
        func, args = task
        output.put(compute(func, args))
#
# Function used to compute result
#
def compute(func, args):
    """Call func(args) and describe the outcome in a single string.

    arguments -- func: callable taking one argument
                 args: the single argument handed to func
    returns -- text naming the current process, the function, the argument
               and the value func returned
    """
    outcome = func(args)
    process_name = current_process().name
    report = (process_name, func.__name__, args, outcome)
    return '%s says that %s%s = %s' % report
def query_sql(sql_file):  # test func
    """
    Placeholder for the real query step: writes a marker .csv next to sql_file.

    The real implementation is meant to process the .jsl file, run the SQL
    query and save the data table to csv.

    arguments -- sql_file: path of the script; its extension is replaced by
                 '.csv' to form the output file name
    returns -- status string that names sql_file
    """
    fo_name = os.path.splitext(sql_file)[0] + '.csv'
    # The with-block closes (and flushes) the output file even if the write
    # fails; the original left the handle open.
    with open(fo_name, 'w') as fo:
        print(sql_file)
        fo.write("sql_file {0} is done\n".format(sql_file))
    # The original format string had no placeholder, so the file name was
    # silently dropped from the returned message.
    return "Query is done for {0}\n".format(sql_file)
def check_files(path):
    """
    Map every .jsl file found below <path>/*/IDABox/ to its modification time.

    arguments -- root path to monitor
    returns -- dictionary of {file: timestamp, ...}
    """
    files_dict = {}
    for sql_query_dir in glob(path + "/*/IDABox/"):
        for root, dirs, filenames in os.walk(sql_query_dir):
            for filename in filenames:
                if filename.endswith('.jsl'):
                    # os.walk reports sub-directory roots without a trailing
                    # separator, so `root + filename` produced a wrong path
                    # for nested files; os.path.join is correct in both cases
                    # and yields the same key for top-level files.
                    full_path = os.path.join(root, filename)
                    files_dict[full_path] = os.path.getmtime(full_path)
    return files_dict
##### working in single thread
def single_thread():
    """
    Poll "Y:/" every 3 seconds and run query_sql on each new or modified
    .jsl file, one after another in this process.

    returns -- never returns; loops until the process is interrupted
    """
    path = "Y:/"
    before = check_files(path)
    while True:
        time.sleep(3)
        after = check_files(path)
        # New files, plus files whose timestamp increased since the last scan.
        added = [f for f in after if f not in before]
        updated = [f for f in after if f in before and before[f] < after[f]]
        before = after
        for sql_file in added + updated:
            try:
                query_sql(sql_file)
            except Exception as exc:
                # Keep the watcher alive, but report the failure instead of
                # discarding it; a bare `except` would also swallow Ctrl-C.
                print("{0} failed with error {1}. \n".format(sql_file, exc))
##### not working in queue
def multiple_thread():
    """
    Poll "Y:/" every 5 seconds and hand each new or modified .jsl file to a
    fixed set of worker processes.

    The workers are started once, before the polling loop, and share one task
    queue. The loop only enqueues work and prints whatever results are already
    finished, so a long-running query no longer delays the next scan. The
    original started four new processes per cycle and blocked until the whole
    batch had completed.

    returns -- never returns normally; on exit one 'STOP' per worker is queued
    """
    NUMBER_OF_PROCESSES = 4
    path = "Y:/"
    task_queue = Queue()
    done_queue = Queue()

    # One long-lived pool of workers instead of a new set per cycle.
    # NOTE(review): worker has no error handling of its own, so a task that
    # raises ends that worker process -- confirm whether that is acceptable.
    for _ in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    before = check_files(path)  # get the current dictionary of sql_files
    try:
        while True:  # while loop to check the changes of the files
            time.sleep(5)
            after = check_files(path)
            added = [f for f in after if f not in before]
            updated = [f for f in after
                       if f in before and before[f] < after[f]]
            before = after

            # Submit tasks; idle workers pick them up immediately.
            for sql_file in added + updated:
                task_queue.put((query_sql, sql_file))

            # Print only results that are already available. This process is
            # the sole reader of done_queue, so a non-empty queue means the
            # following get() does not wait for unfinished work.
            if not done_queue.empty():
                print('Unordered results:')
                while not done_queue.empty():
                    print('\t' + str(done_queue.get()))
    finally:
        # Tell child processes to stop
        for _ in range(NUMBER_OF_PROCESSES):
            task_queue.put('STOP')
# Script entry point. single_thread() is the sequential alternative and is
# left disabled here.
if __name__ == '__main__':
    # freeze_support() stays commented out, as in the original script.
    multiple_thread()
参考:
使用python脚本:http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html
发布于 2017-09-30 03:56:21
我已经弄清楚了。感谢您的回复激发了我的想法。现在,脚本可以运行while循环来监视文件夹中新更新/添加的SQL脚本,然后将数据提取分发到多个线程。解决方案来自queue.get()和queue.put()。我假设queue对象自己负责通信。
这是最终的代码--
from glob import glob
import os, time
import sys
import pypyodbc
from multiprocessing import Process, Queue, Event, Pool, current_process, freeze_support
def query_sql(sql_file):  # test func
    """
    Placeholder for the real query step: writes a marker .csv next to sql_file.

    The real implementation is meant to process the .jsl file, run the SQL
    query and save the data table to csv.

    arguments -- sql_file: path of the script; its extension is replaced by
                 '.csv' to form the output file name
    returns -- status string that names sql_file
    """
    fo_name = os.path.splitext(sql_file)[0] + '.csv'
    # Closing the file through the with-block guarantees the marker is
    # flushed to disk; the original never closed the handle.
    with open(fo_name, 'w') as fo:
        print(sql_file)
        fo.write("sql_file {0} is done\n".format(sql_file))
    # A {0} placeholder was missing, so format() discarded the file name.
    return "Query is done for {0}\n".format(sql_file)
def check_files(path):
    """
    Map every .jsl file found below <path>/*/IDABox/ to its modification time.

    Files whose timestamp cannot be read are skipped individually.

    arguments -- root path to monitor
    returns -- dictionary of {file: timestamp, ...}
    """
    files_dict = {}
    for sql_query_dir in glob(path + "/*/IDABox/"):
        for root, dirs, filenames in os.walk(sql_query_dir):
            for filename in filenames:
                if not filename.endswith('.jsl'):
                    continue
                # os.walk reports sub-directory roots without a trailing
                # separator; `root + filename` therefore pointed at a
                # non-existent path for nested files.
                full_path = os.path.join(root, filename)
                try:
                    files_dict[full_path] = os.path.getmtime(full_path)
                except OSError:
                    # The file vanished or is unreadable: skip only this
                    # entry. The original bare `except` abandoned the whole
                    # scan and silently returned a partial result.
                    continue
    return files_dict
def worker_main(queue):
    """
    Worker loop: take file names from queue and run query_sql on each.

    arguments -- queue: queue object yielding paths of .jsl files
    returns -- never returns; loops until the process is terminated
    """
    print("{0} working".format(os.getpid()))
    while True:
        item = queue.get(True)  # block until a file name is available
        try:
            query_sql(item)
        except Exception as exc:
            # A failing query used to end this loop without any report;
            # report it here and keep serving the queue.
            print("{0} failed with error {1}. \n".format(item, exc))
def main():
    """
    Poll "Y:/" every 5 seconds and queue each new or modified .jsl file for
    the worker processes.

    returns -- never returns; loops until the process is interrupted
    """
    the_queue = Queue()
    # worker_main is passed as the pool's initializer with the_queue as its
    # argument, so each of the 4 processes runs the worker loop on that queue.
    the_pool = Pool(4, worker_main, (the_queue,))
    path = "Y:/"
    before = check_files(path)  # get the current dictionary of sql_files
    while True:  # while loop to check the changes of the files
        time.sleep(5)
        after = check_files(path)
        # New files, plus files whose timestamp increased since the last scan.
        added = [f for f in after if f not in before]
        updated = [f for f in after if f in before and before[f] < after[f]]
        before = after
        for jsl_file in added + updated:
            try:
                the_queue.put(jsl_file)
            except Exception:
                # Narrowed from a bare `except` so Ctrl-C and SystemExit are
                # no longer swallowed while queueing.
                print("{0} failed with error {1}. \n".format(
                    jsl_file, str(sys.exc_info()[0])))
# Start the watcher only when this file is executed as a script.
if __name__ == "__main__":
    main()
发布于 2017-09-20 10:00:55
在multiple_thread()
中的什么地方定义了sql_file
?
multiprocessing.Process(target=query_sql, args=(sql_file)).start()
您没有在方法中定义sql_file
,而且还在for循环中使用了该变量。该变量的作用域仅限于for循环。
发布于 2017-09-25 20:23:06
尝试替换此命令:
result = func(*args)
替换为:
result = func(args)
https://stackoverflow.com/questions/46223523
复制相似问题