我正在尝试访问一个hive表,并从表/ dataframe中提取和转换某些列,然后将这些新列放到一个新的dataframe中。我正试着这样做-
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val hiveDF = sqlContext.sql("select * from table_x")
val system_generated_id = hiveDF("unique_key")
val application_assigned_event_id = hiveDF("
enter code here我正在练习在数据仓库中添加一个列表。我可以开发udf并注册,然后在dataframe上应用,但我想尝试一种不同的方法,即提取list from dataframe col和它们map it,然后在新列中提取readd to the original dataframe。
val df = spark.createDataFrame(Seq(("A",1),("B",2),("C",3))).toDF("Str", "Num")
+---+---+
|Str|Num|
+---+---+
我知道使用.withColumn()向星火.withColumn()添加新列的方法,以及返回DataFrame的UDF。我还知道,我们可以将结果DataFrame转换为DataSet。
我的问题是:
如果我们仍然遵循传统的DF方法(即将列名作为UDF输入的字符串传递),DataSet的类型安全性是如何发挥作用的?
是否有一种“面向对象的方式”来访问列(而不是将列名作为字符串传递),就像我们以前使用RDD那样,用于追加一个新列。
如何在地图、过滤器等正常操作中访问新列?
例如:
scala> case class Temp(a : Int, b : String
我需要对大小为100亿行的三列表t (s,p,o)运行200万次查询。每一列的数据类型为字符串。
只有两种类型的查询:
select s p o from t where s = param
select s p o from t where o = param
如果我将表存储在Postgresql数据库中,则使用Java ThreadPoolExecutor需要6个小时。
你认为Spark能更快地处理查询吗?最好的策略是什么?以下是我的想法:
将表加载到一个dataframe中,并启动对dataframe的查询。
将表加载到parquet数据库中,并对该数据库启动查询
我正在将一些用熊猫编写的代码转换为PySpark。该代码有许多for循环,用于根据用户指定的输入创建可变数量的列。
我使用的是Spark 1.6.x,其中包含以下示例代码:
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
# create a Pandas DataFrame, then convert to Spark DataFrame
test = sqlContext.createDataFrame(pd.D
我有三角桌
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
现在,我要添加一个模式更改,可能是单个列,可能是几个列,可能是嵌套数组。我无法预测代码执行过程中会出现什么情况。
我使用python的se
我尝试通过从数据帧中选择小时+分钟/60和其他列来创建新的数据帧,如下所示:
val logon11 = logon1.select("User","PC","Year","Month","Day","Hour","Minute",$"Hour"+$"Minute"/60)
我得到的错误如下:
<console>:38: error: overloaded method value select with alternatives:
(
我是新来的火花/斯卡拉。我正在尝试读取一些数据从一个蜂窝表到一个火花数据,然后添加一个列的基础上的一些条件。这是我的代码:
val DF = hiveContext.sql("select * from (select * from test_table where partition_date='2017-11-22') a JOIN (select max(id) as bid from test_table where partition_date='2017-11-22' group by at_id) b ON a.id=b.bid")
set.createOrReplaceTempView("input1");
String look = "select case when length(date)>0 then 'Y' else 'N' end as date from input1";
Dataset<Row> Dataset_op = spark.sql(look);
Dataset_op.show();
在上面的代码中,dataframe 'set‘有10列,我已经对其中的一列(即
我试图使用星火红移库,并且无法操作由sqlContext.read()命令创建的数据(从redshift读取)。
这是我的代码:
Class.forName("com.amazon.redshift.jdbc41.Driver")
val conf = new SparkConf().setAppName("Spark Application").setMaster("local[2]")
val sc = new SparkContext(conf)
import org.apache.spark.sql._
val sqlContext