首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何为现有表生成行号作为列?

如何为现有表生成行号作为列?
EN

Stack Overflow用户
提问于 2017-10-16 06:11:11
回答 2查看 1.7K关注 0票数 0

我希望将行号(row_num)创建为MySql中现有表的列,通过spark并行读取数据库(即分区列,因为表中的所有列都是字符串)。

当我试图执行这个查询时:

代码语言:javascript
运行
复制
val query = SELECT @row_number:=@row_number+1 as rowid,d.* FROM destination d, (SELECT @row_number:=0) as init

我有一个例外情况如下:

代码语言:javascript
运行
复制
17/10/16 10:50:00 INFO SparkSqlParser: Parsing command: SELECT @row_number:=@row_number+1 as rowid,d. FROM destination d, (SELECT @row_number:=0) as init
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
no viable alternative at input 'SELECT @'(line 1, pos 7)

== SQL ==
SELECT @row_number:=@row_number+1 as rowid,d. FROM destination d, (SELECT @row_number:=0) as init
-------^^^

at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
at com.syntel.spark.sparkDVT$.main(sparkDVT.scala:61)
at com.syntel.spark.sparkDVT.main(sparkDVT.scala)

我试过的代码:

代码语言:javascript
运行
复制
val p2 = "@row_number"
val a = s"""SELECT $p2:=$p2+1 as rowid,d.* FROM destination d, (SELECT $p2:=0) as init"""
val df1 = spark.sql(a)

指的是:

https://forums.databricks.com/questions/115/how-do-i-pass-parameters-to-my-sql-statements.html

如何在mysql中执行下面的spark查询

代码语言:javascript
运行
复制
val query = SELECT @row_number:=@row_number+1 as rowid,d.* FROM destination d, (SELECT @row_number:=0) as init

谢谢

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-10-16 06:31:09

我想创建行号(Row_num)作为MySql中现有表的一个列

row_number函数

使用

row_number():列窗口函数:返回窗口分区中从1开始的序列号。

您可以使用它如下:

代码语言:javascript
运行
复制
val input = spark.range(10)
scala> input.printSchema
root
 |-- id: long (nullable = false)

import org.apache.spark.sql.expressions.Window
val byId = Window.orderBy($"id".asc)
scala> input.withColumn("index", row_number over byId).show
17/10/16 08:27:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+-----+
| id|index|
+---+-----+
|  0|    1|
|  1|    2|
|  2|    3|
|  3|    4|
|  4|    5|
|  5|    6|
|  6|    7|
|  7|    8|
|  8|    9|
|  9|   10|
+---+-----+

但是要小心,因为它是一个窗口函数,需要一个有序的窗口,并将属于一个窗口分区的所有行移动到一个星火分区中,正如警告所指出的:

17/10/16 08:27:01警告WindowExec:没有为窗口操作定义的分区!将所有数据移动到单个分区,会导致严重的性能下降。

这意味着对于一个非常大的数据集,您可能会招致很长的GC,甚至由于OutOfMemoryError而无法完成。

monotonically_increasing_id()函数

还有另一个函数id

monotonically_increasing_id():列生成单调增加64位整数的列表达式.

请注意..。

生成的ID保证是单调增长和唯一的,但不是连续的。当前实现将分区ID放置在上31位中,将每个分区内的记录号放置在下33位中。假设数据帧的分区少于10亿个,每个分区的记录少于80亿条。

票数 1
EN

Stack Overflow用户

发布于 2017-10-16 06:28:16

如果要执行mySQL查询,则需要使用标准JDBC。

Spark与DataFrame ou DataSet (Spark )相关。

How do I pass parameters to my SQL statements?的主题不是mySql,而是my SQL

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46763927

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档