我已经通过以下方法加载了一个数据集:
Dataset<Row> rows = sparkSession.read().format("com.databricks.spark.csv").option("header", "true").load(tablenameAndLocationMap.get(tablename));数据正在正确加载,但我希望在运行时更新列值。我试过像上面提到的那样使用循环,但没有起作用。
Column data = rows.col("UPLOADED_ON");
Dataset<Row> d = rows.select(data);
d.foreach(obj->{
String date = obj.getAs(0);
DateFormat inputFo formatter = new SimpleDateFormat("yyyy-MM-dd");
Date da = (Date)inputFormatter.parse(date);
DateFormat outputFormatter = new SimpleDateFormat("dd-MM-yy");
date = outputFormatter.format(da);
});在这里,我希望用新的值UPLOADED_ON替换/更新列date的现有值。
如果有人能帮上忙,该怎么做。
谢谢
发布于 2022-11-24 14:37:45
您可以创建另一个具有不同值的列,并删除前一个列。
// create a new column
yourdataset = yourdataset.withColumn("UPLOADED_ON_NEW", lit("Any-value"));
// drop a column
yourdataset = yourdataset.column("UPLOADED_ON");在您的例子中,我建议您创建一个接收日期的UDF函数,并根据需要以特定格式返回它。
示例将函数创建到sparkSession中,用于所有数据集转换。
context.sparkSession().udf().register(
"formatDateYYYYMMDDtoDDMMYY", // name of function
(String dateIn) -> { ... }, // all convert rules
DataTypes.StringType // return type
);使用创建的函数
yourdataset =
yourdataset.withColumn(
"UPLOADED_ON_NEW",
callUDF(
"formatDateYYYYMMDDtoDDMMYY", // same name of create function
col("UPLOADED_ON")
)
);在sqlContext中也可以使用UDF函数
yourdataset.createOrReplaceTempView("MY_DATASET");
yourdataset =
sparkSession.sqlContext().sql("select * , formatDateYYYYMMDDtoDDMMYY(UPLOADED_ON) as UPLOADED_ON_NEW from MY_DATASET");https://stackoverflow.com/questions/74559962
复制相似问题