我有一张巨大的雪花桌。我想在桌面上做一些转换。我的雪花表有一个名为“快照”的列。我只想读取pyspark中的当前快照数据,并对过滤后的数据进行转换。
那么,是否有一种方法可以应用于在中读取雪花表时对行进行过滤(我不想在内存中读取整个雪花表,因为它没有效率),还是需要读取整个雪花表(在中),然后应用筛选器获取以下内容的最新快照?
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
snowflake_database="********"
snowflake_schema="********"
source_table_name="********"
snowflake_options = {
"sfUrl": "********",
"sfUser": "********",
"sfPassword": "********",
"sfDatabase": snowflake_database,
"sfSchema": snowflake_schema,
"sfWarehouse": "COMPUTE_WH"
}
df = spark.read \
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**snowflake_options) \
.option("dbtable",snowflake_database+"."+snowflake_schema+"."+source_table_name) \
.load()
df = df.where(df.snapshot == current_timestamp()).collect()发布于 2022-03-16 15:41:10
有一些类型的过滤器(、filter、或,其中的功能是DataFrame),而Spark不传递给火花雪花连接器。这意味着,在某些情况下,你可能会得到比你预期的更多的记录。
最安全的方法是直接使用SQL查询:
df = spark.read \
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**snowflake_options) \
.option("query","SELECT X,Y,Z FROM TABLE1 WHERE SNAPSHOT==CURRENT_TIMESTAMP()") \
.load()当然,如果要使用Spark的filter/where功能,请检查雪花UI中的查询历史记录,以查看生成的查询是否应用了正确的筛选器。
https://stackoverflow.com/questions/71498611
复制相似问题