首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >TextIO.read().watchForNewFiles()防止写入BigQuery

TextIO.read().watchForNewFiles()防止写入BigQuery
EN

Stack Overflow用户
提问于 2018-03-19 06:01:01
回答 2查看 880关注 0票数 1

我正在尝试创建一个管道,在GCS文件夹中等待新的csv文件来处理它们并将输出写入BigQuery。

我编写了以下代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static void main(String[] args) {

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class));
    TableReference tableRef = new TableReference();
    tableRef.setProjectId(PROJECT_ID);
    tableRef.setDatasetId(DATASET_ID);
    tableRef.setTableId(TABLE_ID);
    //Pipeline p = Pipeline.create(PipelineOptionsFactory.as(Options.class));

    // Read files as they arrive in GS
    p.apply("ReadFile", TextIO.read()
        .from("gs://mybucket/*.csv")
        .watchForNewFiles(
            // Check for new files every 30 seconds
            Duration.standardSeconds(30),
            // Never stop checking for new files
            Watch.Growth.<String>never()
        )
    )
    .apply(ParDo.of(new DoFn<String, Segment>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] items = c.element().split(",");

            if (items[0].startsWith("_", 1)) {
                // Skip header (the header is starting with _comment)
                LOG.info("Skipped header");
                return;
            }

            Segment segment = new Segment(items);
            c.output(segment);
        }
    }))
    .apply(ParDo.of(new FormatSegment()))
    .apply(BigQueryIO.writeTableRows()
        .to(tableRef)
        .withSchema(FormatSegment.getSchema())
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    // Run the pipeline.
    p.run();
}

如果我删除了watchForNewFiles部分,我的代码工作得很好(我看到了有关写到GCS位置的并行化的信息日志,最后的输出被写入BigQuery)。

但是如果我让watchForNewFiles (上面的代码),那么我只看到一个信息日志(关于写到GCS位置)和执行卡住了。BigQuery中没有日志,也没有错误,也没有输出。

有什么想法吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-03-19 07:24:13

看起来,在使用waitForNewFiles()时,我们必须使用BigQueryIO.Write.Method.STREAMING_INSERTS方法写入BigQuery。

