I wrote the code below in PySpark. In it, I create three subquery views and then join those three views together into a final PySpark SQL view. I've found that if I run the PySpark SQL against the daily_sku_t view created earlier in my code, it throws a very long error, whose first line I've included below. If I instead write the daily_sku_t view out to parquet first and then read it back in, the code runs without errors. The problem is that the write-out and read-back make the whole job take considerably longer. Is there a way around this without doing the write-out? I also tried creating another copy of the view under a different name, but that threw an error as well.
Example:
daily_sku_t.write.mode("overwrite")\
.parquet('s3://stuff/copy_daily_sku_t/')
copy_daily_sku_t=sqlContext.read.parquet('s3://stuff/copy_daily_sku_t/')
copy_daily_sku_t.createOrReplaceTempView("daily_sku_t")代码:
Code:
# product_repeat
product_repeat_product_cycle_days_query="""select
dateclosed, s.product_id, s.tz_brandname,
datediff(dateclosed,
lag(dateclosed, 1) over (partition by s.tz_brandname, customer_uuid, s.product_id
ORDER BY
dateclosed ASC, ticketid )) as product_cycle_days
from
(select distinct dateclosed, product_id, tz_brandname, customer_uuid, ticketid
from daily_sku_t
where (customer_uuid is not null) and (trim(customer_uuid) !='')
and (product_id is not null) and (trim(product_id) !='')
-- and (tz_brandname is not null) and (trim(tz_brandname) !='')
and (dateclosed is not null) and (trim(dateclosed) !='')
) s
"""
product_repeat_product_cycle_days=spark.sql(product_repeat_product_cycle_days_query)
product_repeat_product_cycle_days.createOrReplaceTempView("product_repeat_product_cycle_days")
product_cycle_days_sub_query="""select
dateclosed, storeid, tz_brandname, producttype, productsubtype, size, product_id,
floor(avg(product_cycle_days)) as product_cycle_days
from
(
select
t.dateclosed, t.product_id, t.storeid, t.producttype, t.productsubtype, t.size, t.tz_brandname,
a.product_cycle_days
from daily_sku_t t
left join product_repeat_product_cycle_days a
on trim(a.product_id)=trim(t.product_id)
and trim(a.dateclosed)=trim(t.dateclosed)
and trim(a.tz_brandname)=trim(t.tz_brandname)
where (t.customer_uuid is not null) and (trim(t.customer_uuid) !='')
and (t.product_id is not null) and (trim(t.product_id) !='')
and (t.dateclosed is not null) and (trim(t.dateclosed) !='')
)
group by
dateclosed, tz_brandname, producttype, productsubtype, size, storeid, product_id"""
product_cycle_days_sub=spark.sql(product_cycle_days_sub_query)
product_cycle_days_sub.createOrReplaceTempView("product_cycle_days_sub")
product_repeat_gross_query="""select
dateclosed, storeid, tz_brandname, producttype, productsubtype, size, product_id,
sum(product_repeat_gross) + (rand() / 10000) as product_repeat_gross
from
(
select
t.dateclosed, t.product_id, t.storeid, t.producttype, t.productsubtype, t.size, t.tz_brandname,
case
when ticketid = first_value(ticketid) over (partition by t.product_id, t.customer_uuid
ORDER BY
t.dateclosed ASC,ticketid rows between unbounded preceding and unbounded following) then 0
else grossreceipts
end as product_repeat_gross
from daily_sku_t t
where (t.customer_uuid is not null) and (trim(t.customer_uuid) !='')
and (t.product_id is not null) and (trim(t.product_id) !='')
and (t.dateclosed is not null) and (trim(t.dateclosed) !='')
)
group by
dateclosed, tz_brandname, producttype, productsubtype, size, storeid, product_id"""
product_repeat_gross=spark.sql(product_repeat_gross_query)
product_repeat_gross.createOrReplaceTempView("product_repeat_gross")
product_repeat_query="""select a.dateclosed,
a.storeid,
a.tz_brandname,
a.producttype,
a.productsubtype,
a.size,
a.product_id,
b.product_cycle_days,
c.product_repeat_gross
from (select distinct dateclosed, tz_brandname, producttype, productsubtype, size, storeid, product_id from daily_sku_t) a
left join product_repeat_gross c
on trim(a.dateclosed)=trim(c.dateclosed)
and trim(a.storeid)=trim(c.storeid)
and trim(a.tz_brandname)=trim(c.tz_brandname)
and trim(a.producttype)=trim(c.producttype)
and trim(a.productsubtype)=trim(c.productsubtype)
and trim(a.size)=trim(c.size)
and trim(a.product_id)=trim(c.product_id)
left join product_cycle_days_sub b
on trim(a.dateclosed)=trim(b.dateclosed)
and trim(a.storeid)=trim(b.storeid)
and trim(a.tz_brandname)=trim(b.tz_brandname)
and trim(a.producttype)=trim(b.producttype)
and trim(a.productsubtype)=trim(b.productsubtype)
and trim(a.size)=trim(b.size)
and trim(a.product_id)=trim(b.product_id)
"""
product_repeat=spark.sql(product_repeat_query)
product_repeat.createOrReplaceTempView("product_repeat")
First line of the error:
An error was encountered:
'Resolved attribute(s) _w5#7888 missing from discounts#7149,_w5# ....
In a single query, you can achieve this by using a 'with' clause (https://stackoverflow.com/questions/67629488). It would look like the following.
spark.sql("""
with tableA as (select ...),  -- tableA subquery goes here
tableB as (select ...),       -- tableB subquery goes here
tableC as (select ...)        -- tableC subquery goes here
-- refer to the temp tables created above in the following query
select * from tableD join tableA join tableB ...""")
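Applied to this question's code, the same pattern can reuse the query strings already defined above as CTE bodies, replacing the intermediate createOrReplaceTempView calls with a single spark.sql call. A sketch under that assumption (the daily_sku_t view and the three *_query strings come from the question; untested against the asker's data):
# Sketch: fold the three temp views into CTEs of one statement so Spark
# resolves the window expressions in a single plan. Later CTEs may
# reference earlier ones, so product_cycle_days_sub can still read
# product_repeat_product_cycle_days.
combined_query = f"""
with product_repeat_product_cycle_days as (
{product_repeat_product_cycle_days_query}
),
product_cycle_days_sub as (
{product_cycle_days_sub_query}
),
product_repeat_gross as (
{product_repeat_gross_query}
)
{product_repeat_query}
"""
product_repeat = spark.sql(combined_query)
product_repeat.createOrReplaceTempView("product_repeat")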