我有一个带有一个模式的数据格式。现有的dataframe已经有50列.Now,我想在现有的dataframe中添加一个新列。新的列名是" hashing_id“,这个hashing_id的逻辑是sha1(行)。我怎样才能做到这一点?
我试过下面的代码。以下两种方法都包含在主类使用的特性中。此特性还扩展了可序列化的。
def addHashingKey():DataFrame={
val sha1 = java.security.MessageDigest.getInstance("SHA-1")
val enCoder = new sun.misc.BASE64Enc
如何将DataFrame cc传递给Array[Seq[String]]
val factors = $(ccCols).split(",")
val cc = dataset.select(factors.head, factors.tail: _*)
我试过这样做,但它给了我Array[Row]
cc.rdd.collect()
对于以下代码--其中DataFrame转换为RDD[Row],新列的数据通过mapPartitions追加
// df is a DataFrame
val dfRdd = df.rdd.mapPartitions {
val bfMap = df.rdd.sparkContext.broadcast(factorsMap)
iter =>
val locMap = bfMap.value
iter.map { r =>
val newseq = r.toSeq :+ locMap(r.getAs[String](inColName))
我有一个dataframe,它希望向一个名为row_num的列中添加一个列,该列表示行的索引。这是我最初的解决方案:
df$row_num<-seq(1:nrow(df))
但是,在df为空的情况下,它不起作用,因为我得到了错误:
Error in `$<-.data.frame`(`*tmp*`, row_num, value = 1:2) :
replacement has 2 rows, data has 0
我发现的一个解决方案是使用来自dplyr的row_number(),但是这似乎会使我的代码慢很多,所以我正在寻找一个更简单的解决方案。
我有一个dataframe,我想按一个列进行分组,并将这些组转换为具有相同模式的dataframe。原因是我想要在各个组之间映射一个带有签名DataFrame -> String的函数。以下是我正在尝试的:
val df = sc.parallelize(Seq((1,2,3),(1,2,4),(2,3,4))).toDF
val schema = df.schema
val groups = df.rdd.groupBy(x => x(0))
.mapValues(g => sqlContext.createDataFrame(sc.makeR
我想得到星火中数据的每一列的最大值。我的代码只适用于一列(例如,第一列):
val col = df.columns(0);
val Row(maxValue: Int) = df.agg(max(col)).head();
我不知道如何将foreach和我所拥有的代码组合起来,这样我就可以为dataframe中的每一列获得最大值。(我不知道dataframe中有多少列,列名是什么)
谢谢。
我的目标是从来自外部dataframe的列和值动态创建一个dataframe。这是如何使用手动模式和数据定义创建dataframe:
val columnSufix: String = "isNull"
val data = Seq(Row(
details.filter(col("DAY").isNull).count(),
details.filter(col("CHANNEL_CATEGORY").isNull).count(),
这是我第一次使用火花或scala,所以我是个新手。我有一个2D数组,我需要把它转换成一个数据帧。示例数据是一个连接表,其形式为矩形(double),点(a,b)也是双倍的,以及点是否位于矩形内的布尔值。我的最终目标是返回一个带有矩形名称的dataframe,以及它在ST_contains为真的地方出现多少次。由于查询返回所有为真的实例,所以我只是尝试按矩形进行排序(它们被命名为doubles),并对每个事件进行计数。我将其放入数组中,然后尝试将其转换为dataset。下面是我的一些代码和我尝试过的内容:
// Join two datasets (not my code)
spark.udf.