我试图跟随PySpark高级分析的第5章中的代码示例。就上下文而言,这一章是关于k_means聚类的,我正在讨论的部分是使用一种计算加权平均熵的方法来寻找k的最优值。然而,在遵循教科书的同时,我发现了一个错误,我不知道如何修复。
以下是教科书中的代码:第1部分:
from math import log
def entropy(counts):
values = [c for c in counts if (c > 0)]
n = sum(values)
p = [v/n for v in values]
return sum([-1*(p_v) * log(p_v) for p_v
试图使用pyspark im运行spark,得到以下错误:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (pi1 executor driver): or
我有以下SQL查询,我想将其转换为pyspark。我想使用两个列pp和gender,并在pyspark中执行以下操作 %sql
SELECT pp
, SUM(CASE WHEN Gender = 'M'
THEN 1.0 ELSE 0.0 END) /
COUNT(1) AS gender_score
, count(1) AS total
FROM df
WHERE gender in ('M', 'F')
GROUP BY pp
HAVING
我有一个Spark (sdf),其中每一行都显示一个访问DataFrame的IP。我想要计算这个数据帧中不同的IP-URL对,最直接的解决方案是sdf.groupBy("ip", "url").count()。但是,由于数据帧有数十亿行,精确计数可能需要相当长的时间。我不是特别熟悉PySpark --我试着用.approx_count_distinct()替换.count(),这在语法上是不正确的。
我搜索"how to use .approx_count_distinct() with groupBy()“,找到了。然而,建议的解决方案(类似于:sdf
我在这里搜索过,但没有找到对我有用的东西。基本上,我只有一个行列表(有一些列),我必须将它们写在一个拼花表中。我需要“强制”从列表到DF,但只有1行,我有很多问题!
from pyspark.sql import Window,Row
from pyspark.sql import functions as F
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
tablename='table'
start_time = F.lit(datetime.datetime.no
获得了以下pyspark代码:
import pyspark.sql.functions as F
null_or_unknown_count = df.sample(0.01).filter(
F.col('env').isNull() | (F.col('env') == 'Unknown')
).count()
在测试代码中,数据帧是模拟的,所以我尝试为这个调用设置return_value,如下所示:
from unittest import mock
from unittest.mock import ANY
@mock.pa
我试图将爆炸的列添加到dataframe中:
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Convenience function for turning JSON strings into DataFrames.
def jsonToDataFrame(json, schema=None):
# SparkSessions are available with Spark 2.0+
reader = spark.read
if schema:
reader.schema(s
我正在使用电子病历中的蜂巢Metastore。
我可以通过HiveSQL或SparkSQL手动查询表。
但是当我在星火作业中使用相同的表时,它会说表或视图找不到
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco pyspark.sql.utils.AnalysisException:
u"Table or view not found: `logan_test`.`salary_csv`; line 1 pos 21;
'Aggregate
SELECT county, state, deaths, cases, count (*) as count
FROM table
GROUP BY county, state, deaths, cases
HAVING count(*)>1
我通过SQL从上面的查询中获得以下数据。我想要的是将这两个SQL查询转换为
Pandas
PySpark
请让我知道,因为我对熊猫和PySpark都是新手
注意-我不想使用spark.sql,而是希望使用spark.table从表中读取并执行上述操作。
我来自R和到PySpark,因为它的出色的火花处理,我正在努力从一个上下文映射到另一个特定的概念。
尤其是,假设我拥有如下数据集
x | y
--+--
a | 5
a | 8
a | 7
b | 1
我希望添加一个列,其中包含每个x值的行数,如下所示:
x | y | n
--+---+---
a | 5 | 3
a | 8 | 3
a | 7 | 3
b | 1 | 1
在dplyr中,我只想说:
import(tidyverse)
df <- read_csv("...")
df %>%
group_by(x) %>%
mutate(n
我正在尝试使用pyspark将数据从每天的批处理发送到Kafka主题,但目前我收到了以下错误:
文件跟踪(最近一次调用):文件"",第5行,文件"/usr/local/rms/lib/hdp26_c5000/spark2/python/pyspark/sql/readwriter.py",第548行,保存self._jwrite.save()文件"/usr/local/rms/lib/hdp26_c5000/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py",第1133行,在 c
我正在尝试将以下SQL查询转换为pyspark: SELECT COUNT( CASE WHEN COALESCE(data.pred,0) != 0 AND COALESCE(data.val,0) != 0
AND (ABS(COALESCE(data.pred,0) - COALESCE(data.val,0)) / COALESCE(data.val,0)) > 0.1
THEN data.pred END) / COUNT(*) AS Result 我现在在PySpark中的代码是: Result = data.select(
count(
我正在使用pyspark dataframe从每个行的数组中查找不同的计数:输入: col1 1,1,1 1,2,1,2
output:
1
3
2
I used below code but it is giving me the length of an array:
output:
3
3
4
please help me how do i achieve this using python pyspark dataframe.
slen = udf(lambda s: len(s), IntegerType())
count = Df.withColumn("Coun
我在pyspark dataframe中有一个count列,如下所示:
id Count Percent
a 3 50
b 3 50
我想要一个结果数据帧为:
id Count Percent CCount CPercent
a 3 50 3 50
b 3 50 6 100
我不能使用熊猫数据帧,因为数据库太大了。我找到了指向窗口分区的答案,但我没有这样的列作为分区依据。请大家用pyspark dataframe告诉我怎么做。注意: pysp
原谅我的无知,我对火种并不熟悉。我正在尝试改进udf,以便使用字典根据来自另一列count_adj的值创建一个新的列a_type。如何说明在此过程中创建新列的无/空类型。这在熊猫身上是非常容易的(df['adj_count'] = df.a_type.map(count_map)),但在火星雨中却很难做到。
抽样数据/进口:
# all imports used -- not just for this portion of the script
from pyspark.sql import SparkSession, HiveContext, SQLContext
from
#Load the CSV file into a RDD
irisData = sc.textFile("/home/infademo/surya/iris.csv")
irisData.cache()
irisData.count()
#Remove the first line (contains headers)
dataLines = irisData.filter(lambda x: "Sepal" not in x)
dataLines.count()
from pyspark.s