问题:
我有两个表(d1和d2)包含地理空间点.我想执行以下查询:
select * from table 1 where table1.point is within 50km of any point in table2.point
我使用Spark-SQL和GeoMesa & Accumulo来实现同样的功能。(Spark作为处理引擎,Accumulo作为数据存储库& GeoMesa用于GeoSpatial库)。
上面的查询是某种left semi join,但我不确定如何使用Spark-SQL实现它,因为就我所读到的子查询而言,在where子句中不能使用子查询。
在spark.sql查询中注册和使用pyspark version 3.1.2内置函数的正确方式是什么? 下面是一个创建pyspark DataFrame对象并在纯SQL中运行简单查询的最小示例。 尝试使用...TypeError: Invalid argument, not a string or column: -5 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' fu
我一直在用简单的空间查询测试geomesa,并将其与Postgis进行比较。例如,在Postgis中,此SQL查询在30秒内运行:
with series as (
select
generate_series(0, 5000) as i
),
points as (
select ST_Point(i, i*2) as geom from series
)
select st_distance(a.geom, b.geom) from points as a, points as b
现在,以下geomesa版本需要5分钟(使用-Xmx10g ):
import org.ap
我想得到一堆多边形的面积。我可以使用st_area和st_geomFromText,但是在尝试使用st_transform时会出现undefined function错误。我需要从4326转变为3857 (或任何将给我英亩)。
geomesa版本:我有2.4.2,但现在我有3.4。
误差
AnalysisException: Undefined function: st_transform. This function is neither a built-in/temporary function, nor a persistent function that is qualified as
我正在尝试将一些Pandas代码转换为Spark以进行缩放。myfunc是一个复杂应用程序接口的包装器,它接受一个字符串并返回一个新的字符串(这意味着我不能使用矢量化函数)。
def myfunc(ds):
for attribute, value in ds.items():
value = api_function(attribute, value)
ds[attribute] = value
return ds
df = df.apply(myfunc, axis='columns')
myfunc获取一个DataSeri
当我调用一个函数时,它会工作。但是,当我在UDF中调用该函数时,它将无法工作。
这是完整的密码。
val sparkConf = new SparkConf().setAppName("HiveFromSpark").set("spark.driver.allowMultipleContexts","true")
val sc = new SparkContext(sparkConf)
val hive = new org.apache.spark.sql.hive.HiveContext(sc)
///////////// UDFS
def
我使用Spark2.4已经有一段时间了,最近几天我刚刚开始使用Spark3.0。在切换到Spark3.0运行udf((x: Int) => x, IntegerType)后,我得到了这个错误
Caused by: org.apache.spark.sql.AnalysisException: You're using untyped Scala UDF, which does not have the input type information. Spark may blindly pass null to the Scala closure with primitive-ty
假设我在python中创建了一个函数,所以将一个数字求幂为2: def squared(s):
return s * s 然后我在Spark session中注册了这个函数,如下所示: spark.udf.register("squaredWithPython", squared) 然后当我在Spark SQL中调用UDF时,如下所示: spark.range(1, 20).registerTempTable("test")
%sql select id, squaredWithPython(id) as id_squared from test 那么,如
在使用Spark的DataFrames时,需要用户定义函数(UDF)来映射列中的数据。UDF要求显式指定参数类型。在我的例子中,我需要操作一个由对象数组组成的列,但我不知道要使用哪种类型。下面是一个例子:
import sqlContext.implicits._
// Start with some data. Each row (here, there's only one row)
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
"&
我已经编写了一个模块,其中包含了在PySpark DataFrames上工作的函数。它们对DataFrame中的列进行转换,然后返回一个新的DataFrame。下面是代码的一个示例,缩短为只包含其中一个函数:
from pyspark.sql import functions as F
from pyspark.sql import types as t
import pandas as pd
import numpy as np
metadta=pd.DataFrame(pd.read_csv("metadata.csv")) # this contains metad
我的设置是在AWS中运行的3节点集群。我已经摄取了我的数据(3000万行),并且在使用jupyter笔记本运行查询时没有任何问题。但现在我正在尝试使用spark和java运行查询,如以下代码片段所示。
public class SparkSqlTest {
private static final Logger log = Logger.getLogger(SparkSqlTest.class);
public static void main(String[] args) {
Map<String, String> dsParams = ne
我有两个数据帧: dataDf和regexDf。dataDf有大量记录,而regexDf有两列正则表达式。我的问题是,我需要根据regexDef中的两列匹配正则表达式的两列来过滤dataDf。我想出了这个
dataDf.registerTempTable("dataTable")
sqlContext.udf.register("matchExpressionCombination", matchExpressionCombination _)
val matchingResults = sqlContext.sql("SELECT * FROM da
我正在开发一个spark 2.0.0版本,其中我的需求是在我的sql上下文中使用'com.facebook.hive.udf.UDFNumberRows‘函数来使用其中一个查询。在我的集群with Hive查询中,我将其用作临时函数,只需定义:创建临时函数myFunc为'com.facebook.hive.udf.UDFNumberRows',这非常简单。
我尝试将其注册到sparkSession,如下所示,但得到一个错误:
sparkSession.sql("""CREATE TEMPORARY FUNCTION myFunc AS '
例如,我们使用Spark执行下面的SQL,我们需要my_udf(row)返回Spark中的分区id。
add jar hdfs:///dir/udf/udf.jar;
create temporary function my_udf as 'com.my.MyUDF';
select row, my_udf(row) from table;
我已经知道如何让taskId在Hive中在MR engine:中执行,但是在Spark中执行它并不有效。请告诉我如何获得partitionID或taskContext的火花在蜂巢UDF,非常感谢!
尝试使用以下命令检查某些列中是否有NaN值 ddf_temp = ddf.select('col1', 'col2' ...) # all int type
ddf_temp.select([count(when(isnull(c), c)).alias(c) for c in ddf_temp.columns]).show() 我可以找出哪些列给了我这些错误,但我找不到为什么会出现这样的错误: ---------------------------------------------------------------------------
Py4JJav