Reusing the same dataframe view in a pyspark sql join

Stack Overflow user
Asked on 2021-05-21 08:31:58
1 answer · 78 views · 0 followers · 0 votes

I wrote the code below in pyspark. In it I create 3 subquery views and then join those 3 views together to form one final pyspark-sql view. I found that if I run the pyspark-sql using the daily_sku_t view created earlier in the code, it throws a very long error, the first line of which I have provided below. If instead I first write the daily_sku_t view out to parquet and then read it back in, the code runs without error. The problem is that the write-out and read-back make the whole job take considerably longer. Is there a way to get around this without performing the write-out? I also tried creating another copy of the view under a different name, but that threw an error as well.

Example:

# Workaround: write the view out to parquet, then read it back in
daily_sku_t.write.mode("overwrite")\
    .parquet('s3://stuff/copy_daily_sku_t/')

copy_daily_sku_t = sqlContext.read.parquet('s3://stuff/copy_daily_sku_t/')

copy_daily_sku_t.createOrReplaceTempView("daily_sku_t")
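
For context, the round trip works because it truncates daily_sku_t's query-plan lineage, which appears to be what trips up the later self-joins. A checkpoint truncates the lineage the same way without hand-rolling the write and read. The sketch below is only an illustration, and the checkpoint directory path is an assumption:

# Sketch (assumed path): truncate the plan lineage in-cluster
# rather than round-tripping through parquet by hand.
spark.sparkContext.setCheckpointDir('s3://stuff/checkpoints/')

# eager=True materializes the data now and swaps in a fresh plan,
# much like writing the view out and reading it back.
daily_sku_t = daily_sku_t.checkpoint(eager=True)
daily_sku_t.createOrReplaceTempView("daily_sku_t")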

Code:

# product_repeat

product_repeat_product_cycle_days_query="""select
            dateclosed, s.product_id, s.tz_brandname,
             datediff(dateclosed,
            lag(dateclosed, 1) over (partition by s.tz_brandname, customer_uuid, s.product_id
        ORDER BY
            dateclosed ASC, ticketid )) as product_cycle_days
        from
            (select distinct dateclosed, product_id, tz_brandname, customer_uuid, ticketid 
            from daily_sku_t
            where (customer_uuid is not null) and (trim(customer_uuid) !='')
            and (product_id is not null) and (trim(product_id) !='')
            -- and (tz_brandname is not null) and (trim(tz_brandname) !='')
            and (dateclosed is not null) and (trim(dateclosed) !='')
            ) s 
    
        """

product_repeat_product_cycle_days = spark.sql(product_repeat_product_cycle_days_query)

product_repeat_product_cycle_days.createOrReplaceTempView("product_repeat_product_cycle_days")

product_cycle_days_sub_query="""select
        dateclosed, storeid, tz_brandname, producttype, productsubtype, size, product_id, 
        floor(avg(product_cycle_days)) as product_cycle_days
    from
        (
        select
            t.dateclosed, t.product_id, t.storeid, t.producttype, t.productsubtype, t.size, t.tz_brandname,
            a.product_cycle_days
        from daily_sku_t t
        left join product_repeat_product_cycle_days a
        on trim(a.product_id)=trim(t.product_id)
        and trim(a.dateclosed)=trim(t.dateclosed)
        and trim(a.tz_brandname)=trim(t.tz_brandname)
        where (t.customer_uuid is not null) and (trim(t.customer_uuid) !='')
            and (t.product_id is not null) and (trim(t.product_id) !='')
            and (t.dateclosed is not null) and (trim(t.dateclosed) !='')
        
        )
    group by
        dateclosed, tz_brandname, producttype, productsubtype, size, storeid, product_id"""

product_cycle_days_sub = spark.sql(product_cycle_days_sub_query)

product_cycle_days_sub.createOrReplaceTempView("product_cycle_days_sub")


product_repeat_gross_query="""select
        dateclosed, storeid, tz_brandname, producttype, productsubtype, size, product_id,
        sum(product_repeat_gross) + (rand() / 10000) as product_repeat_gross
    from
        (
        select
            t.dateclosed, t.product_id, t.storeid, t.producttype, t.productsubtype, t.size, t.tz_brandname,
            case
                when ticketid = first_value(ticketid) over (partition by t.product_id, t.customer_uuid
            ORDER BY
                t.dateclosed ASC,ticketid rows between unbounded preceding and unbounded following) then 0
                else grossreceipts
            end as product_repeat_gross
        from daily_sku_t t
        where (t.customer_uuid is not null) and (trim(t.customer_uuid) !='')
            and (t.product_id is not null) and (trim(t.product_id) !='')
            and (t.dateclosed is not null) and (trim(t.dateclosed) !='')
        
        )
    group by
        dateclosed, tz_brandname, producttype, productsubtype, size, storeid, product_id"""

product_repeat_gross = spark.sql(product_repeat_gross_query)

product_repeat_gross.createOrReplaceTempView("product_repeat_gross")



product_repeat_query="""select a.dateclosed, 
a.storeid, 
a.tz_brandname, 
a.producttype, 
a.productsubtype, 
a.size, 
a.product_id,
b.product_cycle_days,
c.product_repeat_gross
from (select distinct dateclosed, tz_brandname, producttype, productsubtype, size, storeid, product_id from daily_sku_t) a
left join product_repeat_gross c
on trim(a.dateclosed)=trim(c.dateclosed)
and trim(a.storeid)=trim(c.storeid) 
and trim(a.tz_brandname)=trim(c.tz_brandname) 
and trim(a.producttype)=trim(c.producttype) 
and trim(a.productsubtype)=trim(c.productsubtype) 
and trim(a.size)=trim(c.size) 
and trim(a.product_id)=trim(c.product_id)
left join product_cycle_days_sub b
on trim(a.dateclosed)=trim(b.dateclosed) 
and trim(a.storeid)=trim(b.storeid) 
and trim(a.tz_brandname)=trim(b.tz_brandname) 
and trim(a.producttype)=trim(b.producttype) 
and trim(a.productsubtype)=trim(b.productsubtype) 
and trim(a.size)=trim(b.size) 
and trim(a.product_id)=trim(b.product_id)


"""

product_repeat = spark.sql(product_repeat_query)

product_repeat.createOrReplaceTempView("product_repeat")

The first line of the error:

An error was encountered:
'Resolved attribute(s) _w5#7888 missing from discounts#7149,_w5# ....

1 Answer

Stack Overflow user

Posted on 2021-05-21 13:21:03

You can achieve this in a single query by using a 'with' clause (a common table expression). It would look something like the following.

spark.sql("""
with tableA as select ..., --tableA subquery goes here
tableB as select ..., --tableB subquery goes here
tableC as select --tableC subquery goes here
//refer the temp tables created above in the following query
select * from tableD join tableA join tableB....""")
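
Applied to the question's code, the three intermediate views fold into CTEs of a single statement, roughly as below. This is only a sketch reusing the question's own view and column names; each "..." stands in for the corresponding full subquery shown above:

product_repeat = spark.sql("""
with product_repeat_product_cycle_days as (
    select ...                     -- first subquery from the question
    from (select distinct ... from daily_sku_t where ...) s
),
product_cycle_days_sub as (
    select ...                     -- second subquery
    from daily_sku_t t
    left join product_repeat_product_cycle_days a on ...
    group by ...
),
product_repeat_gross as (
    select ...                     -- third subquery
    from daily_sku_t t
    group by ...
)
select a.dateclosed, a.storeid, a.tz_brandname, a.producttype,
       a.productsubtype, a.size, a.product_id,
       b.product_cycle_days, c.product_repeat_gross
from (select distinct dateclosed, tz_brandname, producttype,
             productsubtype, size, storeid, product_id
      from daily_sku_t) a
left join product_repeat_gross c on ...
left join product_cycle_days_sub b on ...
""")

The idea is that Spark then resolves everything within one statement, which may avoid the attribute-resolution error produced by chaining separately registered temp views.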
0 votes
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/67629488
