我想使用boto3雅典娜客户端函数start_query_execution
来运行来自Python的查询。这里的函数文档:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.start_query_execution
用法类似于:
query = "SELECT * FROM TABLE"
athena_client = boto3.client("athena")
start_response = athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={"Database": ATHENA.database_name},
ResultConfiguration={
"OutputLocation": s3_output,
},
)
--我正在寻找一个函数/包装器,以确保该查询成功运行,并且只在它运行到完成之后才返回。在我的搜索中找不到aws包装器。
发布于 2022-01-11 06:43:01
我实现了一个泛型函数,它执行特定的查询,并通过每隔一段时间轮询查询ID来确保它成功运行:
import time
import logging
import boto3
def run_query(query: str, s3_output: str) -> None:
"""Generic function to run athena query and ensures it is successfully completed
Parameters
----------
query : str
formatted string containing athena sql query
s3_output : str
query output path
"""
athena_client = boto3.client("athena")
start_response = athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={"Database": ATHENA.database_name},
ResultConfiguration={
"OutputLocation": s3_output,
},
)
query_id = start_response["QueryExecutionId"]
while True:
finish_state = athena_client.get_query_execution(QueryExecutionId=query_id)[
"QueryExecution"
]["Status"]["State"]
if finish_state == "RUNNING" or finish_state == "QUEUED":
time.sleep(10)
else:
break
assert finish_state == "SUCCEEDED", f"query state is {finish_state}"
logging.info(f"Query {query_id} complete")
https://stackoverflow.com/questions/70662594
复制相似问题