首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在元流中创建嵌套分支?

如何在元流中创建嵌套分支?
EN

Stack Overflow用户
提问于 2021-04-30 11:28:32
回答 1查看 400关注 0票数 4

我使用metaflow创建一个文本处理管道,如下所示:

代码语言:javascript
运行
复制
                                 ___F------
                     ______ D---|          |  
                    |           |___G---|  |__>  
          ____B-----|                   |----->H
         |          |______E_________________> ^
      A -|                                     |
         |____C________________________________|

根据文档branch允许并行计算步骤,并用于并行计算(B、C)、(D、E)和(F,G)。最后,所有分支都在H.连接,下面是实现此逻辑的代码:

代码语言:javascript
运行
复制
from metaflow import FlowSpec, step

class TextProcessing(FlowSpec):

  @step
  def a(self):
    ....

    self.next(self.b, self.c)

  @step
  def c(self):
    result1 = {}

    ....

    self.next(self.join)

  @step
  def b(self):
    ....

    self.next(self.d, self.e)

  @step
  def e(self):
    result2 = []
    .....

    self.next(self.join)

  @step
  def d(self):
    ....

    self.next(self.f, self.g)

  @step
  def f(self):
    result3 = []
    ....

    self.next(self.join)

  @step
  def g(self):
    result4 = []
    .....

    self.next(self.join)


  @step
  def join(self, results):
    data = [results.c.result, results.e.result2, result.f.result3, result.g.result4]
    print(data)

    self.next(self.end)

  @step
  def end(self):
    pass

etl = TextProcessing()

在运行python main.py run时,我得到以下错误:-

代码语言:javascript
运行
复制
Metaflow 2.2.10 executing TextProcessing for user:ubuntu
Validating your flow...
    Validity checker found an issue on line 83:
    Step join seems like a join step (it takes an extra input argument) but an incorrect number of steps (c, e, f, g) lead to it. This join was expecting 2 incoming paths, starting from splitted step(s) f, g.

有人能指出我哪里出了问题吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-04-30 15:27:10

在再次仔细阅读文档之后,我意识到我没有正确地处理连接。根据metaflow-2.2.10的文档:-

请注意,您可以任意嵌套分支,也就是说,您可以在分支中分支。只需记住加入您创建的所有分支。

这意味着每个分支都应该连接起来。为了连接来自分支的值,metaflow提供了merge_artifacts实用程序函数来帮助传播明确的值。

因为工作流中有三个分支,因此添加了三个连接步骤来合并结果。

下列改变对我有效:-

代码语言:javascript
运行
复制
from metaflow import FlowSpec, step

class TextProcessing(FlowSpec):

  @step
  def a(self):
    ....

    self.next(self.b, self.c)

  @step
  def c(self):
    result1 = {}

    ....

    self.next(self.merge_3)

  @step
  def b(self):
    ....

    self.next(self.d, self.e)

  @step
  def e(self):
    result2 = []
    .....

    self.next(self.merge_2)

  @step
  def d(self):
    ....

    self.next(self.f, self.g)

  @step
  def f(self):
    result3 = []
    ....

    self.next(self.merge_1)

  @step
  def g(self):
    result4 = []
    .....

    self.next(self.merge_1)

  @step
  def merge_1(self, results):
    self.result = {
      'result4' : results.g.result4,
      'result3' : results.f.result3
    }

    self.next(self.merge_2)

  @step
  def merge_2(self, results):
    self.result = { 'result2' : results.e.result2, **results.merge_1.result }
    self.merge_artifacts(results, include=['result'])
    self.next(self.merge_3)

  @step
  def merge_3(self, results):
    self.result = { 'c' : results.c.result1, **results.merge_2.result }
    self.merge_artifacts(results, include=['result'])
    self.next(self.end)

  @step
  def end(self):
    print(self.result)

etl = TextProcessing()
票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67333103

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档