首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用自定义扩展服务运行梁时未知编码器URN beam:coder:pickled_python:v1

使用自定义扩展服务运行梁时未知编码器URN beam:coder:pickled_python:v1
EN

Stack Overflow用户
提问于 2022-02-11 09:09:12
回答 2查看 464关注 0票数 0

我有一个Apache管道,它只使用Java在python管道中的防火墙转换,它借助一个公开了存储读取转换的扩展服务。

在这里,我试图列出集合中名为pokemon的所有文档

这是我的python管道:

代码语言:javascript
复制
def run():
      """Main function that defines pipeline and runs it."""
      pipeline_options = get_pipeline_options(**vars(args))

      request = ListDocumentsRequest()
      request.parent = 'projects/PROJECT_ID/databases'
      request.collection_id = 'pokemon'

      with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (pipeline
            | 'Create Requests' >> beam.Create([request.transaction])
                .with_output_types(typing.List[bytes])
            | 'Read from JavaFirestore' >> beam.ExternalTransform(
                'my.beam.transform.firestore_list_documents',
                ImplicitSchemaPayloadBuilder({'parent': request.parent,
                                            'collectionId': request.collection_id}),
                "localhost:12345")
            | 'Write' >> WriteToText('output_files/output')
            )

这是在扩展服务中公开转换/URN的Java代码:

代码语言:javascript
复制
public static class FirestoreListDocumentsBuilder implements
      ExternalTransformBuilder<FirestoreTransformsConfiguration, PCollection<ListDocumentsRequest>, PCollection<Document>> {
  
    @Override
    public PTransform<PCollection<ListDocumentsRequest>, PCollection<Document>> buildExternal(
      FirestoreTransformsConfiguration configuration) {
      return FirestoreIO.v1().read().listDocuments().build();
    }
  }

@AutoService(ExternalTransformRegistrar.class)
public class FirestoreTransformsRegistrar implements ExternalTransformRegistrar {

  final static String URN_LIST_DOCS = "my.beam.transform.firestore_list_documents";  

  @Override
  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
    return ImmutableMap.of(
      URN_LIST_DOCS,new FirestoreListDocumentsBuilder()
    );
  }
}

但是,在运行python管道时,我得到了以下错误:

代码语言:javascript
复制
java.lang.IllegalArgumentException: Unknown Coder URN beam:coder:pickled_python:v1. Known URNs: [beam:coder:avro:generic:v1, beam:coder:bytes:v1, beam:coder:bool:v1, beam:coder:string_utf8:v1, beam:coder:kv:v1, beam:coder:varint:v1, beam:coder:interval_window:v1, beam:coder:iterable:v1, beam:coder:timer:v1, beam:coder:length_prefix:v1, beam:coder:global_window:v1, beam:coder:windowed_value:v1, beam:coder:param_windowed_value:v1, beam:coder:double:v1, beam:coder:row:v1, beam:coder:sharded_key:v1, beam:coder:custom_window:v1]

我很难找到解决这个问题的办法,有人能帮我吗?

EN

Stack Overflow用户

发布于 2022-02-11 23:43:10

这是因为(令人惊讶的)列表不是一个“标准”的跨语言编码器。尝试使用with_output_types(typing.Iterable[bytes])代替。(您的java代码必须进行更改,才能与PCollection of Iterable<byte[]>匹配。)

票数 2
EN
查看全部 2 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71077551

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档