我正在尝试创建一个管道,在GCS文件夹中等待新的csv文件来处理它们并将输出写入BigQuery。
我编写了以下代码:
public static void main(String[] args) {
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class));
TableReference tableRef = new TableReference();
tableRef.setProjectId(PROJECT_ID);
tableRef.setDatasetId(DATASET_ID);
tableRef.setTableId(TABLE_ID);
//Pipeline p = Pipeline.create(PipelineOptionsFactory.as(Options.class));
// Read files as they arrive in GS
p.apply("ReadFile", TextIO.read()
.from("gs://mybucket/*.csv")
.watchForNewFiles(
// Check for new files every 30 seconds
Duration.standardSeconds(30),
// Never stop checking for new files
Watch.Growth.<String>never()
)
)
.apply(ParDo.of(new DoFn<String, Segment>() {
@ProcessElement
public void processElement(ProcessContext c) {
String[] items = c.element().split(",");
if (items[0].startsWith("_", 1)) {
// Skip header (the header is starting with _comment)
LOG.info("Skipped header");
return;
}
Segment segment = new Segment(items);
c.output(segment);
}
}))
.apply(ParDo.of(new FormatSegment()))
.apply(BigQueryIO.writeTableRows()
.to(tableRef)
.withSchema(FormatSegment.getSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
// Run the pipeline.
p.run();
}
如果我删除了watchForNewFiles
部分,我的代码工作得很好(我看到了有关写到GCS位置的并行化的信息日志,最后的输出被写入BigQuery)。
但是如果我让watchForNewFiles
(上面的代码),那么我只看到一个信息日志(关于写到GCS位置)和执行卡住了。BigQuery中没有日志,也没有错误,也没有输出。
有什么想法吗?
发布于 2018-03-19 07:24:13
看起来,在使用waitForNewFiles()
时,我们必须使用BigQueryIO.Write.Method.STREAMING_INSERTS
方法写入BigQuery。
现在工作的代码如下所示:
.apply(BigQueryIO.writeTableRows()
.to(tableRef)
.withSchema(FormatSegment.getSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
发布于 2018-05-22 07:43:23
使用DataflowRunner时,我会尝试使用..。org.apache.beam.sdk.transforms.Watch$WatchGrowthFn@4a1691ac : java.lang.UnsupportedOperationException: DataflowRunner目前不支持可拆分的DoFn: DoFn
使用直接运行程序,我看到它进行轮询,但管道的其余部分似乎没有触发,也没有错误。写信给数据存储和bigquery。
https://stackoverflow.com/questions/49364834
复制