首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >PySpark:无法使用datetime years = 0001执行列操作

PySpark:无法使用datetime years = 0001执行列操作
EN

Stack Overflow用户
提问于 2018-06-16 15:03:03
回答 1查看 2.2K关注 0票数 3

我有一些时间戳格式为"0001-mm-dd HH:MM:SS“的数据。我在试着得到最短的时间。为了获得最小时间,我需要首先转换为DoubleType,因为PySpark数据帧的minimum函数显然不适用于时间戳。然而,出于某些原因,日期时间讨厌0001年。无论我做什么,我都不能让它工作。下面,我尝试使用UDF手动将年份增加1,但由于某些原因,它没有注册。但是,我可以使用没有0001年的另一列数据,并将函数中的if语句更改为数据中包含的年份,这样我就可以观察到年份的变化。

我做错了什么?

代码语言:javascript
复制
from pyspark.sql import SQLContext
import pyspark.sql.functions as sfunc
import pyspark.sql.types as tp
from pyspark import SparkConf
from dateutil.relativedelta import relativedelta

columnname='x'
#columnname='y'
tmpdf.select(columnname).show(5)

def timeyearonecheck(date):
    '''Datetimes breaks down at year = 0001, so bump up the year to 0002'''
    if date.year == 1:
        newdate=date+relativedelta(years=1)
        return newdate
    else:
        return date

def timeConverter(timestamp):
    '''Takes either a TimestampType() or a DateType() and converts it into a 
    float'''
    timetuple=timestamp.timetuple()
    if type(timestamp) == datetime.date:
        timevalue=time.mktime(timetuple)
        return int(timevalue)
    else:
        timevalue=time.mktime(timetuple)+timestamp.microsecond/1000000
        return timevalue

tmptimedf1colname='tmpyeartime'
yearoneudf=sfunc.udf(timeyearonecheck,tp.TimestampType())
tmptimedf1=tmpdf.select(yearoneudf(sfunc.col(columnname)).alias(tmptimedf1colname))
tmptimedf2colname='numbertime'
timeudf=sfunc.udf(timeConverter,tp.DoubleType())
tmptimedf2=tmptimedf1.select(timeudf(sfunc.col(tmptimedf1colname)).alias(tmptimedf2colname))
minimum=tmptimedf2.select(tmptimedf2colname).rdd.min()[0]


+-------------------+
|                  x|
+-------------------+
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
+-------------------+
only showing top 5 rows

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-42-b5725bf01860> in <module>()
 17 timeudf=sfunc.udf(timeConverter,tp.DoubleType())
 18 
tmptimedf2=tmpdf.select(timeudf(sfunc.col(columnname)).
alias(tmptimedf2colname))
---> 19 minimum=tmptimedf2.select(tmptimedf2colname).rdd.min()[0]
 20 print(minimum)
...
Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 
in stage 43.0 failed 4 times, most recent failure: Lost task 3.3 in stage 
43.0 (TID 7829, 10.10.12.41, executor 39): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
ValueError: year 0 is out of range

即使我只是试图查看第一个UDF的输出,我也会得到一个错误,但只有当我查看输出时,而不是当我实际计算它时。

代码语言:javascript
复制
tmptimedf1.select(tmptimedf1colname).show(5)

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-44-5fc942678065> in <module>()
----> 1 tmptimedf1.select(tmptimedf1colname).show(5)
...
Py4JJavaError: An error occurred while calling o2215.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
 in stage 44.0 failed 4 times, most recent failure: Lost task 0.3 in stage 
44.0 (TID 7984, 10.10.12.36, executor 4): 
org.apache.spark.api.python.PythonException: Traceback (most recent call 
last):
...
ValueError: year 0 is out of range

更重要的是,如果我这样做了,我会得到相同的ValueError来谈论第0年:

代码语言:javascript
复制
tmpdf.select(columnname).first()

但前提是我使用年份为0001的列,而不是没有0001年的'y‘列。'y‘列工作正常。

我不明白为什么我可以显示tmpdf的5个值,其中包括0001,但我不能选择第一个值,因为它有0001。