现在工作的代码如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
.apply(BigQueryIO.writeTableRows()
        .to(tableRef)
        .withSchema(FormatSegment.getSchema())
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
票数 3
EN

Stack Overflow用户

发布于 2018-05-22 07:43:23

使用DataflowRunner时,我会尝试使用..。org.apache.beam.sdk.transforms.Watch$WatchGrowthFn@4a1691ac : java.lang.UnsupportedOperationException: DataflowRunner目前不支持可拆分的DoFn: DoFn

使用直接运行程序,我看到它进行轮询,但管道的其余部分似乎没有触发,也没有错误。写信给数据存储和bigquery。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49364834

复制
相关文章
【HTML】Iframe中的onload事件
当iframe.src重新指定一个url时会重新执行iframe的onload事件 <iframe id="indexFrame" name="index" width="800" onload='iFrameHeight("indexFrame");'                 frameborder="0" marginwidth="0" marginheight="0" scrolling="no"></iframe> html生成时,会执行iframe中的onload事件 当iframe.sr
悟空聊架构
2018/05/18
3.4K0
定时任务引发的挂库
Archived Log entry 2000 added for T-1.S-288759 ID 0x5a3a0712 LAD:1
杨漆
2021/07/07
2700
定时任务引发的挂库
log4j.properties连接到数据库中
<code> # For JBoss: Avoid to setup Log4J outside $JBOSS_HOME/server/default/deploy/log4j.xml! # For all other servers: Comment out the Log4J listener in web.xml to activate Log4J. log4j.rootLogger=INFO, stdout, logfile, DATABASE
全栈程序员站长
2021/04/28
3800
onload事件
1.window.onload <script> window.onload=Firstfunction(); window.onload=Secondfunction(); function Firstfunction(){ alert("我是第一个") } function Secondfunction(){ alert("我是第二个") } </script> 上述代码只会执行第二个,因为代码的覆盖 2.解决方案
天天_哥
2018/09/29
1.4K0
连接到PostgreSQL数据库
PG默认只接受本地的访问,这个规则是由参数listen_addresses控制的
姚远OracleACE
2023/09/06
3970
连接到PostgreSQL数据库
python-数据库编程-如何在Python中连接到数据库
在Python中,我们可以使用各种模块来连接到关系型数据库并进行操作,如MySQL、PostgreSQL、SQLite等。
玖叁叁
2023/04/22
1.2K0
如何使用Python连接到驻留在内存中的SQLite数据库?
SQLite 是一种流行的、轻量级的、独立的数据库引擎,广泛用于各种应用程序。SQLite的独特功能之一是它能够在内存中创建数据库,这允许更快的数据访问和操作。在本文中,我们将探讨如何使用 Python 连接到内存中的 SQLite 数据库,提供分步说明、代码示例、解释和示例输出。
很酷的站长
2023/08/11
6810
如何使用Python连接到驻留在内存中的SQLite数据库?
数据库使用教程:如何在.NET中连接到MySQL数据库
dbForge Studio for MySQL是一个在Windows平台被广泛使用的MySQL客户端,它能够使MySQL开发人员和管理人员在一个方便的环境中与他人一起完成创建和执行查询,开发和调试MySQL程序,自动化管理MySQL数据库对象等工作。
麻烦成了精
2020/11/12
5.6K0
关于IFRAME的onload事件
昨天遇到一个关于iframe的问题,比如a页面中嵌入了一个iframe称为a_iframe,如果直接在a_iframe的标签上直接加入属性的设置,οnlοad=’’,这样才onload事件才是起作用的,网上打听了下,具体原因不明,但是是有解决方法的:
全栈程序员站长
2022/07/07
1.5K0
onload 和 domready
window.onload 事件会在页面或图像加载完成后触发(即所有元素的资源都下载完毕)
Krry
2018/10/15
2.7K0
库中是如何实现string类的?
需要注意的是,如果采用无参构造,刚开始容量是0. 这就导致是初次扩容,容量开始是0,所以这里要判断扩容前,容量是否是0,再考虑1.5倍或者二倍扩容.
初阶牛
2023/10/14
1800
库中是如何实现string类的?
javascript当中onload用法
<meta http-equiv="content-type" content="text/html; charset=utf-8"/>
马克java社区
2019/10/10
7590
javascript当中onload用法
JavaScript之共享onload
我们知道,当我们将JS代码脚本放到<head></head>标签之间时,这是的js代码加载要先于DOM加载,而我们往往会在JS代码脚本中写一些获取DOM元素的代码,而此时的DOM是不完整的, 所以我们通常的解决方法是将函数放入到window.onload里面去,window.load事件是网页加载完毕时会触发的一个事件,如果将我们的函数与之绑定,我们的函数也会在页面加载完毕之后执行. 如下代码: <html xmlns="http://www.w3.org/1999/xhtml"> <head> <
郑小超.
2018/01/24
8170
No JNI_OnLoad found in /data/data/
0. 些在前面: 最近,又用到ndk去进行jni的开发了,居然连最简单的hello-jni都没有编译过。
Java架构师历程
2018/09/26
1.5K0
jQuery onload与ready
jQuery是一种流行的JavaScript库,用于简化在网页中操作和处理HTML文档的过程。在jQuery中,有两个常用的事件处理方法,即$(document).ready()和$(window).on("load", function()),用于在页面加载时执行JavaScript代码。这两种方法在特定情况下有所不同。
堕落飞鸟
2023/05/18
7910
DOMContentLoaded和window.onload
相信写js的。都知道window.onload吧,可是并非每一个人都知道DOMContentLoaded,事实上即使你不知道。非常有可能你也常常使用了这个东西。
全栈程序员站长
2022/07/06
1.5K0
python连接到SQList数据库以及简单操作
背景了解: Python就内置了SQLite3,所以,在Python中使用SQLite,不需要安装任何东西,直接使用。Python定义了一套操作数据库的API接口,任何数据库要连接到Python, 只需要提供符合Python标准的数据库驱动即可 1:代码
Python研究者
2020/09/28
9170
python连接到SQList数据库以及简单操作
Python标准库collections中与字典有关的类
Python标准库中提供了很多扩展功能,大幅度提高了开发效率。这里主要介绍OrderedDict类、defaultdict类和Counter类。 (1)OrderedDict类 Python内置字典dict是无序的,如果需要一个可以记住元素插入顺序的字典,可以使用collections.OrderedDict。例如: >>> import collections >>> x = collections.OrderedDict() #有序字典 >>> x['a'] = 3 >>> x['b'] = 5 >>
Python小屋屋主
2018/04/16
1.4K0
Postgresql之查看当前连接到数据库的client信息
    在使用postgresql时,会报connection too many问题,导致拿不到连接数,在mysql中可以通过show process list来查看连接到数据库的client信息,那
克虏伯
2020/05/14
5.9K0
muduo网络库学习之MutexLock类、MutexLockGuard类、Condition类、CountDownLatch类封装中的知识点
该文介绍了如何使用 muduo 库实现 C++ 多线程服务端,并总结了在多线程环境下使用 CountDownLatch 进行线程同步需要注意事项。
s1mba
2017/12/28
1.2K0
muduo网络库学习之MutexLock类、MutexLockGuard类、Condition类、CountDownLatch类封装中的知识点

相似问题

执行sql代码以在zend框架中创建表和数据库

11

如何读取包含SQL代码的文本文件并执行

35

读取包含数组的文本文件-在表中显示

24

读取文本文件以将数据插入Oracle SQL表

67

SQL数据库表连接以创建IdealTable

48
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文