
Boto3 and multiprocessing

Stack Overflow user
Asked on 2022-09-25 16:35:33
1 answer · 41 views · 0 followers · Score: 0

I want to parallelize some code across 64 workers. Specifically, I want to read objects from an S3 bucket, do some processing on them, and write them back.

Something like:

def read_file_from_s3(i):
    j = 0
    for obj in bucket.objects.filter(Prefix=f'folder_name/'):
        if i == j:
            response = obj.get()
            df = pd.read_csv(response['Body'])
            df = df[df['some_col'].isin(some_list)]  # this is the processing
            df.to_csv(f's3://some_bucket_name/some_other_folder/file_{i}.csv',
                      index=False, storage_options={'key': key, 'secret': secret})
        j += 1

a_pool = multiprocessing.Pool(64)
a_pool.map(read_file_from_s3, list(range(64)))

The problem is that there are more than 64 objects in the folder (I don't know how many). How can I iterate over all the objects while still making use of the 64 workers?


1 Answer

Stack Overflow user
Answered on 2022-09-25 17:41:27

You can move the iteration logic into the main process that creates the multiprocessing pool, and pass only the S3 key to each worker so the worker can fetch the object itself.

For example:

import boto3
import multiprocessing

_bucket=None
def init_read_file_from_s3(bucket_name):
    # Helper to create a S3 resource object for this worker
    global _bucket
    s3 = boto3.resource('s3')
    _bucket = s3.Bucket(bucket_name)

def read_file_from_s3(values):
    # Pull out the data
    i, s3_key = values

    # Perform work on a given S3 Key, use the single S3 bucket object the
    # initializer created before this worker started
    obj = _bucket.Object(s3_key)
    response = obj.get()
    # TODO: Operate on the object here, "i" is a unique ID for this item

    # Inform the caller that we're done with this key
    return s3_key

def main():
    bucket_name = 'example-bucket'
    prefix = 'example/prefix/'

    # Get a s3 resource object to list the objects with
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)

    # Initialize all of the workers with the bucket name so they can create a S3 resource object
    with multiprocessing.Pool(64, initializer=init_read_file_from_s3, initargs=(bucket_name,)) as a_pool:
        # List the objects, and feed each key in turn to a worker
        # Note that the object returned from filter() or all() isn't picklable, so we
        # pass just the string of the S3 key
        for result in a_pool.imap(
            read_file_from_s3, 
            [(i,x.key) for i,x in enumerate(bucket.objects.filter(Prefix=prefix))]):
            
            # Just show the key that was operated on
            print("Done with: " + result)

if __name__ == "__main__":
    main()
Score: 0
Original content provided by Stack Overflow; translation supported by Tencent Cloud's IT-domain engine.
Original link: https://stackoverflow.com/questions/73846167