我有一个火花表,里面有三千万个观察值。
DF = sc.parallelize([
[('comp1'),('P1'), '2016-01-01'],
[('comp1'),('P4'),'2015-01-01'],
[('comp2'),('P1'),'2017-01-01'],
[('comp2'),('P2'),'2015-01-01'],
[('comp2'),('P4'),'2016-01-01'],
[('comp3'),('P3'),'2014-01-01'],
[('comp1'),('P2'),'2016-01-01'],
[('comp3'),('P2'),'2017-01-01']
]).toDF(["company", "Project",'Date'])
DF.show()
我想创建一个定向网络数据集来统计每个项目在过去5年中在公司之间的移动。当我在我的表上执行自连接时,它会创建数据集中不存在的边:
DF.alias('l').join(DF.alias('r'), on='Project')\
.where('r.Date > l.Date')\
.select(F.col('l.company').alias('company1'),
F.col('r.company').alias('company2'), 'l.Project')\
.show()
+--------+--------+-------+
|company1|company2|Project|
+--------+--------+-------+
| comp1| comp2| P1|
| comp1| comp3| P2|
| comp2| comp1| P2|
| comp2| comp3| P2| #This is wrong
| comp1| comp2| P4|
+--------+--------+-------+
我尝试创建一个计数器,并在where子句中添加另一个条件:
DF =DF.withColumn("row_num",
F.row_number().over(Window.partitionBy("Project"))).orderBy('Project',
'Date')
DF.alias('l').join(DF.alias('r'), on='Project')\
.where(('r.Date > l.Date')& ('r.row_num - l.row_num < 2' ))\
.select(F.col('l.company').alias('company1'),
F.col('r.company').alias('company2'), 'l.Project')\
.show()
但是我得到了这个错误:
TypeError: unsupported operand type(s) for &: 'str' and 'str'
如何更改Where子句中的条件以纠正此问题?
我正在集群上工作,无法安装库,而且我只安装了networkx,而我的Spark版本是1.6
发布于 2018-07-24 23:31:03
我找到了一种方法来创建我正在寻找的输出:
df_lag = DF.withColumn('comp1',F.lag(DF['company']).\
over(Window.partitionBy("Project")))\
.filter(~F.isnull(F.col('comp1'))).select(F.col('comp1'),
F.col('company').alias('comp2'),F.col('project')).show()
+-----+-----+-------+
|comp1|comp2|project|
+-----+-----+-------+
|comp1|comp2| P1|
|comp2|comp1| P2|
|comp1|comp3| P2|
|comp1|comp2| P4|
+-----+-----+-------+
https://stackoverflow.com/questions/51411239
复制相似问题