I want to import BigQuery data into Bigtable.
Exporting the BigQuery rows to GCS in Avro format succeeds. However, the Avro data is not imported into Bigtable.
The error says:
Caused by: org.apache.avro.AvroTypeException: Found Root, expecting com.google.cloud.teleport.bigtable.BigtableRow, missing required field key
I assume the schemas of BigQuery and Bigtable need to match, but I don't know how to achieve that.
Posted on 2021-08-09 10:30:17
Posted on 2022-08-13 10:19:29
For those who, like me, still have this problem because they are not familiar with Avro, here is a working schema conversion that I found after some modifications.
For example, suppose you have a BigQuery table with a user_id column plus feature columns such as channel, zip_code, and history, and you want to use user_id as the Bigtable row_key and ingest all the other columns. Below is sample code for encoding them into an Avro file.
import json
from datetime import datetime

from avro.schema import Parse
from avro.io import DatumWriter
from avro.datafile import DataFileWriter
# Avro schema expected by the Dataflow "Avro to Bigtable" template
bigtable_schema = {
    "name": "BigtableRow",
    "type": "record",
    "namespace": "com.google.cloud.teleport.bigtable",
    "fields": [
        {"name": "key", "type": "bytes"},
        {"name": "cells",
         "type": {
             "type": "array",
             "items": {
                 "name": "BigtableCell",
                 "type": "record",
                 "fields": [
                     {"name": "family", "type": "string"},
                     {"name": "qualifier", "type": "bytes"},
                     {"name": "timestamp", "type": "long", "logicalType": "timestamp-micros"},
                     {"name": "value", "type": "bytes"}
                 ]
             }
         }}
    ]
}
parsed_schema = Parse(json.dumps(bigtable_schema))
row_key = 'user_id'
family_name = 'feature_name'
feature_list = ['channel', 'zip_code', 'history']
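# Note: the rest of the example assumes `df` is a pandas DataFrame containing the
# BigQuery rows. One possible way to load it (an assumption, not part of the
# original answer) is the BigQuery client library:
#
#   from google.cloud import bigquery
#   df = bigquery.Client().query(
#       "SELECT user_id, channel, zip_code, history FROM `my_dataset.my_table`"
#   ).to_dataframe()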
with open('features.avro', 'wb') as f:
    writer = DataFileWriter(f, DatumWriter(), parsed_schema)
    for item in df.iterrows():
        row = item[1]
        # current time in microseconds, matching the timestamp-micros logical type
        ts = int(datetime.now().timestamp()) * 1000 * 1000
        for feat in feature_list:
            writer.append({
                "key": row[row_key].encode('utf-8'),
                "cells": [{"family": family_name,
                           "qualifier": feat.encode('utf-8'),
                           "timestamp": ts,
                           "value": str(row[feat]).encode('utf-8')}]
            })
    writer.close()

Then you can use the Dataflow template job to run the ingestion.
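As a rough sketch (the bucket, project, instance, and table names below are placeholders, and the parameter names should be checked against the current "Avro Files on Cloud Storage to Bigtable" template documentation), the ingestion can be kicked off by uploading the file to GCS and launching the template with gcloud:

gsutil cp features.avro gs://my-bucket/features.avro
gcloud dataflow jobs run avro-to-bigtable \
    --gcs-location gs://dataflow-templates/latest/GCS_Avro_to_Cloud_Bigtable \
    --region us-central1 \
    --parameters bigtableProjectId=my-project,bigtableInstanceId=my-instance,bigtableTableId=my-table,inputFilePattern=gs://my-bucket/features.avro

Note that the destination Bigtable table and its column family (feature_name in this example) must already exist before the job runs.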
The full code can be found here: https://github.com/mitbal/sidu/blob/master/bigquery_to_bigtable.ipynb
https://stackoverflow.com/questions/68664620