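I'm working on a project where I need to take the BigQuery table name from a runtime parameter supplied when launching a Dataflow template.
So far I've had no luck: I get either the definition of the runtime parameter or an empty string.
So basically, this is what I need to solve: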
```python
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, SetupOptions)


class CustomPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--path',
            type=str,
            help='csv storage path')
        parser.add_value_provider_argument(
            '--table_name',
            type=str,
            help='Table Id')


def run():
    def rewrite_values(element):
        """ Rewrite default env values"""
        try:
            logging.info("File Path with str(): {}".format(str(custom_options.path)))
            logging.info("----------------------------")
            logging.info("element: {}".format(element))
            project_id = str(cloud_options.project)
            file_path = custom_options.path.get()
            table_name = custom_options.table_name.get()
            logging.info("project: {}".format(project_id))
            logging.info("File path: {}".format(file_path))
            logging.info("language: {}".format(table_name))
            logging.info("----------------------------")
        except Exception as e:
            logging.info("Error format----------------------------")
            raise KeyError(e)
        return file_path

    pipeline_options = PipelineOptions()
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Beginning of the pipeline
    p = beam.Pipeline(options=pipeline_options)
    init_data = (p
                 | beam.Create(["Start"])
                 | beam.FlatMap(rewrite_values))

    # ... pipeline processing, running pipeline etc.

    save_data_bigquery = (table_data
                          | "Get all numbers" >> beam.ParDo(GetAllNumbers())
                          | 'Flat items' >> beam.FlatMap(flat_item)
                          | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                              project=project_id,
                              dataset="defined_dataset",
                              # <-- stuck here: project_id and table_name
                              # are locals of rewrite_values, not of run()
                              table=table_name,
                              schema="id:STRING",
                              batch_size=10000)
                          )
```

Naming the table in the `WriteToBigQuery` call is where I'm stuck. I have also tried using `custom_options.table_name`, declaring the variable as global, and so on.
I have also created a custom DoFn to write to BigQuery, although this would be my preferred method.
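Both symptoms (seeing the parameter's definition, or getting nothing back) come from the same thing: a template runtime parameter has no value while the pipeline graph is being built; it is only bound when the job actually runs. The plain-Python sketch below models that behaviour. `DeferredParam`, `bind()` and `make_writer()` are hypothetical stand-ins invented for illustration, not Beam's `RuntimeValueProvider` API. The point is the pattern: build-time code should hold on to the provider object and call `.get()` only inside code that executes at run time.

```python
class ParamNotAvailableError(RuntimeError):
    """Raised when a deferred parameter is read before the job supplies it."""


class DeferredParam:
    """Hypothetical stand-in for a template runtime parameter (not Beam's API)."""

    def __init__(self, name):
        self.name = name
        self._value = None
        self._bound = False

    def bind(self, value):
        # In a real template launch, the runner supplies this at execution time.
        self._value = value
        self._bound = True

    def get(self):
        if not self._bound:
            raise ParamNotAvailableError(
                "%s is only available when the job runs" % self.name)
        return self._value

    def __str__(self):
        # str() describes the parameter; it does not resolve its value.
        return "DeferredParam(option: %s)" % self.name


def make_writer(table_param):
    """Build-time code: keep the provider, resolve it only when writing."""
    def write(rows):
        table = table_param.get()  # resolved lazily, at "run" time
        return ["%s <- %s" % (table, row) for row in rows]
    return write


table_name = DeferredParam("table_name")
writer = make_writer(table_name)         # construction works without a value
table_name.bind("my_dataset.my_table")   # the launcher supplies the value
print(writer(["1", "2"]))  # ['my_dataset.my_table <- 1', 'my_dataset.my_table <- 2']
```

Mapped back onto the question, the analogous move is to hand `custom_options.table_name` itself (not `str(...)` of it and not its `.get()` result) to the sink. As far as I know, the Beam Python documentation lists `ValueProvider` among the accepted types for `WriteToBigQuery`'s `table` argument, so `table=custom_options.table_name` is worth trying.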
Posted on 2019-12-23 12:39:36
I tried writing a `BQ_writer` class and putting the actual `WriteToBigQuery` call inside it:
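```python
import logging

import apache_beam as beam


class BQ_writer(beam.DoFn):
    def __init__(self, schema, output):
        self.output = output
        self.schema = schema

    def process(self, element):
        schema_l = self.schema.get()
        output_table_l = self.output.get()
        logging.info('Writing to table and schema: {} {}'.format(output_table_l, schema_l))
        # Constructs a WriteToBigQuery transform object here; it is not
        # applied to any PCollection.
        beam.io.WriteToBigQuery(output_table_l,
                                schema=schema_l,
                                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
```

Then, in the pipeline:

```python
| 'WriteToBigQuery' >> beam.ParDo(BQ_writer(useroptions.schema, useroptions.output))
```

This works in the sense that the pipeline builds without errors, but no data shows up in the BigQuery table. Maybe `WriteToBigQuery` simply can't be used inside a `ParDo`. Suggestions on how to proceed are welcome.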
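A likely reason the table stays empty: inside `process`, `beam.io.WriteToBigQuery(...)` only constructs a transform object. A transform does work only when it is applied to a PCollection with `|` while the graph is being built; an object created and then discarded inside a DoFn is never wired into the pipeline. The toy model below makes that difference visible. `ToyWrite`, `ToyCollection` and `writer_dofn_style` are hypothetical classes and functions written for illustration only; no Beam is involved.

```python
class ToyWrite:
    """Hypothetical sink: writes only when applied to a collection."""

    def __init__(self, table, store):
        self.table = table
        self.store = store

    def expand(self, rows):
        # Only runs when the transform is actually applied.
        self.store.setdefault(self.table, []).extend(rows)
        return rows


class ToyCollection:
    def __init__(self, rows):
        self.rows = list(rows)

    def __or__(self, transform):
        # Mimics `pcoll | transform`: applying is what triggers the work.
        return ToyCollection(transform.expand(self.rows))


def writer_dofn_style(rows, store):
    """Like BQ_writer.process: builds a sink per element but never applies it."""
    for _row in rows:
        ToyWrite("t", store)  # constructed, then discarded
    return rows


store_a, store_b = {}, {}
writer_dofn_style(["1", "2"], store_a)
ToyCollection(["1", "2"]) | ToyWrite("t", store_b)
print(store_a)  # {}
print(store_b)  # {'t': ['1', '2']}
```

In the real pipeline, the corresponding change would be to drop the DoFn and apply the sink directly, for example `| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(useroptions.output, schema=..., create_disposition=..., write_disposition=...)`, passing the ValueProvider rather than its `.get()` result. Whether `schema` also accepts a ValueProvider may depend on the Beam version in use.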
https://stackoverflow.com/questions/52265477