我的目标是从来自外部dataframe的列和值动态创建一个dataframe。这是如何使用手动模式和数据定义创建dataframe:
val columnSufix: String = "isNull"
val data = Seq(Row(
details.filter(col("DAY").isNull).count(),
details.filter(col("CHANNEL_CATEGORY").isNull).count(),
对于以下代码--其中DataFrame转换为RDD[Row],新列的数据通过mapPartitions追加
// df is a DataFrame
val dfRdd = df.rdd.mapPartitions {
val bfMap = df.rdd.sparkContext.broadcast(factorsMap)
iter =>
val locMap = bfMap.value
iter.map { r =>
val newseq = r.toSeq :+ locMap(r.getAs[String](inColName))
我有两个行数相同的DataFrame,但是根据源,列数是不同的和动态的。
第一个DataFrame包含所有列,但是第二个DataFrame被过滤和处理,没有所有其他列。
需要从第一个DataFrame中选择特定的列,然后添加/合并第二个DataFrame。
val sourceDf = spark.read.load(parquetFilePath)
val resultDf = spark.read.load(resultFilePath)
val columnName :String="Col1"
我试着用几种方式加进去,这里我只给了几个.
val modifiedRes
我有以下数据(只是显示了一个代码片段)
DEST_COUNTRY_NAME ORIGIN_COUNTRY_NAME count
United States Romania 15
United States Croatia 1
United States Ireland 344
Egypt United States 15
我在inferSchema选项设置为true的情况下读取它,然后对列执行describe操作。它似乎工作得很好。
scala> val data = spark.read.option("header", "true
我有一个场景,通过where条件从同一个DataFrame中使用另一个列从DataFrame读取一列,这个值作为IN条件通过,从另一个DataFrame中选择相同的值,我如何在spark DataFrame中实现。
在SQL中,它将类似于:
select distinct(A.date) from table A where A.key in (select B.key from table B where cond='D');
我尝试了如下所示:
val Bkey: DataFrame = b_df.filter(col("cond")==="D
我尝试通过从数据帧中选择小时+分钟/60和其他列来创建新的数据帧,如下所示:
val logon11 = logon1.select("User","PC","Year","Month","Day","Hour","Minute",$"Hour"+$"Minute"/60)
我得到的错误如下:
<console>:38: error: overloaded method value select with alternatives:
(
enter code here我正在练习在数据仓库中添加一个列表。我可以开发udf并注册,然后在dataframe上应用,但我想尝试一种不同的方法,即提取list from dataframe col和它们map it,然后在新列中提取readd to the original dataframe。
val df = spark.createDataFrame(Seq(("A",1),("B",2),("C",3))).toDF("Str", "Num")
+---+---+
|Str|Num|
+---+---+
我需要将一个带有string列的dataframe连接到一个字符串数组中,这样如果数组中的值匹配,行就会连接起来。
我试过了,但我想这不是支持。还有别的办法吗?
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
val spark = SparkSession.builder().config(sparkConf).g
在Spark 2.11中,当将Dataframe转换为Dataset时,spark会保留甚至在dataset的类中都没有引用的额外列。 scala> case class F(x: String, y: String)
defined class F
scala> import spark.implicits._
import spark.implicits._
scala> val df = Seq(("1a","2a","3a","4a"), ("5a", "6a", &
我使用以下命令创建了一个空的Seq() scala> var x = Seq[DataFrame]()
x: Seq[org.apache.spark.sql.DataFrame] = List() 我有一个名为createSamplesForOneDay()的函数,它返回一个DataFrame,我想将它添加到这个Seq() x中。 val temp = createSamplesForOneDay(some_inputs) // this returns a Spark DF
x = x + temp // this throws an error 我得到下面的错误- scala&