首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何解套数据流中的嵌套PCollection

如何解套数据流中的嵌套PCollection
EN

Stack Overflow用户
提问于 2019-04-06 00:36:22
回答 1查看 678关注 0票数 0

要连接两个嵌套结构的堆栈,我们需要在执行连接之前取消PCollection的嵌套,因为遇到了挑战(请参阅我的另一个堆栈溢出案例a link)。所以想知道如何解除PCollection的嵌套。如果有人能给出连接两个嵌套表或如何解除PCollections嵌套想法,那就太好了。

我刚刚注意到,我们使用PTransform "Unnest“(link)将集合从嵌套集合中解套出来。但我在网上找不到任何样品。然而,我只是尝试用下面的步骤来实现它,以转换嵌套集合,但仍然无法获得最后的未嵌套集合。

1) PCollection empCollection = ReadCollection();2)使用Pardo函数将值从PCollection (com.google.api.services.bigquery.model.TableRow)转换为PCollection(org.apache.beam.sdk.values.Row) 3)定义模式,如下所示:模式项目= Schema.builder().addStringField("empNo").addStringField("empName").addArrayField("Projects",模式雇员=PCollection ReadCollection(项目)).build();4)使用Unnest取消嵌套嵌套的集合

代码语言:javascript
运行
复制
PCollection<Row> pcColl = targetRowCollection.apply(Unnest.<Row>create().withFieldNameFunction(new SerializableFunction<java.util.List<java.lang.String>, java.lang.String>() {
@Override
public java.lang.String apply(java.util.List<java.lang.String> input) {
    return String.join("+", input);
    }
}));

5)使用Pardo函数将值从PCollection(org.apache.beam.sdk.values.Row)转换为PCollection (com.google.api.services.bigquery.model.TableRow)

有没有人可以帮我,用这个Unnest转换把嵌套的集合转换成未嵌套的集合。

EN

回答 1

Stack Overflow用户

发布于 2019-06-11 14:28:34

在python中用梁连接两个嵌套结构的Pcollection的代码:

代码语言:javascript
运行
复制
with beam.Pipeline(options=option) as p:

    source_record1 =  p | "get data1" >> beam.io.avroio.ReadFromAvro(input_file1)
    source_record2 =  p | "get data2" >> beam.io.avroio.ReadFromAvro(input_file2)

    #convert into <k,v> form
    keyed_record1 = source_record1 | beam.ParDo(addkeysnested(),join_fileld_names1)
    keyed_record2 = source_record2 | beam.ParDo(addkeysnested(),join_fileld_names2)

    #Apply join operation
    rjoin = ({'File1Info': keyed_record1, 'File2Info': keyed_record2}                     
               | beam.CoGroupByKey())


    class addkeysnested(beam.DoFn):
        def process(self,element,fieldName):
            tmp_record = element    
            fieldName = fieldName.split(".")
            for i in range(len(fieldName)):

                if i != len(fieldName) - 1 :
                    tmp_record = tmp_record[fieldName[i].strip()][0]

                else:
                    tmp_record = tmp_record[fieldName[i].strip()]   

        return [(tmp_record,element)]

注意:在上面的代码中,我们可以在任何级别的嵌套字段中获取键值,即personalInfo.Address.City,然后应用CoGroupByKey()来连接两个pcollection

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55539922

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档