I read an article on the AWS blog about flattening JSON files and loading them into Redshift.
My plan is to transform the JSON files and upload them to S3, then crawl that output into the AWS Glue Data Catalog, and finally load the data into Amazon Redshift as a table.
For now, the code from "Example 3: Python code to transform the nested JSON and output it to ORC" fails with this error:
NameError: name 'spark' is not defined
Needless to say I am a bit lost, since I am new to AWS Glue and I need to load these JSON files (they contain nested arrays) into Redshift.
Here is my code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
#from awsglue.transforms import Relationalize
# Begin variables to customize with your information
glue_source_database = "DATABASE"
glue_source_table = "TABLE_NAME"
glue_temp_storage = "s3://XXXXX"
glue_relationalize_output_s3_path = "s3://XXXXX"
dfc_root_table_name = "root" #default value is "roottable"
# End variables to customize with your information
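# NOTE: 'spark' is not defined anywhere above, so the next line raises the NameError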
glueContext = GlueContext(spark.sparkContext)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = glue_source_table, transformation_ctx = "datasource0")
dfc = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "dfc")
blogdata = dfc.select(dfc_root_table_name)
blogdataoutput = glueContext.write_dynamic_frame.from_options(frame = blogdata, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path}, format = "orc", transformation_ctx = "blogdataoutput")
Posted on 2018-05-24 15:07:52
You are creating the GlueContext incorrectly. Your code should look like this:
from pyspark.context import SparkContext
glueContext = GlueContext(SparkContext.getOrCreate())
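For completeness, a minimal job preamble built on that fix might look like the sketch below. The only assumption is that you also want a SparkSession bound to the name spark (the name the error complains about); GlueContext exposes one as spark_session.
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Reuse (or create) the SparkContext that the Glue job provides
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Optional: bind the underlying SparkSession to the name "spark" referenced in the blog example
spark = glueContext.spark_session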
Posted on 2018-07-26 13:25:25
@beni
Like you, I ran into the same problem. After fixing the Spark context, I hit another issue when writing with glueContext.write_dynamic_frame.from_options.
Checking the logs, I saw a null value error, so adding DropNullFields.apply solved it:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# Begin variables to customize with your information
glue_source_database = "database_name"
glue_source_table = "table_name"
glue_temp_storage = "s3://bucket/tmp"
glue_relationalize_output_s3_path = "s3://bucket/output"
dfc_root_table_name = "root" # default value is "roottable"
# End variables to customize with your information
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database=glue_source_database, table_name=glue_source_table,
                                                            transformation_ctx="datasource0")
dfc = Relationalize.apply(frame=datasource0, staging_path=glue_temp_storage, name=dfc_root_table_name,
                          transformation_ctx="dfc")
fb_data = dfc.select(dfc_root_table_name)
dropnullfields3 = DropNullFields.apply(frame=fb_data, transformation_ctx="dropnullfields3")
fb_dataoutput = glueContext.write_dynamic_frame.from_options(frame=dropnullfields3, connection_type="s3",
                                                             connection_options={"path": glue_relationalize_output_s3_path},
                                                             format="orc", transformation_ctx="fb_dataoutput")
job.commit()
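One more note on the last step of your plan (getting the data into Redshift): instead of crawling the ORC output again, the same job can write the flattened DynamicFrame straight to Redshift through a Glue connection. This is only a rough sketch under assumptions: the connection name "my-redshift-connection" and the target database/table names are placeholders, not values from this thread.
# Hypothetical sketch: write the flattened, null-cleaned DynamicFrame directly to Redshift.
# "my-redshift-connection", "dev" and "public.blogdata" are placeholder names to replace with your own.
redshift_output = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=dropnullfields3,
    catalog_connection="my-redshift-connection",  # Glue connection pointing at your Redshift cluster
    connection_options={"dbtable": "public.blogdata", "database": "dev"},
    redshift_tmp_dir=glue_temp_storage,           # S3 staging area Glue uses for the Redshift COPY
    transformation_ctx="redshift_output")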
Hope this helps!
https://stackoverflow.com/questions/50502777