我使用下面的脚本从bigquery表中进行选择,然后循环查询作业结果行,然后一次写入csv文件一行。
我的问题是,对于某些查询作业,写入的行总数与预期的行数相匹配,但有些行重复,而另一些行则丢失。例如,查询本身可能返回25k行,而不返回重复项。但是,与查询结果相比,在编写的25k行中,有15个记录重复,15个记录缺失。
在压缩之前,有一个文件超过3Gb,但对于其他查询(选择相同的列,但选择不同的visitStartTime范围),文件将更大,但没有重复问题。对于某些查询作业,有些记录是一式两份,而另一些则根本不写,有什么原因吗?
from google.cloud import bigquery
from google.oauth2 import service_account
import csv
query_string = """select c1,c2,c3,c4,c5
from `mydb.mydataset.mytable_20211212`
where visitStartTime >= 1639350000.0 AND
visitStartTime < 1639353600.0"""
credentials = service_account.Credentials.from_service_account_file(key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],)
client = bigquery.Client(credentials=credentials, project=credentials.project_id, )
query_job = client.query(query_string)
with open('myfilename.csv', 'w', newline='', encoding="utf-8") as csvfile:
writer = csv.writer(csvfile, quoting=csv.QUOTE_NONE, delimiter='|', quotechar='', escapechar='\\')
# write header row
writer.writerow(["c1","c2","c3","c4","c5"])
# write data rows
for row in query_job:
writer.writerow([row.c1, row.c2, row.c3, row.c4, row.c5])
发布于 2021-12-17 11:02:34
根据我在你问题中的理解,如果你有15份重复的记录,你会有另外15份丢失(同样的号码)。如果CSV结果与查询不同,即使CSV是确切的查询,那么问题必须在CSV编写过程中解决。我建议你看一下导出数据文档,看看给出的例子。由于您正在使用的大小,我首先尝试提取压缩表到一个桶:
from google.cloud import bigquery
client = bigquery.Client()
bucket_name = '[BUCKET_NAME]'
destination_uri = "gs://{}/{}".format(bucket_name, "[TABLE_NAME].csv.gz")
dataset_ref = bigquery.DatasetReference(project, dataset_id)
table_ref = dataset_ref.table("[TABLE_NAME]")
job_config = bigquery.job.ExtractJobConfig()
job_config.compression = bigquery.Compression.GZIP
extract_job = client.extract_table(
table_ref,
destination_uri,
location="[LOCATION]",
job_config=job_config
)
extract_job.result()
如果这不起作用,我会尝试导出多个文件中的数据。
如果其中任何一个都能按您的要求工作,那么最可能的问题是query_string
是如何编写的。您考虑过使用DISTINCT
或设置更多的过滤器吗?
此外,问题还可能是在导出的数据。看到导出的“错误数据”,您是否尝试过将visitStartTime
设置为只导出“错误的数据”?
https://stackoverflow.com/questions/70382939
复制相似问题