Dataproc: Errors when reading and writing data from BigQuery using PySpark

Stack Overflow user
Asked on 2022-08-09 17:25:37
Answers: 1 · Views: 713 · Followers: 0 · Votes: 1

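I am trying to read some BigQuery data (ID: my-project.mydatabase.mytable; original names protected) from a user-managed Jupyter Notebook instance inside Dataproc Workbench. My attempt is inspired by an existing example; more specifically, the code is as follows (please also read the additional comments in the code itself):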

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, ArrayType, StringType
from google.cloud import bigquery

# UPDATE (2022-08-10): BQ connector added
spark = SparkSession.builder.appName('SpacyOverPySpark') \
                    .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2') \
                    .getOrCreate()

# ------------------ IMPORTING DATA FROM BIG QUERY --------------------------

# UPDATE (2022-08-10): This line now runs...
df = spark.read.format('bigquery').option('table', 'my-project.mydatabase.mytable').load()

# But imports the whole table, which could become expensive and not optimal
print("DataFrame shape: ", (df.count(), len(df.columns)))  # 109M records & 9 columns; just need 1M records and one column: "posting"

# I tried the following, BUT with NO success:
# sql = """
# SELECT `posting`
# FROM `mentor-pilot-project.indeed.indeed-data-clean`
# LIMIT 1000000
# """
# df = spark.read.format("bigquery").load(sql)
# print("DataFrame shape: ", (df.count(), len(df.columns)))

# ------- CONTINGENCY PLAN: IMPORTING DATA FROM CLOUD STORAGE ---------------

# This section WORKS (just to enable the following sections)
# HINT: This dataframe contains 1M rows of text, under a single column: "posting"
df = spark.read.csv("gs://hidden_bucket/1M_samples.csv", header=True)

# ---------------------- EXAMPLE CUSTOM PROCESSING --------------------------

# Example Python UDF
def split_text(text:str) -> list:
    return text.split()

# Turning Python UDF into Spark UDF
textsplitUDF = udf(lambda z: split_text(z), ArrayType(StringType()))

# "Applying" a UDF on a Spark Dataframe (THIS WORKS OK)
df = df.withColumn("posting_split", textsplitUDF(col("posting")))

# ------------------ EXPORTING DATA TO BIG QUERY ----------------------------

# UPDATE (2022-08-10) The code causing the error:

# df.write.format('bigquery') \
#   .option('table', 'wordcount_dataset.wordcount_output') \
#   .save()

# has been replaced by code that successfully stores data in BQ:

df.write \
  .format('bigquery') \
  .option("temporaryGcsBucket", "my_temp_bucket_name") \
  .mode("overwrite") \
  .save("my-project.mynewdatabase.mytable")

When reading data from BigQuery with a SQL query, the following error is raised:

Py4JJavaError: An error occurred while calling o195.load.
: com.google.cloud.spark.bigquery.repackaged.com.google.inject.ProvisionException: Unable to provision, see the following errors:

1) Error in custom provider, java.lang.IllegalArgumentException: 'dataset' not parsed or provided.
  at com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule.provideSparkBigQueryConfig(SparkBigQueryConnectorModule.java:65)
  while locating com.google.cloud.spark.bigquery.SparkBigQueryConfig

1 error
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalProvisionException.toProvisionException(InternalProvisionException.java:226)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InjectorImpl$1.get(InjectorImpl.java:1097)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InjectorImpl.getInstance(InjectorImpl.java:1131)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelationInternal(BigQueryRelationProvider.scala:75)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:332)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:242)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:230)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:197)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: 'dataset' not parsed or provided.
    at com.google.cloud.bigquery.connector.common.BigQueryUtil.lambda$parseTableId$2(BigQueryUtil.java:153)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at com.google.cloud.bigquery.connector.common.BigQueryUtil.parseTableId(BigQueryUtil.java:153)
    at com.google.cloud.spark.bigquery.SparkBigQueryConfig.from(SparkBigQueryConfig.java:237)
    at com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule.provideSparkBigQueryConfig(SparkBigQueryConnectorModule.java:67)
    at com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule$$FastClassByGuice$$db983008.invoke(<generated>)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.ProviderMethod$FastClassProviderMethod.doProvision(ProviderMethod.java:264)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.ProviderMethod.doProvision(ProviderMethod.java:173)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalProviderInstanceBindingImpl$CyclicFactory.provision(InternalProviderInstanceBindingImpl.java:185)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalProviderInstanceBindingImpl$CyclicFactory.get(InternalProviderInstanceBindingImpl.java:162)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InjectorImpl$1.get(InjectorImpl.java:1094)
    ... 18 more

When writing data to BigQuery, the error is:

Py4JJavaError: An error occurred while calling o167.save.
: java.lang.ClassNotFoundException: Failed to find data source: bigquery. Please find packages at http://spark.apache.org/third-party-projects.html
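
A `ClassNotFoundException` for the `bigquery` data source generally means that the connector JAR was not on Spark's classpath when the session started. The sketch below assembles the Maven coordinate used in the code above and collects it into session settings. `connector_package`, `session_settings`, and `build_session` are hypothetical helper names, and the Scala and connector versions are simply the ones from the question, not recommendations.

```python
def connector_package(scala_version: str, connector_version: str) -> str:
    """Build the Maven coordinate of the spark-bigquery connector."""
    return (
        "com.google.cloud.spark:"
        f"spark-bigquery-with-dependencies_{scala_version}:{connector_version}"
    )


def session_settings(scala_version: str, connector_version: str) -> dict:
    """Settings intended for SparkSession.builder.config(key, value)."""
    return {
        "spark.jars.packages": connector_package(scala_version, connector_version)
    }


def build_session(app_name: str, settings: dict):
    """Create a Spark session with the given settings (needs pyspark when called)."""
    # Imported lazily so the helpers above can be used without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


settings = session_settings("2.12", "0.24.2")
print(settings["spark.jars.packages"])

# In the notebook (assumes pyspark is available):
# spark = build_session("SpacyOverPySpark", settings)
```

Because `spark.jars.packages` is read when the session is created, a notebook that already has a running session may get that existing session back from `getOrCreate()` without the connector. Stopping the session or restarting the kernel before rebuilding it is a common workaround.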

UPDATE (2022-09-10): The error when writing data to BigQuery has been solved; see the code above as well as the comments section below.

What am I doing wrong?


1 Answer

Stack Overflow user

Accepted answer

Posted on 2022-08-11 04:06:36

Key points found during the discussion:

  1. Add the BigQuery connector as a dependency, either through spark.jars=<gcs-uri> or through spark.jars.packages=com.google.cloud.spark:spark-bigquery-with-dependencies_<scala-version>:<version>.
  2. Specify the table name in the <project>.<dataset>.<table> format.
  3. The default mode of the DataFrame writer is errorifexists. When writing to a table that does not exist, the dataset must already exist and the table is created automatically. When writing to an existing table, set the mode to "append" or "overwrite" with df.write.mode(<mode>)...save().
  4. When writing to a BigQuery table, do one of the following:

     a) Direct write (supported since 0.26.0):

        df.write \
          .format("bigquery") \
          .option("writeMethod", "direct") \
          .save("dataset.table")

     b) Indirect write:

        df.write \
          .format("bigquery") \
          .option("temporaryGcsBucket", "some-bucket") \
          .save("dataset.table")

     See the connector documentation.
  5. When reading from BigQuery through a SQL query, add the mandatory properties viewsEnabled=true and materializationDataset=<dataset>:

        spark.conf.set("viewsEnabled", "true")
        spark.conf.set("materializationDataset", "<dataset>")

        sql = """
          SELECT tag, COUNT(*) c
          FROM (
            SELECT SPLIT(tags, '|') tags
            FROM `bigquery-public-data.stackoverflow.posts_questions` a
            WHERE EXTRACT(YEAR FROM creation_date)>=2014
          ), UNNEST(tags) tag
          GROUP BY 1
          ORDER BY 2 DESC
          LIMIT 10
          """
        df = spark.read.format("bigquery").load(sql)
        df.show()

     See the connector documentation.
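
Applied to the question's case (a single column, `posting`, capped at 1M rows), points 2 and 5 could be sketched as follows. `build_query` and `read_query` are hypothetical helpers, the table ID is the placeholder used in the question, and `my_materialization_dataset` is an assumed name for an existing dataset in which the connector can store the query result.

```python
def build_query(table: str, column: str, limit: int) -> str:
    """Return a Standard SQL query for one column of <project>.<dataset>.<table>."""
    parts = table.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError("table must look like <project>.<dataset>.<table>")
    return f"SELECT `{column}` FROM `{table}` LIMIT {int(limit)}"


def read_query(spark, sql: str, materialization_dataset: str):
    """Load the result of `sql` as a DataFrame through the BigQuery connector."""
    # Both properties are set before load(), as described in point 5.
    spark.conf.set("viewsEnabled", "true")
    spark.conf.set("materializationDataset", materialization_dataset)
    return spark.read.format("bigquery").load(sql)


sql = build_query("my-project.mydatabase.mytable", "posting", 1_000_000)
print(sql)

# With a live session that has the connector on its classpath:
# df = read_query(spark, sql, "my_materialization_dataset")
# df.show(5)
```

Without an `ORDER BY`, `LIMIT` returns an arbitrary subset of rows. An alternative that avoids a query altogether is to load the table with the `table` option and call `.select("posting")` before any action, since the connector is documented to read only the columns that are actually needed.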
Votes: 1
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/73295856
