我有一个名为“description”值的dataframe列,格式如下
ABC XXXXXXXXXXXX STORE NAME ABC TYPE1
我想把它解析成3列,如下所示
| mode | type | store | description |
|------------------------------------------------------------------------|
| ABC | TYPE1 | STORE NAME | ABC XXXXXXXXXXXX STORE NAM
我希望使用选定的列对数据进行排序,方法是将它们从giving类型转换为prederred类型和prederred order。,但是即使是简单的列转换也不起作用,从而导致了这种异常。我在这里提供了示例代码。
val conf = new SparkConf().setAppName("Sparkify").setMaster("local[*]")
val sparkContext =new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)
var d
当我想在Spark2.2中重命名我的DataFrame列并使用show()打印它的内容时,我会得到以下错误:
18/01/04 12:05:37 WARN ScalaRowValueReader: Field 'cluster' is backed by an array but the associated Spark Schema does not reflect this;
(use es.read.field.as.array.include/exclude)
18/01/04 12:05:37 WARN ScalaRowValueRead
我正在使用spark完成一些小步骤,我的练习是将一个JSON文件加载到RDD中,选择一个列,然后使用distinct来获得惟一的值。我过滤的列包含多个值(CSV行),必须拆分。
val sqlContext = spark.sqlContext
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
import hiveCtx.implicits._
val bizDF = hiveCtx.jsonFile("/home/xpto/Documents/PersonalProjects
根据这个,我正在应用udf来过滤CountVectorizer之后的空向量。
val tokenizer = new RegexTokenizer().setPattern("\\|").setInputCol("dataString").setOutputCol("dataStringWords")
val vectorizer = new CountVectorizer().setInputCol("dataStringWords").setOutputCol("features")
val pipeline
在我的spark DataFrame中,有一列包含了CountVectoriser转换的输出-它是稀疏向量格式的。我想要做的是将这列再次“分解”成一个密集的向量,然后是它的组成部分行(这样它就可以用于外部模型的评分)。
我知道本专栏中有40个特性,因此在下面的示例中,我尝试了:
import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.Vector
// convert sparse vector to a dense vector, and then to array<double&g
我正在尝试对一个列执行一些正则表达式操作。为了做到这一点,我用如下的基本小写操作进行了说明:
df.select('name').map(lambda x: x.lower())
这里的df是一个DataFrame,当我调用collect()操作时,该操作抛出了一个异常。
Ques 1: After map(or reduce) operation, every DataFrame converts to a Pipelined RDD. Am I right?
如果是这样,为什么这个命令在收集流水线RDD时抛出异常。
我错过了什么吗?
异常太大,无法读取:
17/07
我已经编写了Scala代码与spark dataframe相结合。起初,它是有效的(只有当我不使用if else语句时)。虽然它不是一个干净的代码,但我想知道如何转换它?
其次,if/ else语句不起作用,我如何像python中那样将值附加到上面的变量中,并在以后将其用作dataframe?
对不起,我是Scala的新手。
%scala
for(n <- Scalaconfigs){
var bulkCopyMetadata = new BulkCopyMetadata
val sourceTable = n(0)
val tar
有人能在这个场景中帮助我吗?我正在使用spark/scala读取一个Json文件,然后尝试访问列名,但在访问列名时,我得到了下面的错误消息。 org.apache.spark.sql.AnalysisException: cannot resolve
'explode(`b2b_bill_products_prod_details`.`amt`)'
due to data type mismatch: input to function explode should be
array or map type, not DoubleTy
我想基于下面的hive创建一个df:
WITH FILTERED_table1 AS (select *
, row_number() over (partition by key_timestamp order by datime DESC) rn
FROM table1)
scala function:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val table1 = Wi