下面是spark shell代码
scala> val colName = "time_period_id"
scala> val df = spark.sql("""select time_period_id from prod.demo where time_period_id =
202101102 """)
df: org.apache.spark.sql.DataFrame = [time_period_id: int]
scala> val result = df.agg(max(
我有一个星火DataFrame:
我必须使用Scala从中计算平均精度。我想,根据文档,我们必须使用RDD而不是DataFrame。我尝试了以下几点:
var llist = df.select("predicted", "actual").rdd.map(x => (x.get(0), x.get(1))).collect()
// It gave Array[(Any, Any)]
var df_rdd =sc.parallelize(llist)
// df_rdd is org.apache.spark.rdd.RDD[(Any, An
我正在尝试在C# Spark中实现向量自定义函数。 我已经通过Spark .Net创建了.Net Spark环境。在我的IntegerType专栏中,Vector Udf (Apache箭头和Microsoft.Data.Analysis都是)很好用。现在,尝试将Integer数组类型的列发送到Vector Udf,但找不到实现此目的的方法。 用法 using System;
using System.Linq;
using Microsoft.Data.Analysis;
using Microsoft.Spark.Sql;
using func = Microsoft.Spark.Sql
我试图理解map和flatMap是如何工作的,但被下面的代码卡住了。flatMap()函数返回一个RDDChar,但我期望返回的是RDDString。有人能解释一下为什么它会产生RDDChar吗?
scala> val inputRDD = sc.parallelize(Array(Array("This is Spark"), Array("It is a processing language"),Array("Very fast"),Array("Memory operations")))
scala> val
我将csv文件读取到RDD,并试图将其转换为DataFrame。但是,它克服了错误。
scala> rows.toDF()
<console>:34: error: value toDF is not a member of org.apache.spark.rdd.RDD[Array[String]]
rows.toDF()
scala> rows.take(2)
Array[Array[String]] = Array(Array(1, 0, 3, "Braund, ...
我做错了
我使用以下命令创建了一个空的Seq() scala> var x = Seq[DataFrame]()
x: Seq[org.apache.spark.sql.DataFrame] = List() 我有一个名为createSamplesForOneDay()的函数,它返回一个DataFrame,我想将它添加到这个Seq() x中。 val temp = createSamplesForOneDay(some_inputs) // this returns a Spark DF
x = x + temp // this throws an error 我得到下面的错误- scala&
我正在尝试创建一个dataFrame。似乎spark无法从scala.Tuple2类型创建数据帧。我该怎么做呢?我是scala和spark的新手。 下面是代码运行中的错误跟踪的一部分 Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- field (class: "org.apache.spark.sql.Row", name: "_1")
- root class:
我熟悉Python,我正在学习Spark-Scala。
我想构建一个DataFrame,它的结构由以下语法描述:
// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
(1.1, Vectors.dense(1.1, 0.1)),
(0.2, Vectors.dense(1.0, -1.0)),
(3.0, Vectors.dense(1.3, 1.0)),
(1.0, Vectors.dense(1.2, -0.5)
我有spark scala应用程序。我正在尝试使用它内部的Futures来并行化几个独立的操作集。我在期货中调用它们,它们返回给我未来类型的DataFrame,我如何在最后合并它们,并在任何未来类型无法计算的情况下抛出错误。下面是我的代码。当我尝试在onComplete块中应用数据帧的联合时,它显示以下错误
value union is not a member of scala.concurrent.Future[(scala.concurrent.Future[org.apache.spark.sql.DataFrame], scala.concurrent.Future[org.apac
当我在将我的代码从Spark2.0迁移到2.1时,我无意中发现了一个与Dataframe保存相关的问题。
这是密码
import org.apache.spark.sql.types._
import org.apache.spark.ml.linalg.VectorUDT
val df = spark.createDataFrame(Seq(Tuple1(1))).toDF("values")
val toSave = new org.apache.spark.ml.feature.VectorAssembler().setInputCols(Array("value
我有一个RDD[MapString,Any],我正在尝试将它转换为Dataframe。我没有可以指定Dataframe的架构。
我试着做了一个rdd.toDF,但是没有帮助。它出现了一个错误,如下所示。
Exception in thread "main" java.lang.ClassNotFoundException: scala.Any
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.ja
我是新来的火花/斯卡拉。我正在尝试读取一些数据从一个蜂窝表到一个火花数据,然后添加一个列的基础上的一些条件。这是我的代码:
val DF = hiveContext.sql("select * from (select * from test_table where partition_date='2017-11-22') a JOIN (select max(id) as bid from test_table where partition_date='2017-11-22' group by at_id) b ON a.id=b.bid")
我的问题是,当我将代码转换为流模式并将数据帧放入foreach循环时,数据帧会显示空表!我不填!我也不能将它放入assembler.transform()中。错误是:
Error:(38, 40) not enough arguments for method map: (mapFunc: String => U)(implicit evidence$2: scala.reflect.ClassTag[U])org.apache.spark.streaming.dstream.DStream[U].
Unspecified value parameter mapFunc.
v