I have two files and I am doing an inner join with CoGroupByKey in Apache Beam. When I write the rows to BigQuery, it gives the following error.
RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_614_c4a563c648634e9dbbf7be3a56578b6d_2f196decc8984a0d83dee92e19054ffb failed. Error Result: <ErrorProto
location: 'gs://dataflow4bigquery/temp/bq_load/06bfafaa9dbb47338ad4f3a9914279fe/dotted-transit-351803.test_dataflow.inner_join/f714c1ac-c234-4a37-bf51-c725a969347a'
message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.'
reason: 'invalid'> [while running 'WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs']
from apache_beam.io.gcp.internal.clients import bigquery
import apache_beam as beam

def retTuple(element):
    # Split a CSV line and key it by the first column (the employee id).
    thisTuple = element.split(',')
    return (thisTuple[0], thisTuple[1:])

def jstr(cstr):
    # Flatten one CoGroupByKey result into a row dict for BigQuery.
    import datetime
    left_dict = cstr[1]['dep_data']
    right_dict = cstr[1]['loc_data']
    for i in left_dict:
        for j in right_dict:
            id, name, rank, dept, dob, loc, city = ([cstr[0]] + i + j)
            json_str = {
                "id": id, "name": name, "rank": rank, "dept": dept,
                "dob": datetime.datetime.strptime(dob, "%d-%m-%Y").strftime("%Y-%m-%d").strip("'"),
                "loc": loc, "city": city,
            }
    return json_str
table_spec = 'dotted-transit-351803:test_dataflow.inner_join'
table_schema = 'id:INTEGER,name:STRING,rank:INTEGER,dept:STRING,dob:STRING,loc:INTEGER,city:STRING'
gcs='gs://dataflow4bigquery/temp/'
p1 = beam.Pipeline()
# Key both input files by employee id, then co-group them for the inner join.
dep_rows = (
    p1
    | "Reading File 1" >> beam.io.ReadFromText('dept_data.txt')
    | 'Pair each employee with key' >> beam.Map(retTuple)  # {149633CM : [Marco,10,Accounts,1-01-2019]}
)

loc_rows = (
    p1
    | "Reading File 2" >> beam.io.ReadFromText('location.txt')
    | 'Pair each loc with key' >> beam.Map(retTuple)  # {149633CM : [9876843261,New York]}
)

results = (
    {'dep_data': dep_rows, 'loc_data': loc_rows}
    | beam.CoGroupByKey()
    | beam.Map(jstr)
    | beam.io.WriteToBigQuery(
        custom_gcs_temp_location=gcs,
        table=table_spec,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        additional_bq_parameters={'timePartitioning': {'type': 'DAY'}}
    )
)

p1.run().wait_until_finish()
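To see the shape CoGroupByKey hands to jstr, here is a minimal local sketch; the sample element is built from the inline comments above, so the values are only illustrative:

# CoGroupByKey emits (key, {tag: [values, ...]}) tuples; jstr flattens one of them.
sample = ('149633CM', {
    'dep_data': [['Marco', '10', 'Accounts', '1-01-2019']],
    'loc_data': [['9876843261', 'New York']],
})
print(jstr(sample))
# -> {'id': '149633CM', 'name': 'Marco', 'rank': '10', 'dept': 'Accounts',
#     'dob': '2019-01-01', 'loc': '9876843261', 'city': 'New York'}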
I am running it on GCP with Dataflow. When I print json_str, the output is valid JSON:
{'id': '149633CM', 'name': 'Marco', 'rank': '10', 'dept': 'Accounts', 'dob': '2019-01-31', 'loc': '9204232778', 'city': 'New York'}
{'id': '212539MU', 'name': 'Rebekah', 'rank': '10', 'dept': 'Accounts', 'dob': '2019-01-31', 'loc': '9995440673', 'city': 'Denver'}
The schema I defined is also correct. Yet when loading it into BigQuery, I get that error.
Posted on 2022-06-05 11:11:44
After some digging, I finally solved it. It was a schema error. The Id column values look like 149633CM, and I had declared the Id data type as INTEGER; but when I tried loading the JSON with bq using schema auto-detect, bq flagged the Id data type as STRING.
After that, I changed the Id column's data type to STRING in my code, recreated the table, and the load worked.
But why does STRING work while INTEGER does not, when the first 6 characters of the Id column are digits?
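For reference, a minimal sketch of the fix, assuming nothing else in the pipeline changes (only the id type in the schema string passed to WriteToBigQuery):

# id values such as '149633CM' contain letters, so the column must be STRING.
table_schema = 'id:STRING,name:STRING,rank:INTEGER,dept:STRING,dob:STRING,loc:INTEGER,city:STRING'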
Posted on 2022-10-21 10:54:58
Because the data type is parsed against the entire field value, not just the first 6 characters. If you removed the last two characters, the value could be parsed as an INTEGER.
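A quick plain-Python illustration of that point (int() applies roughly the same whole-value rule that BigQuery uses when coercing a JSON field to an INTEGER column):

# The entire string must be numeric for the parse to succeed.
int('149633')    # OK: every character is a digit
int('149633CM')  # ValueError: the trailing letters make the whole value non-numeric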
https://stackoverflow.com/questions/72499053