SCD-2 using Delta in Databricks

Stack Overflow user
Asked 2020-06-25 15:02:29
1 answer, 651 views, 0 followers, 2 votes

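I am trying to build an SCD-2 (slowly changing dimension, type 2) transformation, but I cannot get it to work using Delta in Databricks.

Example: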

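// Base table
import org.apache.spark.sql.functions.{current_date, lit}
import org.apache.spark.sql.types.StringType
import spark.implicits._ // needed for toDF; already in scope in a Databricks notebook

val employeeDf = Seq((1, "John", "CT"),
                     (2, "Mathew", "MA"),
                     (3, "Peter", "CA"),
                     (4, "Joel", "NY"))
                    .toDF("ID", "NAME", "ADDRESS")

// Add the SCD-2 tracking columns: every initial row is active and has no end date.
val empBaseDf = employeeDf.withColumn("IS_ACTIVE", lit(1))
  .withColumn("EFFECTIVE_DATE", current_date())
  .withColumn("TERMINATION_DATE", lit(null).cast(StringType))

empBaseDf.write.format("delta").mode("overwrite").saveAsTable("empBase")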

// Batch data
// Note: 1 record changed, 2 records are new, and 1 is unchanged.
val updateEmployeeDf = Seq((1, "John", "NH"),
                           (2, "Mathew", "MA"),
                           (5, "Adam", "NJ"),
                           (6, "Philip", "CT")).toDF("ID", "NAME", "ADDRESS")
// createOrReplaceTempView returns Unit, so it is called separately from the val definition.
updateEmployeeDf.createOrReplaceTempView("EmpUpdates")

val updatedEmp = updateEmployeeDf.withColumn("IS_ACTIVE", lit(1))
      .withColumn("EFFECTIVE_DATE", current_date())
      .withColumn("TERMINATION_DATE", lit(null).cast(StringType))

updatedEmp.createOrReplaceTempView("empBatch")

import io.delta.tables._
val empbaseTable: DeltaTable = DeltaTable.forName("empBase")
val empBatch = spark.table("empBatch")

// Rows to INSERT new addresses of existing customers
val newAddressesToInsert = empBatch
  .as("batch")
  .join(empbaseTable.toDF.as("emp"), "ID")
  .where("batch.ADDRESS <> emp.ADDRESS").selectExpr("batch.*")

newAddressesToInsert.show()

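// Stage the merge source:
//  - rows with a NULL mergeKey never match a base row, so they are always inserted;
//  - rows keyed by ID can match and close the current base row.
val processRec = newAddressesToInsert
  .selectExpr("NULL as mergeKey", "*")
  .union(empBatch.selectExpr("ID as mergeKey", "*"))
processRec.show()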

empbaseTable
  .as("base")
  .merge(processRec.as("batch1"), "base.ID = mergeKey")
  .whenMatched("base.IS_ACTIVE = true AND base.address <> batch1.address")
  .updateExpr(Map(
    "IS_ACTIVE" -> "false",
    "TERMINATION_DATE" -> "current_date()"))
  .whenNotMatched()
  .insertExpr(Map(
    "ID" -> "batch1.ID",
    "NAME" -> "batch1.NAME",
    "ADDRESS" -> "batch1.ADDRESS",
    "IS_ACTIVE" -> "true",
    "EFFECTIVE_DATE" -> "current_date()",
    "TERMINATION_DATE" -> "null"))
  .execute()

// Running the code above more than once inserts duplicate records. I need to prevent duplicate entries in the Delta table.
ID  NAME    ADDRESS  IS_ACTIVE  EFFECTIVE_DATE  TERMINATION_DATE
1   John    NH       1          2020-06-25      null
1   John    CT       0          2020-06-25      2020-06-25
1   John    NH       1          2020-06-25      null
2   Mathew  MA       1          2020-06-25      null
3   Peter   CA       1          2020-06-25      null
4   Joel    NY       1          2020-06-25      null
5   Adam    NJ       1          2020-06-25      null
6   Philip  CT       1          2020-06-25      null

I followed the Databricks documentation for SCD-2 transformations, but it did not work for me. https://docs.databricks.com/delta/delta-update.html#write-change-data-into-a-delta-table

Any suggestions would be helpful.


1 Answer

Stack Overflow user

Accepted answer

Posted 2020-06-26 05:50:39

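When you create new entries for the updates received for employee records, you must make sure each update record is compared only against the employee's latest entry in the employee table. Adding the predicate emp.IS_ACTIVE = true does this and avoids the duplicates.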
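
To see why the missing predicate produces duplicates on a second run, here is a minimal in-memory sketch of the staging join that needs no Spark. The case classes `EmpRow` and `Update` and the `activeOnly` flag are hypothetical names introduced only for this illustration; they model the join in the question, not the Delta API.

```scala
// Hypothetical in-memory model of the staging join (not the Delta API).
case class EmpRow(id: Int, name: String, address: String, isActive: Boolean)
case class Update(id: Int, name: String, address: String)

// Updates whose address differs from a joined base row.
// activeOnly = false mirrors the join in the question;
// activeOnly = true mirrors the join with the emp.IS_ACTIVE predicate.
def newAddressesToInsert(base: Seq[EmpRow], batch: Seq[Update], activeOnly: Boolean): Seq[Update] =
  for {
    u <- batch
    e <- base
    if e.id == u.id
    if !activeOnly || e.isActive
    if u.address != e.address
  } yield u

// Table state after the first merge: John's CT row is closed, his NH row is active.
val baseAfterFirstRun = Seq(
  EmpRow(1, "John", "CT", isActive = false),
  EmpRow(1, "John", "NH", isActive = true),
  EmpRow(2, "Mathew", "MA", isActive = true)
)
val batch = Seq(Update(1, "John", "NH"), Update(2, "Mathew", "MA"))

// Without the predicate, the closed CT row still differs from NH,
// so John/NH is staged for insert again.
val withoutFilter = newAddressesToInsert(baseAfterFirstRun, batch, activeOnly = false)

// With the predicate, only the active NH row is compared, so nothing is staged.
val withFilter = newAddressesToInsert(baseAfterFirstRun, batch, activeOnly = true)
```

In this model the closed historical row is what re-triggers the insert: it will always differ from the current address, so every rerun stages the same record again unless inactive rows are excluded.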

// Rows to INSERT new addresses of existing customers
val newAddressesToInsert = empBatch
  .as("batch")
  .join(empbaseTable.toDF.as("emp"), "ID")
  .where("emp.IS_ACTIVE = true and batch.ADDRESS <> emp.ADDRESS").selectExpr("batch.*")
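
As a sanity check after each merge, one option is to confirm that no ID has more than one active row. The following is a sketch only: it assumes `spark` is the active SparkSession, that the `empBase` table from the question exists, and that IS_ACTIVE is stored as an integer as in the question. The name `activeDuplicates` is introduced here for illustration.

```scala
import org.apache.spark.sql.functions.col

// Assumption: `spark` is the active SparkSession (as in a Databricks notebook)
// and "empBase" is the Delta table created in the question.
val activeDuplicates = spark.table("empBase")
  .where("IS_ACTIVE = 1")      // only currently active rows
  .groupBy("ID")
  .count()
  .where(col("count") > 1)     // IDs with more than one active row

// An empty result means each ID has at most one active row.
activeDuplicates.show()
```

Note that the added predicate only prevents new duplicates; rows already duplicated by earlier runs remain in the table until they are removed separately.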
Votes: 1
Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/62578435
