I'm trying to convert a PCollection of Strings into a collection of BQ TableRows. I'm on Apache Beam 2.41 and Java 11. I've tried several approaches but can't get past this error. The TableSchema is loaded from an Avro file and handed to the PCollection as a ValueProvider.
Please help me fix this.
Code:
public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectRunner.class);
    options.setTempLocation("data/temp/");
    Pipeline p = Pipeline.create(options);
    BeamShemaUtil beamShemaUtil = new BeamShemaUtil("data/ship_data_schema.avsc");
    TableSchema tableSchema = beamShemaUtil.convertBQTableSchema();
    ValueProvider<TableSchema> ts = ValueProvider.StaticValueProvider.of(tableSchema);
    PCollection<String> pc1 = p.apply(TextIO.read().from("data/ship_data.csv"));
    PCollection<TableRow> pc2 = pc1.apply(MapElements.via(new ConvertStringToTableRow(ts)));
    PipelineResult result = p.run();
    result.waitUntilFinish();
}
The SimpleFunction class:
public static class ConvertStringToTableRow extends SimpleFunction<String, TableRow> {
    ValueProvider<TableSchema> tableSchema;

    public ConvertStringToTableRow(ValueProvider<TableSchema> tableSchema) {
        this.tableSchema = tableSchema;
    }
    public TableRow buildTableRow(TableSchema sc, String[] arr) {
        List<TableFieldSchema> fieldSchemaList = sc.getFields();
        List<String> data = Arrays.stream(arr).collect(Collectors.toList());
        TableRow row = new TableRow();
        List<TableCell> tc = new ArrayList<TableCell>();
        for (int i = 0; i < fieldSchemaList.size(); i++) {
            TableFieldSchema sc2 = fieldSchemaList.get(i);
            String fieldName = sc2.getName();
            String fieldType = sc2.getType();
            String fieldValue = data.get(i);
            // Create a fresh cell per field; reusing a single TableCell would keep
            // mutating (and re-adding) the same object for every column.
            TableCell record = new TableCell();
            if (fieldValue.isEmpty()) {
                record.set(fieldName, null);
                tc.add(record);
            } else {
                // Each case needs a break, otherwise execution falls through
                // into every case below it.
                switch (fieldType) {
                    case "STRING":
                        record.set(fieldName, fieldValue);
                        tc.add(record);
                        break;
                    case "BYTES":
                        record.set(fieldName, fieldValue.getBytes());
                        tc.add(record);
                        break;
                    case "INT64":
                    case "INTEGER":
                        record.set(fieldName, Integer.valueOf(fieldValue));
                        tc.add(record);
                        break;
                    case "FLOAT64":
                    case "FLOAT":
                        record.set(fieldName, Float.valueOf(fieldValue));
                        tc.add(record);
                        break;
                    case "BOOL":
                    case "BOOLEAN":
                    case "NUMERIC":
                        record.set(fieldName, Integer.valueOf(fieldValue));
                        tc.add(record);
                        break;
                    case "TIMESTAMP":
                    case "TIME":
                    case "DATE":
                    case "DATETIME":
                    case "STRUCT":
                    case "RECORD":
                    default:
                        // row.set(fieldName,fieldValue);
                        // throw new UnsupportedOperationException("Unsupported BQ Data Type");
                        break;
                }
            }
        }
        return row.setF(tc);
    }
    @Override
    public TableRow apply(String element) {
        String[] arr = element.split(",");
        // BeamShemaUtil beamShemaUtil = new BeamShemaUtil("data/ship_data_schema.avsc");
        // TableSchema tableSchema = beamShemaUtil.convertBQTableSchema();
        return buildTableRow(tableSchema.get(), arr);
    }
}
Error message:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.transforms.MapElements$1@270a620, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[], fieldAccessDescriptor=*}}
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:59)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:737)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:268)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:877)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:264)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:225)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:191)
at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:248)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:788)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:803)
at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:274)
at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:290)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:268)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:218)
at org.apache.beam.runners.direct.DirectRunner.performRewrites(DirectRunner.java:254)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:175)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
at BuildWriteBQTableRowExample01.main(BuildWriteBQTableRowExample01.java:50)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.model.TableSchema
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
... 26 more
Process finished with exit code 1
Posted on 2022-11-13 23:08:41
Let me suggest a solution. It isn't perfect, but I hope it helps.
You can use your own structure for the table schema, converting each TableFieldSchema into a custom class that implements Serializable, for example:
public class MyTableSchemaFields implements Serializable {
private String fieldName;
private String fieldType;
// Constructor
.....
// Getters and setters
.......
}
public List<MyTableSchemaFields> toMyTableSchemaFields(final List<TableFieldSchema> schemaFields) {
return schemaFields.stream()
.map(this::toMyTableSchemaField)
.collect(Collectors.toList());
}
public MyTableSchemaFields toMyTableSchemaField(final TableFieldSchema schemaField) {
MyTableSchemaFields field = new MyTableSchemaFields();
field.setFieldName(schemaField.getName());
field.setFieldType(schemaField.getType());
return field;
}
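A quick way to check that such a custom fields class survives the Java serialization Beam performs is a plain-JDK round trip. The sketch below needs no Beam at all; the nested class is a hypothetical fill-in of MyTableSchemaFields, and the field values are made up:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableFieldsDemo {
    // Hypothetical fill-in of MyTableSchemaFields: a plain serializable POJO.
    public static class MyTableSchemaFields implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String fieldName;
        private final String fieldType;

        public MyTableSchemaFields(String fieldName, String fieldType) {
            this.fieldName = fieldName;
            this.fieldType = fieldType;
        }
        public String getFieldName() { return fieldName; }
        public String getFieldType() { return fieldType; }
    }

    // Round-trip through Java serialization, which is what Beam does to a
    // DoFn (and everything it references) at pipeline-construction time.
    public static MyTableSchemaFields roundTrip(MyTableSchemaFields in) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(in);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (MyTableSchemaFields) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MyTableSchemaFields copy = roundTrip(new MyTableSchemaFields("mmsi", "STRING"));
        System.out.println(copy.getFieldName() + ":" + copy.getFieldType());
    }
}
```

If the round trip throws no NotSerializableException, the class is safe to hold as a field of a Beam function.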
Then, in the rest of your program, use MyTableSchemaFields instead of TableFieldSchema:
public static class ConvertStringToTableRow implements SerializableFunction<String, TableRow> {
List<MyTableSchemaFields> schemaFields;
public ConvertStringToTableRow(List<MyTableSchemaFields> schemaFields) {
this.schemaFields = schemaFields;
}
public TableRow buildTableRow(List<MyTableSchemaFields> schemaFields,String[] arr) {
...........
Note that for the ConvertStringToTableRow class, the example uses a SerializableFunction instead of a SimpleFunction.
Posted on 2022-11-13 04:34:18
TableSchema is not Serializable, so the JVM/runner cannot copy the instance wrapped in the StaticValueProvider. This is similar to the problem seen here: Reading specific records from Dynamo using Apache Beam.
For more information, see https://beam.apache.org/documentation/programming-guide/#user-code-serializability.
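The failure mode itself can be reproduced without Beam. The sketch below (all class names are hypothetical stand-ins, not Beam or BigQuery types) serializes a Serializable wrapper holding a non-serializable field, which mirrors what happens when the function captures the StaticValueProvider wrapping TableSchema:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NonSerializableCaptureDemo {
    // Stand-in for com.google.api.services.bigquery.model.TableSchema:
    // a class that does NOT implement Serializable.
    public static class FakeTableSchema { }

    // Stand-in for a function object that captures the schema as a field.
    public static class CapturingFn implements Serializable {
        final FakeTableSchema schema = new FakeTableSchema(); // non-serializable field
    }

    // Returns true when Java serialization rejects the object, which is the
    // same failure the runner reports as java.io.NotSerializableException.
    public static boolean failsToSerialize(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return false;
        } catch (NotSerializableException e) {
            return true;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsToSerialize(new CapturingFn()));
    }
}
```

Marking the schema field transient, or replacing it with serializable data as in the other answer, makes the wrapper serializable again.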
In your particular scenario, my suggestion is to create the TableSchema inside the ValueProvider itself rather than relying on serialization. I haven't tested this against your code, but I believe something like the following should be enough:
PCollection<String> pc1 = p.apply(TextIO.read().from("data/ship_data.csv"));
PCollection<TableRow> pc2 = pc1.apply(MapElements.via(
new ConvertStringToTableRow(
() -> new BeamShemaUtil("data/ship_data_schema.avsc").convertBQTableSchema()
)));
PipelineResult result = p.run();
result.waitUntilFinish();
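Why deferring construction helps can be sketched with plain-JDK types: a Serializable supplier carries only the recipe for building the object, so the supplier itself serializes fine even though the object it builds does not. Here java.util.function.Supplier stands in for Beam's ValueProvider, and all names are hypothetical:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class DeferredSchemaDemo {
    // Stand-in for TableSchema: not serializable.
    public static class FakeTableSchema {
        public final String name = "ship_data";
    }

    // A Supplier that is itself Serializable; lambdas targeting this
    // interface are serialized as a recipe, not as the built object.
    public interface SerializableSupplier<T> extends Supplier<T>, Serializable { }

    public static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        // Capture nothing non-serializable; the schema is built only on get().
        SerializableSupplier<FakeTableSchema> provider = () -> new FakeTableSchema();
        byte[] bytes = serialize(provider); // succeeds, unlike serializing the schema itself
        System.out.println(bytes.length > 0);
        System.out.println(provider.get().name);
    }
}
```

This is the same idea as the lambda above: the worker receives the recipe (here, the .avsc path) and constructs the TableSchema locally, so the non-serializable object never has to cross the wire.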
https://stackoverflow.com/questions/74418008