我想使用PySpark应用程序在远程Postgres服务器上使用执行以下查询
SELECT id, postgres_function(some_column) FROM my_database GROUP BY id
问题是,我不能使用spark.sql(QUERY)对Pyspark执行这种查询,很明显,因为postgres_function不是ANSI函数。
我用的是星火2.0.1和Postgres 9.4。
我已经编写了一个SQL查询,其中包含一个子查询。这是一个正确的mySQL查询,但没有在Pyspark上实现 from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *
sc = spark.sparkConte
如何执行不是程序驱动程序部分的例程的火花sql查询?
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
def doWork(rec):
data = SQLContext.sql("select * from zip_data where STATEFP ='{sfp}' and COUNTYFP = '{cfp}' ".format(sfp=rec[0], cfp=rec[1]))
我有以下SQL查询,我想将其转换为pyspark。我想使用两个列pp和gender,并在pyspark中执行以下操作 %sql
SELECT pp
, SUM(CASE WHEN Gender = 'M'
THEN 1.0 ELSE 0.0 END) /
COUNT(1) AS gender_score
, count(1) AS total
FROM df
WHERE gender in ('M', 'F')
GROUP BY pp
HAVING
我创建了一个增量表,它使用spark 3.x和增量0.7.x创建:
data = spark.range(0, 5)
data.write.format("delta").mode("overwrite").save("tmp/delta-table")
# add some more files
data = spark.range(20, 100)
data.write.format("delta").mode("append").save("tmp/delta-table")
df = sp
我对数据砖火花SQL很陌生。我正在寻找嵌套的collect_list并试图找出答案。下面是我的星星之火sql查询
select
policy.CustomerId,
collect_list(struct(Number, Type, Id, Product.product))as policydetail
from
policy
Left Join
(
SELECT
po
SELECT county, state, deaths, cases, count (*) as count
FROM table
GROUP BY county, state, deaths, cases
HAVING count(*)>1
我通过SQL从上面的查询中获得以下数据。我想要的是将这两个SQL查询转换为
Pandas
PySpark
请让我知道,因为我对熊猫和PySpark都是新手
注意-我不想使用spark.sql,而是希望使用spark.table从表中读取并执行上述操作。
在我们的Spark流应用程序中,使用60秒批处理,我们在一个DF上创建一个临时表,然后对它运行大约80个查询,如下所示:
sparkSession.sql("select ... from temp_view group by ...")
但是,考虑到这些查询相当繁重,包含大约300列的求和列,如果我们不必分析sql并为每个微批生成一个查询计划,那就太好了。
没有一种方法可以生成、缓存和重用查询计划吗?即使每个查询只保存50 us,每批也会节省我们大约4s。
我们在CDH/纱线上使用Spark2.2。谢谢。
我试图从表Warehouses和Boxes中选择所有的Warehouse.capacity,这样Warehouse.capacity比Boxes.count_of_boxes小。
在postgresql中工作的SQL查询
select w.code
from Warehouses w
join Boxes b
on w.code = b.warehouse
group by w.code
having count(b.code) > w.capacity
但是,相同的查询在pyspark中不工作。
spark.sql("""
select w.code
from
尝试使用以下命令检查某些列中是否有NaN值 ddf_temp = ddf.select('col1', 'col2' ...) # all int type
ddf_temp.select([count(when(isnull(c), c)).alias(c) for c in ddf_temp.columns]).show() 我可以找出哪些列给了我这些错误,但我找不到为什么会出现这样的错误: ---------------------------------------------------------------------------
Py4JJav
我有一个卡桑德拉表,这是相当大的,现在我有火花-卡桑德拉与以下代码连接。
import pandas as pd
import numpy as np
from pyspark import *
import os
from pyspark.sql import SQLContext
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.1 --conf spark.cassandra.connection.host
Spark应该以闪电的速度完成数据处理。但是我猜我没有为我的程序使用正确的功能来让Spark以这种方式工作。
下面是我的程序的样子:
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyparsing import re
import time
start_time = time.time()
sc = SparkContext("local","test
我有两个sql,它们给出了相同的结果:
问题1:
SELECT
u.*, COUNT(po.order_id) products_count
FROM (SELECT * FROM orders o WHERE o.date >= (CURRENT_DATE() - INTERVAL 1 MONTH)) o
LEFT JOIN products_orders po ON po.order_id=o.id
JOIN users u ON u.id=o.user_id
GROUP BY po.order_id
和查询2:
SELECT
u.*, o.id order_id, COUNT(po
我有个疑问
select * from
table
where (primary_product NOT IN ('No Technical Enforcement')
or group_name ilike ('%stove%'))
我想将相同的查询转换为PySpark SQL,但是我不能这样做,因为我不知道ILIKE的替代品。
我正在处理一个包含大量大型SQL请求的项目,我想知道:
什么在速度方面提供了更好的性能:
联接查询
子查询
例如
SELECT artistName FROM artist
JOIN group
On group.location = artist.location
AND group.available = 'true'
SELECT artistName FROM artist,
(SELECT group.available = true) AS groupAvailable
WHERE groupAvailable.location = artist
我来自R和到PySpark,因为它的出色的火花处理,我正在努力从一个上下文映射到另一个特定的概念。
尤其是,假设我拥有如下数据集
x | y
--+--
a | 5
a | 8
a | 7
b | 1
我希望添加一个列,其中包含每个x值的行数,如下所示:
x | y | n
--+---+---
a | 5 | 3
a | 8 | 3
a | 7 | 3
b | 1 | 1
在dplyr中,我只想说:
import(tidyverse)
df <- read_csv("...")
df %>%
group_by(x) %>%
mutate(n
在spark.sql查询中注册和使用pyspark version 3.1.2内置函数的正确方式是什么? 下面是一个创建pyspark DataFrame对象并在纯SQL中运行简单查询的最小示例。 尝试使用...TypeError: Invalid argument, not a string or column: -5 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' fu
我对SQL非常陌生。有没有人愿意解释一下这两个查询之间是否有什么不同?
SELECT author, sum(words) AS total_words
FROM books
GROUP BY author
HAVING sum(words) > 10000000;
SELECT author, sum(words) AS total_words
FROM books
GROUP BY author
HAVING total_words > 10000000;
我正在间接地从The essence of SQL上学习SQL Cookbook,这本书出版后引入了现代的SQL运算符,其中有一节介绍了它。
演习:
给予:
create table student
( sno integer,
sname varchar(10),
age integer
);
--
/* table of courses */
create table courses
( cno varchar(5),
title varchar(10),
credits integer
);
/* table of students and the courses the
根据所有示例,将SQL转换为LINQ for this子句,如本例所示:
SELECT NAME
FROM TABLES
GROUP BY NAME
HAVING COUNT(*) > 1
is:(vb.net)
from t in tables
group t by t.NAME into g = Group
where g.count > 1
select g
但是,上面的LINQ语句被转换为以下SQL:
SELECT [t1].[NAME] AS [Key]
FROM (
SELECT COUNT(*) AS [value], [t0].[NAME]
我有一组天气数据,我试图查询它,以获得平均低点和平均高点每年。我没有问题,提交的工作和取得预期的结果,但它需要几个小时的运行。我以为它会跑得更快,是我做错了什么,还是它没有我想的那么快?
数据是一个csv文件,包含100,000,000多个条目。THe列是日期、气象站、测量(TMAX或TMIN)和值。
我在我大学的hadoop集群上运行这个工作,我没有更多的关于这个集群的信息。
提前感谢!
import sys
from random import random
from operator import add
from pyspark.sql import SQLContext, Row
f