This problem has been bothering me for two days.
I can read the JSON data from a local file, but when I write it out as Parquet I get an error.
My code is as follows:
import java.util.HashMap;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class parquet_save_convert {

    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{ \n" +
            "  \"namespace\": \"com.navteq.avro\", \n" +
            "  \"name\": \"FacebookUser\", \n" +
            "  \"type\": \"record\",\n" +
            "  \"fields\": [\n" +
            "    {\"name\": \"event_level\", \"type\": \"string\"},\n" +
            "    {\"name\": \"spm_page\", \"type\": \"string\"},\n" +
            "    {\"name\": \"spm_module\", \"type\": \"string\"} ]\n" +
            "}");

    public static void main(String[] args) {
        Gson gson = new GsonBuilder().create();
        String outputPath = "./output/parquet";
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply(TextIO.read().from("./input/event_type.json"))
                .apply(ParDo.of(new DoFn<String, GenericRecord>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        HashMap<String, String> map =
                                gson.fromJson(c.element().toString(), HashMap.class);
                        GenericRecord osRecord = new GenericData.Record(SCHEMA);
                        map.forEach((k, v) -> {
                            osRecord.put(k, v);
                        });
                        c.output(osRecord);
                    }
                }))
                .setCoder(AvroCoder.of(GenericRecord.class, SCHEMA))
                .apply(FileIO.<GenericRecord>write()
                        .via(ParquetIO.sink(SCHEMA)).to(outputPath)
                        .withSuffix(".parquet"));
        pipeline.run().waitUntilFinish();
    }
}
My input data looks like this:
{"event_level":"item","spm_page":"Activity","spm_module":"click"}
{"event_level":"page","spm_page":"Activity","spm_module":"action"}
{"event_level":"page","spm_page":"Activity","spm_module":"click"}
{"event_level":"item","spm_page":"Activity","spm_module":"action"}
The exception is:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=parquet_save_convert$1@5d10455d, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
I have also googled this problem but could not find an answer, which is very frustrating.
Thanks in advance.
Posted on 2020-09-10 22:16:55
I suspect your Gson object is not serializable: it is a local variable in main that gets captured by the anonymous DoFn, so Beam tries to serialize it along with the DoFn and fails. The solution is probably to make it a (transient) field of the DoFn and initialize it in the DoFn's @Setup method instead.
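The underlying mechanism can be shown without Beam on the classpath. The sketch below is a plain-Java illustration of the suggested fix, not Beam code: the class names JsonParser, ParseFn, and Demo are made up for the demo, JsonParser stands in for the non-serializable Gson, and setup() plays the role of a method Beam would invoke via @Setup on each worker. Because the field is transient and created in setup(), Java serialization of the "DoFn" no longer tries to drag the parser along.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for Gson: a tiny helper that, like com.google.gson.Gson,
// does NOT implement Serializable.
class JsonParser {
    String firstValue(String json) {
        // Crude extraction of the first "key":"value" value; enough for the demo.
        int colon = json.indexOf(':');
        int start = json.indexOf('"', colon) + 1;
        return json.substring(start, json.indexOf('"', start));
    }
}

// Mirrors the fixed DoFn: the parser field is transient and created in
// setup(), so serializing the function object succeeds.
class ParseFn implements Serializable {
    private transient JsonParser parser;   // skipped during serialization

    void setup() {                         // Beam would invoke this via @Setup
        parser = new JsonParser();
    }

    String process(String json) {
        return parser.firstValue(json);
    }
}

public class Demo {
    static byte[] roundTrip(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(o);
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        ParseFn fn = new ParseFn();
        System.out.println(roundTrip(fn).length > 0); // serializes fine
        fn.setup();                                   // worker re-creates parser
        System.out.println(fn.process("{\"event_level\":\"item\"}"));
    }
}
```

If the parser field were a non-transient Gson (or captured from an enclosing method, as in the question), the writeObject call would throw NotSerializableException, which is what surfaces as the "unable to serialize DoFnWithExecutionInformation" error at pipeline construction time.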
https://stackoverflow.com/questions/63793745