Unable to serialize DoFnWithExecutionInformation — getting this error when generating a TableRow
Stack Overflow user
Asked on 2022-11-13 02:16:46
2 answers · 55 views · 0 followers · 0 votes

I am trying to convert a PCollection of Strings into a PCollection of BigQuery TableRows. My Apache Beam version is 2.41, with Java 11. I have tried multiple approaches but cannot fix this error. The TableSchema is loaded from an Avro file and passed to the function as a ValueProvider.

Please help me fix this.

Code:

public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class);
        options.setTempLocation("data/temp/");
        Pipeline p = Pipeline.create(options);

        BeamShemaUtil beamShemaUtil = new BeamShemaUtil("data/ship_data_schema.avsc");
        TableSchema tableSchema = beamShemaUtil.convertBQTableSchema();
        ValueProvider<TableSchema> ts = ValueProvider.StaticValueProvider.of(tableSchema);

        PCollection<String> pc1 = p.apply(TextIO.read().from("data/ship_data.csv"));
        PCollection<TableRow> pc2 = pc1.apply(MapElements.via(new ConvertStringToTableRow(ts)));
        PipelineResult result = p.run();
        result.waitUntilFinish();
    }
The SimpleFunction class:

    public static class ConvertStringToTableRow extends SimpleFunction<String, TableRow> {
        ValueProvider<TableSchema> tableSchema;

        public ConvertStringToTableRow(ValueProvider<TableSchema> tableSchema) {
            this.tableSchema = tableSchema;
        }
        public TableRow buildTableRow(TableSchema sc, String[] arr) {
            List<TableFieldSchema> fieldSchemaList = sc.getFields();
            List<String> data = Arrays.stream(arr).collect(Collectors.toList());
            TableRow row = new TableRow();
            List<TableCell> tc = new ArrayList<TableCell>();

            for (int i = 0; i < fieldSchemaList.size(); i++) {
                TableFieldSchema sc2 = fieldSchemaList.get(i);
                String fieldName = sc2.getName();
                String fieldType = sc2.getType();
                String fieldValue = data.get(i);
                // Use a fresh cell per field; reusing one TableCell instance
                // would keep mutating (and re-adding) the same object.
                TableCell record = new TableCell();

                if (fieldValue.isEmpty()) {
                    record.set(fieldName, null);
                    tc.add(record);
                } else {
                    // Each case needs a break; without one, a matched case
                    // falls through and applies every later case as well.
                    switch (fieldType) {
                        case "STRING":
                            record.set(fieldName, fieldValue);
                            tc.add(record);
                            break;
                        case "BYTES":
                            record.set(fieldName, fieldValue.getBytes());
                            tc.add(record);
                            break;
                        case "INT64":
                        case "INTEGER":
                            record.set(fieldName, Integer.valueOf(fieldValue));
                            tc.add(record);
                            break;
                        case "FLOAT64":
                        case "FLOAT":
                            record.set(fieldName, Float.valueOf(fieldValue));
                            tc.add(record);
                            break;
                        case "BOOL":
                        case "BOOLEAN":
                        case "NUMERIC":
                            record.set(fieldName, Integer.valueOf(fieldValue));
                            tc.add(record);
                            break;
                        case "TIMESTAMP":
                        case "TIME":
                        case "DATE":
                        case "DATETIME":
                        case "STRUCT":
                        case "RECORD":
                        default:
                            //  row.set(fieldName, fieldValue);
                            //  throw new UnsupportedOperationException("Unsupported BQ Data Type");
                    }
                }
            }
            return row.setF(tc);
        }
        @Override
        public TableRow apply(String element) {
            String[] arr = element.split(",");
            // BeamShemaUtil beamShemaUtil = new BeamShemaUtil("data/ship_data_schema.avsc");
            // TableSchema tableSchema = beamShemaUtil.convertBQTableSchema();
            TableRow row = buildTableRow(tableSchema.get(), arr);
            return row;
        }
    }
Error message:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.transforms.MapElements$1@270a620, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[], fieldAccessDescriptor=*}}
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:59)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:737)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:268)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:877)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:264)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:225)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:191)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:248)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:788)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:803)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:274)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:290)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:268)
    at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:218)
    at org.apache.beam.runners.direct.DirectRunner.performRewrites(DirectRunner.java:254)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:175)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
    at BuildWriteBQTableRowExample01.main(BuildWriteBQTableRowExample01.java:50)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.model.TableSchema
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
    ... 26 more

Process finished with exit code 1

2 Answers

Stack Overflow user

Accepted answer

Posted on 2022-11-13 23:08:41

I can suggest a solution; it is not perfect, but I hope it helps.

You can use your own structure for the table schema, converting each TableFieldSchema into a custom object that implements Serializable, for example:

public class MyTableSchemaFields implements Serializable {
     private String fieldName;
     private String fieldType;
     
     // Constructor
     .....

     // Getters and setters
     .......
}

public List<MyTableSchemaFields> toMyTableSchemaFields(final List<TableFieldSchema> schemaFields) {
    return schemaFields.stream()
                .map(this::toMyTableSchemaField)
                .collect(Collectors.toList());
}


public MyTableSchemaFields toMyTableSchemaField(final TableFieldSchema schemaField) {
    MyTableSchemaFields field = new MyTableSchemaFields();
    field.setFieldName(schemaField.getName());
    field.setFieldType(schemaField.getType());

    return field;
}

Then, in the rest of your program, use MyTableSchemaFields instead of TableFieldSchema:

public static class ConvertStringToTableRow implements SerializableFunction<String, TableRow> {
        List<MyTableSchemaFields> schemaFields;

        public ConvertStringToTableRow(List<MyTableSchemaFields> schemaFields) {
            this.schemaFields = schemaFields;
        }

public TableRow buildTableRow(List<MyTableSchemaFields> schemaFields,String[] arr) {
            ...........

For the ConvertStringToTableRow class, I used a SerializableFunction in the example instead of a SimpleFunction.
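Filled in, the wrapper might look like the sketch below. The no-arg constructor and getters/setters are assumptions, since the answer elides them, and the round-trip helper is only an illustration of what a Beam runner does with everything a function object captures.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class MyTableSchemaFields implements Serializable {
    private String fieldName;
    private String fieldType;

    public MyTableSchemaFields() {}

    public String getFieldName() { return fieldName; }
    public void setFieldName(String fieldName) { this.fieldName = fieldName; }
    public String getFieldType() { return fieldType; }
    public void setFieldType(String fieldType) { this.fieldType = fieldType; }

    // Illustration only: serialize then deserialize one instance, as a Beam
    // runner does with the state captured by a DoFn/SerializableFunction.
    static MyTableSchemaFields roundTrip(MyTableSchemaFields f) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(f);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (MyTableSchemaFields) in.readObject();
        }
    }
}
```

Because the class holds only Strings and implements Serializable, the round trip succeeds where the original TableSchema field failed.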

Votes: 0

Stack Overflow user

Posted on 2022-11-13 04:34:18

TableSchema is not Serializable, so the JVM/runner cannot copy the instance wrapped in the StaticValueProvider. This is similar to the issue seen here: Reading specific records from Dynamo using Apache Beam

For more information, see https://beam.apache.org/documentation/programming-guide/#user-code-serializability
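The root cause can be reproduced with plain Java serialization, independent of Beam. In the minimal sketch below, FakeTableSchema is a hypothetical stand-in for the real com.google.api.services.bigquery.model.TableSchema: Java serialization walks the entire object graph, so a Serializable function object fails as soon as it holds a field whose class is not itself Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {
    // Hypothetical stand-in for TableSchema: does NOT implement Serializable.
    static class FakeTableSchema {
        String field = "name:STRING";
    }

    // Mirrors the failing SimpleFunction: Serializable itself, but holding a
    // non-serializable field. writeObject() follows the reference and throws
    // java.io.NotSerializableException, as in the stack trace above.
    static class BadFn implements Serializable {
        FakeTableSchema schema = new FakeTableSchema();
    }

    // A function object holding only serializable state serializes fine.
    static class GoodFn implements Serializable {
        String schemaPath = "data/ship_data_schema.avsc";
    }

    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;  // NotSerializableException is an IOException
        }
    }
}
```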

In your specific scenario, my suggestion would be to create the TableSchema within the ValueProvider itself rather than relying on serialization. While I have not tested this against your code, I believe something like the following should suffice:

PCollection<String> pc1 = p.apply(TextIO.read().from("data/ship_data.csv"));
PCollection<TableRow> pc2 = pc1.apply(MapElements.via(
        new ConvertStringToTableRow(
            () -> new BeamShemaUtil("data/ship_data_schema.avsc").convertBQTableSchema()
        )));
PipelineResult result = p.run();
result.waitUntilFinish();
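Why this works: a lambda whose target interface extends Serializable is itself serializable, provided everything it captures is too. Here it captures only the schema path string, and the heavyweight TableSchema is built lazily inside get(), after the function has been shipped to the worker. A stdlib-only sketch, with a hypothetical Provider interface standing in for Beam's ValueProvider:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LambdaProviderDemo {
    // Hypothetical stand-in for Beam's ValueProvider<T>, which likewise
    // extends Serializable and exposes a single T get() method.
    interface Provider<T> extends Serializable {
        T get();
    }

    static Provider<String> schemaProvider(String path) {
        // The lambda captures only the serializable path string; the
        // expensive object would be constructed inside get(), on the worker.
        return () -> "schema parsed from " + path;
    }

    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```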
Votes: 1
Content originally provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/74418008
