我有一些时间戳格式为"0001-mm-dd HH:MM:SS“的数据。我在试着得到最短的时间。为了获得最小时间,我需要首先转换为DoubleType,因为PySpark数据帧的minimum函数显然不适用于时间戳。然而,出于某些原因,日期时间讨厌0001年。无论我做什么,我都不能让它工作。下面,我尝试使用UDF手动将年份增加1,但由于某些原因,它没有注册。但是,我可以使用没有0001年的另一列数据,并将函数中的if语句更改为数据中包含的年份,这样我就可以观察到年份的变化。
我做错了什么?
from pyspark.sql import SQLContext
import pyspark.sql.functions as sfunc
import pyspark.sql.types as tp
from pyspark import SparkConf
from dateutil.relativedelta import relativedelta
columnname='x'
#columnname='y'
tmpdf.select(columnname).show(5)
def timeyearonecheck(date):
'''Datetimes breaks down at year = 0001, so bump up the year to 0002'''
if date.year == 1:
newdate=date+relativedelta(years=1)
return newdate
else:
return date
def timeConverter(timestamp):
'''Takes either a TimestampType() or a DateType() and converts it into a
float'''
timetuple=timestamp.timetuple()
if type(timestamp) == datetime.date:
timevalue=time.mktime(timetuple)
return int(timevalue)
else:
timevalue=time.mktime(timetuple)+timestamp.microsecond/1000000
return timevalue
tmptimedf1colname='tmpyeartime'
yearoneudf=sfunc.udf(timeyearonecheck,tp.TimestampType())
tmptimedf1=tmpdf.select(yearoneudf(sfunc.col(columnname)).alias(tmptimedf1colname))
tmptimedf2colname='numbertime'
timeudf=sfunc.udf(timeConverter,tp.DoubleType())
tmptimedf2=tmptimedf1.select(timeudf(sfunc.col(tmptimedf1colname)).alias(tmptimedf2colname))
minimum=tmptimedf2.select(tmptimedf2colname).rdd.min()[0]
+-------------------+
| x|
+-------------------+
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
|0001-01-02 00:00:00|
+-------------------+
only showing top 5 rows
Py4JJavaError Traceback (most recent call last)
<ipython-input-42-b5725bf01860> in <module>()
17 timeudf=sfunc.udf(timeConverter,tp.DoubleType())
18
tmptimedf2=tmpdf.select(timeudf(sfunc.col(columnname)).
alias(tmptimedf2colname))
---> 19 minimum=tmptimedf2.select(tmptimedf2colname).rdd.min()[0]
20 print(minimum)
...
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
in stage 43.0 failed 4 times, most recent failure: Lost task 3.3 in stage
43.0 (TID 7829, 10.10.12.41, executor 39):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
ValueError: year 0 is out of range
即使我只是试图查看第一个UDF的输出,我也会得到一个错误,但只有当我查看输出时,而不是当我实际计算它时。
tmptimedf1.select(tmptimedf1colname).show(5)
Py4JJavaError Traceback (most recent call last)
<ipython-input-44-5fc942678065> in <module>()
----> 1 tmptimedf1.select(tmptimedf1colname).show(5)
...
Py4JJavaError: An error occurred while calling o2215.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 44.0 failed 4 times, most recent failure: Lost task 0.3 in stage
44.0 (TID 7984, 10.10.12.36, executor 4):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
...
ValueError: year 0 is out of range
更重要的是,如果我这样做了,我会得到相同的ValueError来谈论第0年:
tmpdf.select(columnname).first()
但前提是我使用年份为0001的列,而不是没有0001年的'y‘列。'y‘列工作正常。
我不明白为什么我可以显示tmpdf的5个值,其中包括0001,但我不能选择第一个值,因为它有0001。
编辑:如下所述,我真的很想将0001年转换为002年,因为PySpark的approxQuantile不能在时间戳上工作,而且一般来说,我对数据集的了解不够,无法知道哪些年份是可接受的。0001绝对是一个填充年,但1970年在我的数据中可能是一个真实的年份(在我的工作的一般情况下)。
到目前为止,我得到了这样的结论:
def tmpfunc(timestamp):
time=datetime.datetime.strptime(timestamp,'%Y-%m-%d %H:%M:%S')
return time
adf=datadf.select(sfunc.col(columnname).cast("string").alias('a'))
newdf = adf.withColumn('b',sfunc.regexp_replace('a', '0001-', '0002-'))
newdf.show(10)
print(newdf.first())
tmpudf=sfunc.udf(tmpfunc,tp.TimestampType())
newnewdf=newdf.select(tmpudf(sfunc.col('b')).alias('c'))
newnewdf.show(10)
print(newnewdf.first())
+-------------------+-------------------+
| a| b|
+-------------------+-------------------+
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|2015-10-13 09:56:09|2015-10-13 09:56:09|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
|2013-11-05 21:28:09|2013-11-05 21:28:09|
|1993-12-24 03:52:47|1993-12-24 03:52:47|
|0001-01-02 00:00:00|0002-01-02 00:00:00|
+-------------------+-------------------+
only showing top 10 rows
Row(a='0001-01-02 00:00:00', b='0002-01-02 00:00:00')
+-------------------+
| c|
+-------------------+
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|0002-01-03 23:56:02|
|2015-10-13 09:56:09|
|0002-01-03 23:56:02|
|2013-11-05 21:28:09|
|1993-12-24 03:52:47|
|0002-01-03 23:56:02|
+-------------------+
only showing top 10 rows
Row(c=datetime.datetime(2, 1, 2, 0, 0))
正如一位用户在下面评论的那样,"show“中的日期是1天23小时56分钟2秒。为什么,我如何摆脱它?那么为什么我的“第一次”调用是正确的,但也在应该是(2,1,2,0,0,0)的地方少了一个0?
发布于 2018-06-16 19:52:34
为了获得最小时间,我需要首先转换为DoubleType,因为用于PySpark数据帧的minimum函数显然不适用于
。
确实是这样的
df = spark.createDataFrame(
["0001-01-02 00:00:00", "0001-01-03 00:00:00"], "string"
).selectExpr("to_timestamp(value) AS x")
min_max_df = df.select(sfunc.min("x"), sfunc.max("x"))
min_max_df.show()
# +-------------------+-------------------+
# | min(x)| max(x)|
# +-------------------+-------------------+
# |0001-01-02 00:00:00|0001-01-03 00:00:00|
# +-------------------+-------------------+
失败的部分实际上是转换为本地值:
>>> min_max_df.first()
Traceback (most recent call last):
...
return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
ValueError: year 0 is out of range
最小值的纪元时间戳为
>>> df.select(sfunc.col("x").cast("long")).first().x
-62135683200
当转换回日期时,它似乎被移回了两天(Scala代码):
scala> java.time.Instant.ofEpochSecond(-62135683200L)
res0: java.time.Instant = 0000-12-31T00:00:00Z
因此在Python中不再有效。
假设0001
只是一个占位符,您可以在解析时忽略它:
df.select(sfunc.to_timestamp(
sfunc.col("x").cast("string"),
"0001-MM-dd HH:mm:ss").alias("x")
)).select(
sfunc.min("x"),
sfunc.max("x")
).first()
# Row(min(x)=datetime.datetime(1970, 1, 2, 1, 0), max(x)=datetime.datetime(1970, 1, 3, 1, 0))
您也可以直接将结果转换为string:
df.select(sfunc.min("x").cast("string"), sfunc.max("x").cast("string")).first()
# Row(CAST(min(x) AS STRING)='0001-01-02 00:00:00', CAST(max(x) AS STRING)='0001-01-03 00:00:00')
https://stackoverflow.com/questions/50885719
复制相似问题