首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >是否有方法将avro模式与Bigquery和Bigtable匹配?

是否有方法将avro模式与Bigquery和Bigtable匹配?
EN

Stack Overflow用户
提问于 2021-08-05 10:08:53
回答 2查看 490关注 0票数 1

我想使用将bigquery数据导入到bigtable。

将Avro格式的bigquery行导出到GCS是成功的。但是,未将Avro数据导入Bigtable。

错误说

Caused by: org.apache.avro.AvroTypeException: Found Root, expecting com.google.cloud.teleport.bigtable.BigtableRow, missing required field key

我想bigquery和bigtable之间的模式应该是匹配的。但我不知道该怎么做。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-08-09 10:30:17

对于从Avro文件中读取的每一条记录:

显示在文件和表中的table.

  • Attributes中的
  • 属性被加载到文件中而不是表中的ignore_unknown_fields中,如果存在于表中但不在文件中的
  • 属性将使用它们的默认值(如果有一组.

)。

下面的链接很有帮助。

1 2

3

票数 2
EN

Stack Overflow用户

发布于 2022-08-13 10:19:29

对于那些仍然像我一样有问题的人,因为他们不熟悉avro,下面是我经过一些修改后发现的一个工作模式转换。

例如,如果您有这样的bigquery表

您希望使用user_id作为bigtable row_key,并接收所有列,下面是将它们编码为avro文件的示例代码。

代码语言:javascript
运行
复制
from avro.schema import Parse
from avro.io import DatumWriter
from avro.datafile import DataFileWriter

bigtable_schema = {
    "name" : "BigtableRow",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "key", "type" : "bytes"},
      { "name" : "cells",
        "type" : {
          "type" : "array",
          "items": {
            "name": "BigtableCell",
            "type": "record",
            "fields": [
              { "name" : "family", "type" : "string"},
              { "name" : "qualifier", "type" : "bytes"},
              { "name" : "timestamp", "type" : "long", "logicalType" : "timestamp-micros"},
              { "name" : "value", "type" : "bytes"}
            ]
          }
        }
      }
   ]
}

parsed_schema = Parse(json.dumps(bigtable_schema))

row_key = 'user_id'
family_name = 'feature_name'
feature_list = ['channel', 'zip_code', 'history']

with open('features.avro', 'wb') as f:

    writer = DataFileWriter(f, DatumWriter(), parsed_schema)

    for item in df.iterrows():

        row = item[1]
        ts =  int(datetime.now().timestamp()) * 1000 * 1000

        for feat in feature_list:

            writer.append({
                "key": row[row_key].encode('utf-8'),
                "cells": [{"family": family_name, 
                           "qualifier": feat.encode('utf-8'), 
                           "timestamp": ts, 
                           "value": str(row[feat]).encode('utf-8')}]
            })
        
    writer.close()

然后,您可以使用数据流模板作业来运行摄入。

完整的代码可以在这里找到:https://github.com/mitbal/sidu/blob/master/bigquery_to_bigtable.ipynb

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68664620

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档