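I have an Apache Beam pipeline that uses the Java Firestore transforms from a Python pipeline, with the help of an expansion service that exposes the Firestore read transforms.
Here, I'm trying to list all the documents in the collection named `pokemon`.
This is my Python pipeline: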
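```python
import typing

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
# Assumed import: the question does not show where ListDocumentsRequest comes from.
from google.cloud.firestore_v1.types import ListDocumentsRequest


def run():
    """Main function that defines pipeline and runs it."""
    # `args` and get_pipeline_options() are defined elsewhere in the script (not shown).
    pipeline_options = get_pipeline_options(**vars(args))
    request = ListDocumentsRequest()
    request.parent = 'projects/PROJECT_ID/databases'
    request.collection_id = 'pokemon'
    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (pipeline
             | 'Create Requests' >> beam.Create([request.transaction])
                 .with_output_types(typing.List[bytes])
             | 'Read from JavaFirestore' >> beam.ExternalTransform(
                 'my.beam.transform.firestore_list_documents',
                 ImplicitSchemaPayloadBuilder({'parent': request.parent,
                                               'collectionId': request.collection_id}),
                 "localhost:12345")
             | 'Write' >> WriteToText('output_files/output')
             )
```

This is the Java code that exposes the transform/URN in the expansion service: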
```java
// Nested inside an enclosing class that the question does not show.
public static class FirestoreListDocumentsBuilder implements
        ExternalTransformBuilder<FirestoreTransformsConfiguration, PCollection<ListDocumentsRequest>, PCollection<Document>> {
    @Override
    public PTransform<PCollection<ListDocumentsRequest>, PCollection<Document>> buildExternal(
            FirestoreTransformsConfiguration configuration) {
        // Delegates to Beam's Firestore ListDocuments read transform.
        return FirestoreIO.v1().read().listDocuments().build();
    }
}

@AutoService(ExternalTransformRegistrar.class)
public class FirestoreTransformsRegistrar implements ExternalTransformRegistrar {
    static final String URN_LIST_DOCS = "my.beam.transform.firestore_list_documents";

    @Override
    public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
        // Registers the builder under the URN referenced by the Python ExternalTransform.
        return ImmutableMap.of(URN_LIST_DOCS, new FirestoreListDocumentsBuilder());
    }
}
```

However, when running the Python pipeline, I get the following error:
```
java.lang.IllegalArgumentException: Unknown Coder URN beam:coder:pickled_python:v1. Known URNs: [beam:coder:avro:generic:v1, beam:coder:bytes:v1, beam:coder:bool:v1, beam:coder:string_utf8:v1, beam:coder:kv:v1, beam:coder:varint:v1, beam:coder:interval_window:v1, beam:coder:iterable:v1, beam:coder:timer:v1, beam:coder:length_prefix:v1, beam:coder:global_window:v1, beam:coder:windowed_value:v1, beam:coder:param_windowed_value:v1, beam:coder:double:v1, beam:coder:row:v1, beam:coder:sharded_key:v1, beam:coder:custom_window:v1]
```

I'm having a hard time finding a solution to this. Can anyone help?
Posted on 2022-02-11 23:43:10
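This is because, surprisingly, `List` is not one of the "standard" cross-language coders. For a `List` output type the Python SDK falls back to the pickled Python coder (`beam:coder:pickled_python:v1`), which the Java side cannot decode, whereas `beam:coder:iterable:v1` is in the list of known URNs. Try using `with_output_types(typing.Iterable[bytes])` instead. (Your Java code will have to change to match a `PCollection` of `Iterable<byte[]>`.)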
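To make the answer's point concrete, here is a toy model in plain Python (no Beam dependency) of the compatibility check the Java expansion service effectively applies: each Python output type hint maps to one or more coder URNs, and Java can only decode URNs from its known list. The known-URN set is copied from the error message in the question; the type-to-URN table and the names `coder_urns` and `unknown_to_java` are hypothetical simplifications, assuming (as the error shows) that `List` fell back to the pickled Python coder in the asker's Beam version. This is not Beam's actual coder registry.

```python
import collections.abc
import typing

# Coder URNs the Java expansion service reported as known (copied from the error message).
JAVA_KNOWN_URNS = frozenset({
    "beam:coder:avro:generic:v1", "beam:coder:bytes:v1", "beam:coder:bool:v1",
    "beam:coder:string_utf8:v1", "beam:coder:kv:v1", "beam:coder:varint:v1",
    "beam:coder:interval_window:v1", "beam:coder:iterable:v1", "beam:coder:timer:v1",
    "beam:coder:length_prefix:v1", "beam:coder:global_window:v1",
    "beam:coder:windowed_value:v1", "beam:coder:param_windowed_value:v1",
    "beam:coder:double:v1", "beam:coder:row:v1", "beam:coder:sharded_key:v1",
    "beam:coder:custom_window:v1",
})

# Hypothetical, simplified mapping from scalar type hints to standard coder URNs.
_SCALAR_URNS = {
    bytes: "beam:coder:bytes:v1",
    str: "beam:coder:string_utf8:v1",
    int: "beam:coder:varint:v1",
    float: "beam:coder:double:v1",
    bool: "beam:coder:bool:v1",
}
# Anything the toy model does not recognize falls back to the pickled Python coder.
PICKLED_URN = "beam:coder:pickled_python:v1"


def coder_urns(hint):
    """Return the coder URNs (outermost first) this toy model assigns to a type hint."""
    if hint in _SCALAR_URNS:
        return [_SCALAR_URNS[hint]]
    if typing.get_origin(hint) is collections.abc.Iterable:
        (elem,) = typing.get_args(hint)
        # Iterable[X] is encoded with the standard iterable coder wrapping X's coder.
        return ["beam:coder:iterable:v1"] + coder_urns(elem)
    # typing.List[...] has origin `list`, so it lands here.
    return [PICKLED_URN]


def unknown_to_java(hint):
    """Return the URNs in this hint's coder that the Java side would reject."""
    return [urn for urn in coder_urns(hint) if urn not in JAVA_KNOWN_URNS]


print(unknown_to_java(typing.List[bytes]))      # ['beam:coder:pickled_python:v1']
print(unknown_to_java(typing.Iterable[bytes]))  # []
```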
https://stackoverflow.com/questions/71077551