我有一个包含股票代码(如tickers = ['AAPL','XOM','GOOG']
)的向量,在我的“传统”python程序中,我会循环这个tickers
向量,选择一个像AAPL
这样的代码串,导入一个包含AAPL
股票回报的csv文件,将返回作为一个公共函数的输入,最后生成一个csv文件作为输出。我有4000多个代码,应用于每个代码的功能需要时间来处理。我可以访问一个带有mpi4py
包的计算机集群,每个任务可以访问大约100个处理器。我很清楚(并且能够实现)这个mpi
示例在python中:
from mpi4py import MPI
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if rank == 0:
data = [i for i in range(8)]
# dividing data into chunks
chunks = [[] for _ in range(size)]
for i, chunk in enumerate(data):
chunks[i % size].append(chunk)
else:
data = None
chunks = None
data = comm.scatter(chunks, root=0)
print str(rank) + ': ' + str(data)
[cha@cluster] ~/utils> mpirun -np 3 ./mpi.py
2: [2, 5]
0: [0, 3, 6]
1: [1, 4, 7]
因此,在这个例子中,我们有一个大小为8的数据向量,并为每个处理器(总共3)分配相同数量的数据元素。我如何使用类似的上面的例子,并分配给每个处理器一个股票代码,并应用需要运行的功能为每个滴答?我如何告诉python,一旦处理器空闲,返回tickers
向量并处理尚未处理的ticker
?
发布于 2014-06-16 22:00:25
有另一种方式来思考这个。您有100个处理器处理4000块数据。可以这样看待的一种方法是,每个处理器都获得一组数据以供操作。平均分配,每个处理器将得到40个代码处理。处理器1将得到0-39,处理器2将得到40-79,等等.
这样想,您不需要担心当处理器完成其任务时会发生什么。只要有一个循环:
block_size = len(tickers) / size # this will be 40 in your example
for i in range(block_size):
ticker = tickers[rank * block_size + i]
process(ticker)
def process(ticker):
# load data
# process data
# output data
这有道理吗?
编辑
如果您想要更多地阅读,这实际上只是排-主要顺序索引的一个变化,这是一种访问存储在单个内存维度中的多维数据的常见方法。
https://stackoverflow.com/questions/24224414
复制相似问题