编辑:如下所述,我真的很想将0001年转换为002年,因为PySpark的approxQuantile不能在时间戳上工作,而且一般来说,我对数据集的了解不够,无法知道哪些年份是可接受的。0001绝对是一个填充年,但1970年在我的数据中可能是一个真实的年份(在我的工作的一般情况下)。

到目前为止,我得到了这样的结论:

代码语言:javascript
复制
def tmpfunc(timestamp):
    time=datetime.datetime.strptime(timestamp,'%Y-%m-%d %H:%M:%S')
    return time

adf=datadf.select(sfunc.col(columnname).cast("string").alias('a'))
newdf = adf.withColumn('b',sfunc.regexp_replace('a', '0001-', '0002-'))
newdf.show(10)
print(newdf.first())
tmpudf=sfunc.udf(tmpfunc,tp.TimestampType())
newnewdf=newdf.select(tmpudf(sfunc.col('b')).alias('c'))
newnewdf.show(10)
print(newnewdf.first())

+-------------------+-------------------+
|                  a|                  b|
+-------------------+-------------------+
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|2015-10-13 09:56:09|2015-10-13 09:56:09|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|2013-11-05 21:28:09|2013-11-05 21:28:09|
|1993-12-24 03:52:47|1993-12-24 03:52:47|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
+-------------------+-------------------+
only showing top 10 rows

Row(a='0001-01-02 00:00:00', b='0002-01-02 00:00:00')
+-------------------+
|                  c|
+-------------------+
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|2015-10-13 09:56:09|
|0002-01-03 23:56:02|
|2013-11-05 21:28:09|
|1993-12-24 03:52:47|
|0002-01-03 23:56:02|
+-------------------+
only showing top 10 rows

Row(c=datetime.datetime(2, 1, 2, 0, 0))

正如一位用户在下面评论的那样,"show“中的日期是1天23小时56分钟2秒。为什么,我如何摆脱它?那么为什么我的“第一次”调用是正确的,但也在应该是(2,1,2,0,0,0)的地方少了一个0?

EN

回答 1

Stack Overflow用户

发布于 2018-06-16 19:52:34

为了获得最小时间,我需要首先转换为DoubleType,因为用于PySpark数据帧的minimum函数显然不适用于

确实是这样的

代码语言:javascript
复制
df = spark.createDataFrame(
    ["0001-01-02 00:00:00", "0001-01-03 00:00:00"], "string"
).selectExpr("to_timestamp(value) AS x")

min_max_df = df.select(sfunc.min("x"), sfunc.max("x"))
min_max_df.show()
# +-------------------+-------------------+
# |             min(x)|             max(x)|
# +-------------------+-------------------+
# |0001-01-02 00:00:00|0001-01-03 00:00:00|
# +-------------------+-------------------+

失败的部分实际上是转换为本地值:

代码语言:javascript
复制
>>> min_max_df.first()
Traceback (most recent call last):
...
    return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
ValueError: year 0 is out of range

最小值的纪元时间戳为

代码语言:javascript
复制
>>> df.select(sfunc.col("x").cast("long")).first().x
-62135683200

当转换回日期时,它似乎被移回了两天(Scala代码):

代码语言:javascript
复制
scala> java.time.Instant.ofEpochSecond(-62135683200L)
res0: java.time.Instant = 0000-12-31T00:00:00Z

因此在Python中不再有效。

假设0001只是一个占位符,您可以在解析时忽略它:

代码语言:javascript
复制
df.select(sfunc.to_timestamp(
   sfunc.col("x").cast("string"),
   "0001-MM-dd HH:mm:ss").alias("x")
)).select(
    sfunc.min("x"),
    sfunc.max("x")
).first()
# Row(min(x)=datetime.datetime(1970, 1, 2, 1, 0), max(x)=datetime.datetime(1970, 1, 3, 1, 0))

您也可以直接将结果转换为string:

代码语言:javascript
复制
df.select(sfunc.min("x").cast("string"), sfunc.max("x").cast("string")).first()
# Row(CAST(min(x) AS STRING)='0001-01-02 00:00:00', CAST(max(x) AS STRING)='0001-01-03 00:00:00')
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50885719

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档