我正在尝试用开源框架来做ETL,我听说过两件事,Apache Beam和Apache Airflow,这两件事最适合整个ETL或ELT,比如Talend、Azure Data Factory等,事实上,我正在尝试用云数据仓库(redshift、azure数据仓库、雪花等)来做所有的事情。哪一个对这些类型的工作更好,如果我能在这两个框架之间进行一些比较,那就太好了。提前谢谢。
我正在运行一个Apache管道(与Google一起部署),该管道是由Apache气流组织的。
DAG文件如下所示:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator
import custom_py_file #beam job in this file
default_args = {
'owner': 'name',
我的管道通过GCS通过Pub\Sub方式读取数据,然后将数据汇到redis。一开始,它在Dataflow中运行得很好。但是,在两天后运行后,在我的管道中发现了以下异常。
java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:97)
org.xerial.snappy.SnappyOutputStream.
我尝试在Google Cloud Dataflow中运行Apache光束管道(Python),这是由Google Cloud Coomposer中的DAG触发的。 我的dags文件夹在各自的GCS存储桶中的结构如下: /dags/
dataflow.py <- DAG
dataflow/
pipeline.py <- pipeline
setup.py
my_modules/
__init__.py
commons.py <- the module I want to import in the pipeline se
带命令的气流安装失败
sudo pip3 install apache-airflow[gcp_api]
昨天一切都很好。今天,我看到了以下错误:
Could not find a version that satisfies the requirement apache-beam[gcp]==2.3.0 (from google-cloud-dataflow->apache-airflow[gcp_api]) (from versions: 0.6.0, 2.0.0, 2.1.0, 2.1.1, 2.2.0)
No matching distribution found for apa
我想在运行数据流程序时在屏幕上打印信息或调试或异常消息。我可以在以"DirectRunner“作为runner运行管道时做到这一点。但是,当使用runner "DataflowRunner“运行时,相同的程序不会在数据流控制台上打印任何内容。这是代码,它非常基础。
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import
我有以下代码来运行直接运行在windows上的apache beam中的sql转换。
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform
with beam.Pipeline() as p:
pipe = (
p
|'hello' >> beam.Create([('SE',400),('SC',500)])
|'schema' >> beam.
我正在使用GCP数据流运行Apache Beam管道,并从worker那里获得了以下错误:
Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException: Got poison pill or timeout but stream is not done
两分钟内就有了。
我正在使用管道将消息从PubSub写到BigQuery。在管道中,当将PubSub消息转换为TableRow时,我使用的是FailsafeElement<PubsubMessage, String>,并
我无法通过气流BeamRunPythonPipelineOperator运行python管道。下面是我的完整代码:
DAG文件
import os
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
from airflow
我有一个数据流作业,使用DataFlowRunner在本地运行时运行得很好,但当我尝试使用GCP的Composer/AirFlow运行它时,它会给我一个错误:
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for ConvertToYouTubeMetadata/ParDo(convertToTableRow$1)/ParMultiDo(convertToTableRow$1).output [PCollection]. Correct o
当使用数据流运行beam模型时,我得到了以下错误。
java.lang.IllegalArgumentException: Class interface org.apache.beam.sdk.options.PipelineOptions missing a property named 'output'.
at org.apache.beam.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1488)
at org.apache.beam.sdk.options
我正在尝试在GCP数据流管道中运行以下脚本。 import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from typing import NamedTuple, Optional
from apache_beam.io.gcp.spanner import *
from past.builtins import unicode
import logging
class ItemRow(NamedTuple):
item_id: unicode
cla
我试图在GCS Dataflow上上传/运行一个Apache项目,但我得到了
INFO:apache_beam.runners.dataflow.internal.apiclient:Defaulting to the temp_location as staging_location: gs://dataflow-staging-europe-west4-10a485e03dda20c80122afcef299fc02
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://datafl
我正在尝试运行Python中的示例。但是,这个堆栈跟踪的错误如下所示。注意:第一个管道确实创建了“./name”文件,但是第二个管道似乎无法从中读取。
No handlers could be found for logger "oauth2client.contrib.multistore_file"
Traceback (most recent call last):
File "example.py", line 17, in <module>
| 'save' >> beam.io.WriteToTex
我已经编写了一个Python数据流作业,用于从csv文件读取数据并使用该数据填充BigQuery表。但是,每当我运行此作业时,都会弹出一个错误。如果我删除write to Big Query部分,改为写入文件,则代码执行正常,并且表以dict格式写入输出文件。代码如下:
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.
为了提供我的问题的最小示例,我尝试实现一个简单的Beam作业,它将String作为一个侧输入,并将它应用到从Cloud中的csv文件中读取的PCollection中。然后将结果输出到云存储中的.txt文件中。
到目前为止,我已经尝试过:尝试使用PipelineResult.waitUntilFinish (如in (p.run().waitUntilFinish()) ),更改两个p.run()命令的位置,并尽可能地简化,只使用字符串作为我的侧输入,结果总是一样。在Stack和Google上的搜索使我找到了实现错误消息的梁回购上的PR。
SideInputTest.java:
public c
我正在尝试创建一个google数据流模板,但我似乎无法找到一种不产生以下异常的方法:
WARNING: Size estimation of the source failed: RuntimeValueProvider{propertyName=inputFile, default=null}
java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=inputFile,
我遇到了阿帕奇·梁博士的问题。在尝试运行容器时,我得到的是"No id provided."消息,仅此而已。下面是代码和文件:
Dockerfile
FROM apache/beam_python3.8_sdk:latest
RUN apt update
RUN apt install -y wget curl unzip git
COPY ./ /root/data_analysis/
WORKDIR /root/data_analysis
RUN python3 -m pip install -r data_analysis/beam/requirements.txt
EN
我试图运行一个管道,使用apache-束,源作为一个卡夫卡主题,目的地作为另一个卡夫卡主题。我已经编写了我的代码,并且运行良好(也就是说,在我认为的代码中没有错误)。但在输出主题中看不到数据--这是代码:
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.kafka import ReadFromKafka, W