我试图在上下文管理器中使用aiobotocore库,但我很难配置我的凭据。我需要创建一个类来配置我的AWS客户机,这样我就可以在这个库中使用put、read和delete函数。
为此使用了以下代码:
from contextlib import AsyncExitStack
from aiobotocore.session import AioSession
from credentials import aws_access_key_id, aws_secret_access_key
class AWSConnectionManager:
def __init__(self, aws_acces_key_id, aws_secret_access_key):
self.aws_acces_key_id = aws_acces_key_id
self.aws_secret_access_key = aws_secret_access_key
self._exit_stack = AsyncExitStack()
self._client = None
print('__init__')
async def __aenter__(self):
session = AioSession
self._client = await self._exit_stack.enter_async_context(session.create_client('s3'))
print('__aenter__')
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
print('__aexit__')
res = AWSConnectionManager(aws_acces_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)但是,它不通过aenter和aexit方法。对于上面的代码,我有以下输出:
__init__
<__main__.AWSConnectionManager object at 0x7f03921ac640>有人知道我的代码有什么问题吗?
发布于 2022-07-10 13:48:35
首先:您需要修复'session = AioSession‘=>’会话= AioSession()‘+添加返回+传递凭据
async def __aenter__(self):
session = AioSession()
self._client = await self._exit_stack.enter_async_context(
session.create_client(
's3',
aws_secret_access_key=self.aws_secret_access_key,
aws_access_key_id=self.aws_access_key_id,
)
)
return self第二:您需要为put_object/get_object编写/添加代理调用,或者通过重命名_client =>客户端使_client公开
async def save_file(self, content, s3_filename: str):
await self._client.put_object(Bucket=self.bucket, Body=content, Key=f'{s3_filename}')
async def load_file(self, name):
obj = await self._client.get_object(Bucket=self.bucket, Key=f'{name}')
return obj['Body'].read()现在你可以用
async with SkyFileStorageProxy() as storage:
await storage.load_file(name='test.txt')https://stackoverflow.com/questions/72776727
复制相似问题