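Is there a way to get the full S3 URL path of a Metaflow artifact that was stored in a step?
I looked at Metaflow's DataArtifact class but didn't see an obvious S3 path property.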
Posted on 2021-02-25 09:42:16
Yes, you can:
    Flow('MyFlow')[42]['foo'].task.artifacts.bar._object['location']
where MyFlow is the name of the flow, 42 is the run ID, foo is the step under consideration, and bar is the artifact from that step.
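For reuse, the same lookup can be wrapped in a small function. This is only a sketch: `artifact_location` is a hypothetical name, not part of Metaflow, and it relies on the private `_object` attribute from the expression above, which is not a documented API and may change between Metaflow versions.

```python
def artifact_location(flow, run_id, step_name, artifact_name):
    """Return the datastore location recorded for one artifact.

    `flow` is expected to behave like metaflow.Flow: indexing it by run ID
    gives a run, and indexing the run by step name gives a step with `.task`.
    """
    task = flow[run_id][step_name].task
    artifact = getattr(task.artifacts, artifact_name, None)
    if artifact is None:
        raise KeyError(f"no artifact named {artifact_name!r} in step {step_name!r}")
    # Assumption: `_object` is a dict with a "location" key, as in the
    # one-line expression above. It is private and may change.
    return artifact._object["location"]
```

With Metaflow installed and a run available, a call might look like `artifact_location(Flow('MyFlow'), '42', 'foo', 'bar')`; passing the run ID as a string is an assumption here.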
Posted on 2021-06-30 02:02:55
Building on @Savin's answer, I wrote a helper function that returns an artifact's S3 address given a run ID and an artifact name:
    from typing import List, Optional, Union

    from metaflow import Flow, Metaflow, Run, namespace


    class DataArtifactNotFoundError(Exception):
        """Raised when no task in the run has an artifact with the requested name."""


    def get_artifact_s3url_from_run(
        run: Union[str, Run],
        name: str,
        legacy_names: Optional[List[str]] = None,
        missing_ok: bool = False,
    ) -> Optional[str]:
        """
        Given a Metaflow Run and a key, scans the run's tasks and returns the S3 URL of the artifact with that key.

        NOTE: use get_artifact_from_run() if you want the artifact itself, not the S3 URL to the artifact.

        This allows us to find data artifacts even in flows that did not finish. If we change the name of an artifact,
        we can support backwards compatibility by also passing in the legacy keys. Note: we can avoid this by resuming a
        specific run and adding a node which re-maps the artifact to another key. This will assign the run a new ID.

        Args:
            run: a metaflow.Run object, or a run ID string of the form "FlowName/run_id"
            name: name of the attribute to look for in task.artifacts
            legacy_names: backup names to check
            missing_ok: whether to allow an artifact to be missing

        Returns:
            the artifact's S3 location, or None if the artifact is not found and missing_ok=True

        Raises:
            DataArtifactNotFoundError if artifact is not found and missing_ok=False
            ValueError if Flow not found
            ValueError if Flow is found but run ID is not.
        """
        namespace(None)  # allows us to access all runs in all namespaces
        # Avoid a mutable default argument: build the list on each call.
        legacy_names = legacy_names or []
        names_to_check = [name] + legacy_names

        if isinstance(run, str):
            try:
                run = Run(run)
            except Exception as e:
                # Run ID not found. See whether the flow exists so the error can list alternatives.
                flow_name = run.split(sep="/")[0]
                try:
                    flow = Flow(flow_name)
                except Exception as e2:
                    raise ValueError(
                        f"Could not find flow {flow_name}. Available flows: {Metaflow().flows}"
                    ) from e2
                # Raised outside the inner try so it is not swallowed by the "flow not found" handler.
                run_ids = [r.id for r in flow.runs()]
                raise ValueError(f"Could not find run ID {run}. Possible values: {run_ids}") from e

        for name_ in names_to_check:
            for step_ in run:
                for task in step_:
                    # Look the artifact up by attribute name; returns None if this task lacks it.
                    artifact = getattr(task.artifacts, name_, None)
                    if artifact is not None:
                        # https://stackoverflow.com/a/66361249/4212158
                        return artifact._object["location"]

        if not missing_ok:
            raise DataArtifactNotFoundError(
                f"No data artifact with name {name} found in {run}. Also checked legacy names: {legacy_names}"
            )
        return None
https://stackoverflow.com/questions/62851369