I want to run some code in parallel (using 64 workers). Specifically, I want to read objects from an S3 bucket, do some processing, and write them back out.
Something like:
def read_file_from_s3(i):
    j = 0
    # Walk the folder and only process the i-th object
    for obj in bucket.objects.filter(Prefix=f'folder_name/'):
        if i == j:
            response = obj.get()
            df = pd.read_csv(response['Body'])
            df = df[df['some_col'].isin(some_list)]  # this is the processing
            df.to_csv(f's3://some_bucket_name/some_other_folder/file_{i}.csv',
                      index=False, storage_options={'key': key, 'secret': secret})
        j += 1

a_pool = multiprocessing.Pool(64)
a_pool.map(read_file_from_s3, list(range(64)))

The problem is that there are more than 64 objects in the folder (I don't know how many). How can I iterate over all of the objects while still making use of the 64 workers?
Posted on 2022-09-25 17:41:27
You can move the iteration logic into the main process that creates the multiprocessing pool, and pass only the S3 key to each worker so the worker can fetch the object itself.
For example:
import boto3
import multiprocessing

_bucket = None

def init_read_file_from_s3(bucket_name):
    # Helper to create an S3 resource object for this worker
    global _bucket
    s3 = boto3.resource('s3')
    _bucket = s3.Bucket(bucket_name)

def read_file_from_s3(values):
    # Pull out the data
    i, s3_key = values
    # Perform work on a given S3 key, using the single S3 bucket object the
    # initializer created before this worker started
    obj = _bucket.Object(s3_key)
    response = obj.get()
    # TODO: Operate on the object here; "i" is a unique ID for this item
    # Inform the caller that we're done with this key
    return s3_key

def main():
    bucket_name = 'example-bucket'
    prefix = 'example/prefix/'
    # Get an S3 resource object to list the objects with
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    # Initialize each worker with the bucket name so it can create an S3 resource object
    with multiprocessing.Pool(64, initializer=init_read_file_from_s3, initargs=(bucket_name,)) as a_pool:
        # List the objects and feed each key in turn to a worker.
        # Note that the object returned from filter() or all() isn't picklable, so we
        # pass just the string of the S3 key
        for result in a_pool.imap(
                read_file_from_s3,
                [(i, x.key) for i, x in enumerate(bucket.objects.filter(Prefix=prefix))]):
            # Just show the key that was operated on
            print("Done with: " + result)

if __name__ == "__main__":
    main()

https://stackoverflow.com/questions/73846167