PySpark: TypeError: unsupported operand type(s) for +: 'datetime.datetime' and 'str'

Stack Overflow user
Asked on 2019-07-22 10:46:47
Answers: 1 · Views: 2.6K · Followers: 0 · Votes: 1

I have a DataFrame in PySpark with the following schema:

root
 |-- id: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- time: string (nullable = true)
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)

I want to add one more column, date_time, of type timestamp:

from datetime import datetime

from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType

# Concatenate the date and time columns and parse the result as a timestamp
to_datetime_func = udf(lambda d, t: datetime.strptime(d + " " + t, "%Y-%m-%d %H:%M:%S"), TimestampType())
df = df.withColumn("date_time", to_datetime_func("date", "time"))

This code compiles fine. However, when I run a simple filter operation that uses the date_time column, I get the error below. The new schema and the filter:

root
 |-- id: string (nullable = true)
 |-- date_time: timestamp (nullable = true)
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)


from pyspark.sql import functions as func

df \
    .filter(func.col("date_time") >= func.col("start")) \
    .select("id", "date_time", "start") \
    .show()

Error:

Py4JJavaError: An error occurred while calling o2966.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 30.0 failed 4 times, most recent failure: Lost task 2.3 in stage 30.0 (TID 765, 10.139.64.4, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 403, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 398, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 365, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 147, in dump_stream
    for obj in iterator:
  File "/databricks/spark/python/pyspark/serializers.py", line 354, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/databricks/spark/python/pyspark/worker.py", line 83, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<command-4293391875175815>", line 1, in <lambda>
TypeError: unsupported operand type(s) for +: 'datetime.datetime' and 'str'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:638)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:299)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:383)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2076)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:223)
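The last Python frame points at the lambda: because date is a timestamp column, the UDF receives a datetime.datetime for d, not a string, so d + " " fails. The same failure can be reproduced outside Spark (a minimal sketch with illustrative values only):

from datetime import datetime

d = datetime(2019, 7, 22, 3, 34, 26)  # what the UDF receives for a timestamp column
t = "03:34:26"                        # what it receives for a string column
d + " " + t  # TypeError: unsupported operand type(s) for +: 'datetime.datetime' and 'str'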

Update:

from pyspark.sql.types import StringType

my_concat_func = udf(lambda d, t: datetime.strptime(d + " " + t, "%Y-%m-%d %H:%M:%S"), StringType())
df = df.withColumn("date", df["date"].cast(StringType()))
df = df.withColumn("date_time", my_concat_func("date", "time"))


df.select("date","time","date_time").printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- date_time: string (nullable = true)


df.select("date","time","date_time").show()

ValueError: unconverted data remains: 03:34:26
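This second error follows from the cast: a timestamp cast to a string already contains the time portion, so d + " " + t ends with a duplicate time that strptime cannot consume. A minimal sketch with illustrative values only:

from datetime import datetime

d = "2019-07-22 03:34:26"  # a timestamp cast to string keeps the time part
t = "03:34:26"
datetime.strptime(d + " " + t, "%Y-%m-%d %H:%M:%S")
# ValueError: unconverted data remains:  03:34:26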


1 Answer

Stack Overflow user

Answered on 2019-07-22 10:56:03

Can you try this and let me know the output?

timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
df \
    .filter(func.unix_timestamp('date_time', format=timeFmt) >= func.unix_timestamp('start', format=timeFmt)) \
    .select("id", "date_time", "start") \
    .show()
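As an aside, the date_time column from the question can also be built without a Python UDF, using only built-in functions. A sketch, assuming date is a timestamp and time is an 'HH:mm:ss' string (to_timestamp requires Spark 2.2+):

from pyspark.sql import functions as func

df = df.withColumn(
    "date_time",
    func.to_timestamp(
        func.concat_ws(" ", func.date_format("date", "yyyy-MM-dd"), func.col("time")),
        "yyyy-MM-dd HH:mm:ss"))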

Edit

On the follow-up question of how to get only the date without the time:

df = df.withColumn("new_data", func.to_date(df.date, 'yyyy-MM-dd'))
df.printSchema()

df = df.withColumn("new_data", df['new_data'].cast(StringType()))
df.show(10, False)
df.printSchema()

#### Output ####
+------------------------+
|date                    |
+------------------------+
|2015-07-02T11:22:21.050Z|
|2016-03-20T21:00:00.000Z|
+------------------------+
root
 |-- date: string (nullable = true)
 |-- new_data: date (nullable = true)
+------------------------+----------+
|date                    |new_data  |
+------------------------+----------+
|2015-07-02T11:22:21.050Z|2015-07-02|
|2016-03-20T21:00:00.000Z|2016-03-20|
+------------------------+----------+
root
 |-- date: string (nullable = true)
 |-- new_data: string (nullable = true)
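One design note: cast(StringType()) on a date column always yields the default 'yyyy-MM-dd' text. If a different text layout is wanted, func.date_format makes the pattern explicit (a sketch; new_data_str is a hypothetical column name):

df = df.withColumn("new_data_str", func.date_format("new_data", "dd/MM/yyyy"))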
Votes: 2
The original content of this page is provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/57144339