How do I create a function that uses a list of strings to iterate over the following? The intent is that a, b and c represent tables uploaded by users. However many tables a user uploads, the goal is to iterate over them programmatically. I just want to pull the count of new rows, broken down by table.
mylist = df.select('S_ID').distinct().rdd.flatMap(lambda x: x).collect()
mylist
>> ['a', 'b', 'c']
##Count new rows by S_ID type
a = df.filter(df.S_ID == 'a').count()
b = df.filter(df.S_ID == 'b').count()
c = df.filter(df.S_ID == 'c').count()
##Count current rows from Snowflake
a_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = 'a'").load()
a_current = a_current.count()
b_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = 'b'").load()
b_current = b_current.count()
c_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = 'c'").load()
c_current = c_current.count()
##Calculate count of new rows
a_new = a - a_current
a_new = str(a_new)
b_new = b - b_current
b_new = str(b_new)
c_new = c - c_current
c_new = str(c_new)

What I have tried is shown below:
new_counts_list = []
for i in mylist:
    i = df.filter(df.S_ID == 'i').count()
    i_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = 'i'").load()
    i_current = i_current.count()
    i_new = i - i_current
    i_new = str(i_new)
    new_counts_list.append(i)

I am stuck on keeping {names : new_counts}.
Posted on 2021-09-15 06:38:55
Since it concerns the part:

    I am stuck on keeping {names : new_counts}

at the end of the for loop you can use new_counts_list[i] = i_new instead of new_counts_list.append(i), assuming you change how new_counts_list is initialized, i.e. initialize it as a dictionary (new_counts_list = {}) instead of a list.

You also seem to be hard coding the literal value 'i', which is a string, instead of using the variable i (i.e. without the quotes) in your proposed solution. The updated solution could look like this:
new_counts_list = {}
for i in mylist:
    # use a separate name for the count so the S_ID held in `i` is not overwritten
    i_count = df.filter(df.S_ID == i).count()
    i_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = '{0}'".format(i)).load()
    i_current = i_current.count()
    i_new = i_count - i_current
    i_new = str(i_new)
    new_counts_list[i] = i_new

On the other hand, while your approach - i.e. sequentially looping through each S_ID in mylist and running the operations, i.e.:

- running the action collect to pull all S_ID values from the initial dataframe df into a list on the driver node,
- counting the occurrences of each S_ID in the initial dataframe, and then
- executing another, potentially expensive (IO reads / network communication / shuffles) load with spark.read.format(SNOWFLAKE_SOURCE_NAME), which loads all records for each filter into memory before determining the difference between the initial dataframe and the Snowflake source

- will work, it is expensive on IO reads and, depending on your cluster/setup, potentially expensive on network communication and shuffles.
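As a side note (the counts below are made up for illustration, not from the original post), after the corrected loop new_counts_list would hold something like:

new_counts_list
>> {'a': '10', 'b': '4', 'c': '7'}

Note that the values are strings because of the str() cast.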
You could consider using a groupBy to reduce the number of times you perform the potentially expensive collect. Furthermore, you could join the initial dataframe to your Snowflake source and let Spark optimize your operations into a lazy execution plan distributed across your cluster/setup. In addition, similar to using a pushdown filter on the Snowflake source, you could combine all the selected S_ID values into that one query, allowing Snowflake to reduce all the desired results in a single read. You would not need a loop. This could look like the following:
Approach 1

In this approach, I will provide a pure Spark solution to achieving your desired results:
from pyspark.sql import functions as F
# Ask spark to select only the `S_ID` and group the data but not execute the transformation
my_exiting_counts_df = df.select('S_ID').groupBy('S_ID').count()
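# e.g. this yields rows like (S_ID='a', count=10) - counts here are illustrative;
# this is still a lazy transformation, so nothing has executed yet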
# Ask spark to select only the `S_ID` counts from the snowflake source
current_counts_df = (
    spark.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(**sfOptions)
    .option("query", "select R_ID, COUNT(1) as cnt FROM mytable GROUP BY R_ID")
    .load()  # `.load()` was missing; without it this is not a dataframe
)
# Join both datasets which will filter to only selected `S_ID`
# and determine the differences between the existing and current counts
results_df = (
    my_exiting_counts_df.alias("existing")
    .join(
        current_counts_df.alias("current"),
        F.col("S_ID") == F.col("R_ID"),  # `==`, not `=`, for a column equality condition
        "inner"
    )
    .selectExpr(
        "S_ID",
        "count - cnt as actual_count"
    )
)
# Execute the above transformations with `collect` and
# Convert the dictionary values in the list above to your desired final dictionary
new_counts = {}
for row in results_df.collect():
new_counts[row['S_ID']]=row['actual_count']
# your desired results are in `new_counts`
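As a small variation (not part of the original answer), the final loop can be collapsed into a dict comprehension:

new_counts = {row['S_ID']: row['actual_count'] for row in results_df.collect()}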
Approach 2

In this approach, I will collect the results of the groupBy and then use them to build an optimized pushdown query against the Snowflake source that returns the desired results:
my_list_counts = df.select('S_ID').groupBy('S_ID').count()
selected_sids = []
case_expression = ""
for row in my_list_counts.collect():  # iterate the collected rows, not the dataframe itself
    selected_sids.append(row['S_ID'])
    # only `WHEN ... THEN ...` here; the outer query supplies CASE/END,
    # and THEN should receive the count, not the S_ID again
    case_expression = case_expression + " WHEN R_ID='{0}' THEN {1} ".format(
        row['S_ID'],
        row['count']
    )
# The above has a table with columns `S_ID` and `count` where the
# latter is the number of occurrences of `S_ID` in the dataset `df`
snowflake_push_down_query="""
SELECT
R_ID AS S_ID
((CASE
{0}
END) - cnt) as actual_count
FROM (
SELECT
R_ID,
COUNT(1) AS cnt
FROM
mytable
WHERE
R_ID IN ('{1}')
GROUP BY
R_ID
) t
""".format(
case_expression,
"','".join(selected_sids)
)
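# Illustrative only (values assumed, not from the original post): with S_IDs
# 'a', 'b', 'c' occurring 10, 4 and 7 times in `df`, the rendered query is roughly:
#   SELECT
#       R_ID AS S_ID,
#       ((CASE WHEN R_ID='a' THEN 10 WHEN R_ID='b' THEN 4 WHEN R_ID='c' THEN 7 END) - cnt) as actual_count
#   FROM (SELECT R_ID, COUNT(1) AS cnt FROM mytable WHERE R_ID IN ('a','b','c') GROUP BY R_ID) t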
results_df = (
    spark.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(**sfOptions)
    .option("query", snowflake_push_down_query)
    .load()  # `.load()` was missing here as well
)
# Execute the above transformations with `collect` and
# Convert the dictionary values in the list above to your desired final dictionary
new_counts = {}
for row in results_df.collect():
    new_counts[row['S_ID']] = row['actual_count']
# your desired results are in `new_counts`

Let me know if this works for you.
https://stackoverflow.com/questions/69186022