例如,我使用的是tidyquant数据集。
install.packages('tidyquant')
library(tidyquant)
library(data.table)
options("getSymbols.warning4.0"=FALSE)
options("getSymbols.yahoo.warning"=FALSE)
# Downloading Apple price using quantmod
first.date <- Sys.Date() - 30
last.date <- Sys.Date()
getSymbols("AAPL", from = first.date,
to = last.date,warnings = FALSE,
auto.assign = TRUE)
AAPL<-data.frame(AAPL)
AAPL<- setDT(AAPL, keep.rownames = TRUE)[]
colnames(AAPL)[1] <- "DATE"
AAPL$RUNDATE<-AAPL$DATE
head(AAPL)
DATE AAPL.Open AAPL.High AAPL.Low AAPL.Close AAPL.Volume AAPL.Adjusted RUNDATE
1: 2021-03-19 119.90 121.43 119.68 119.99 185023200 119.99 2021-03-19
2: 2021-03-22 120.33 123.87 120.26 123.39 111912300 123.39 2021-03-22
3: 2021-03-23 123.33 124.24 122.14 122.54 95467100 122.54 2021-03-23
4: 2021-03-24 122.82 122.90 120.07 120.09 88530500 120.09 2021-03-24
5: 2021-03-25 119.54 121.66 119.00 120.59 98844700 120.59 2021-03-25
6: 2021-03-26 120.35 121.48 118.92 121.21 93958900 121.21 2021-03-26下面是我的目标,我已经知道如何使用下面的命令使用R从Databricks上传这个数据集到雪花中:
sparkr.sdf <- SparkR::createDataFrame(AAPL)
SparkR::write.df(
df = sparkr.sdf,
source = "snowflake",
mode = "overwrite",
sfUrl = "<snowflake-url>",
sfUser = user,
sfPassword = password,
sfDatabase = "<snowflake-database>",
sfSchema = "<snowflake-schema>",
sfWarehouse = "<snowflake-cluster>",
dbtable = "AAPL_table"
)然而,我想每天运行这个,上传最新的日期的拉(最后30天),附加和覆盖最后30天从RunDate专栏。
我的想法是使用上面提到的相同的查询,但将rundate列更改为今天的日期。AAPL$RUNDATE<-Sys.Date()日报。这样的话,我可以从这个新的AAPL$RUNDATE中减去30天,并且只在AAPL$RUNDATE 30天的时间范围内继续前进。
我看到了这段代码,但我没有看到设置回溯限制的选项
DBI::dbSendQuery(snowflake.conn,"use schema schemaname") # strange this is required
dbWriteTable(snowflake.conn, 'tablename', df, append = T, overwrite = F, verbose = T) 或者这个来源:
# Write table
sparklyr.sdf <- copy_to(sc, iris, overwrite = TRUE)
sparklyr.sdf %>%
spark_write_source(
sc = sc,
mode = "overwrite",
source = "snowflake",
options = sf_options
)我的想法是,在最后30天的数据中,我每天运行这个数据拉,而不是完整的时间段,并附加到现有的数据集中。
以前有人做过类似的事情吗?我真的很感激你能在这方面提供帮助,如果你需要我做些什么,请告诉我。
发布于 2021-04-19 13:56:28
如果您运行代码以获取新数据并将其存储在名为AAPL的变量中,那么
AAPL$RUNDATE <- Sys.Date()
DBI::dbWriteTable(conn, AAPL, append = TRUE)从这里开始,我认为你有两个选择:
DATEDIFF和CURRENT_DATE的示例组成。)
作为另一种选择,如果您今天不运行清除,您可能更喜欢使用表中的最大观察到的RUNDATE,而不是今天的日期,在这种情况下,您可以使用
maxdate <- DBI::dbGetQuery(conn,"select max(RUNDATE) as maxdate AAPL_table")$maxdate if (length(maxdate) && !anyNA(maxdate)) { DBI::dbExecute(conn ),“从AAPL_table中删除DATEDIFF(day,RUNDATE,?) > 30",params =list(Maxdate)}}(警告:我没有雪花,所以这是未经检验的。我在sql server中测试了它的前提,没有问题,并且验证了至少支持这两个sql函数。)
https://stackoverflow.com/questions/67151981
复制相似问题