我希望将行号(row_num
)创建为MySql中现有表的列,通过spark并行读取数据库(即分区列,因为表中的所有列都是字符串)。
当我试图执行这个查询时:
val query = SELECT @row_number:=@row_number+1 as rowid,d.* FROM destination d, (SELECT @row_number:=0) as init
我有一个例外情况如下:
17/10/16 10:50:00 INFO SparkSqlParser: Parsing command: SELECT @row_number:=@row_number+1 as rowid,d. FROM destination d, (SELECT @row_number:=0) as init
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
no viable alternative at input 'SELECT @'(line 1, pos 7)
== SQL ==
SELECT @row_number:=@row_number+1 as rowid,d. FROM destination d, (SELECT @row_number:=0) as init
-------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
at com.syntel.spark.sparkDVT$.main(sparkDVT.scala:61)
at com.syntel.spark.sparkDVT.main(sparkDVT.scala)
我试过的代码:
val p2 = "@row_number"
val a = s"""SELECT $p2:=$p2+1 as rowid,d.* FROM destination d, (SELECT $p2:=0) as init"""
val df1 = spark.sql(a)
指的是:
https://forums.databricks.com/questions/115/how-do-i-pass-parameters-to-my-sql-statements.html
如何在mysql中执行下面的spark查询
val query = SELECT @row_number:=@row_number+1 as rowid,d.* FROM destination d, (SELECT @row_number:=0) as init
谢谢
发布于 2017-10-16 06:31:09
我想创建行号(Row_num)作为MySql中现有表的一个列
row_number函数
使用数
row_number():列窗口函数:返回窗口分区中从1开始的序列号。
您可以使用它如下:
val input = spark.range(10)
scala> input.printSchema
root
|-- id: long (nullable = false)
import org.apache.spark.sql.expressions.Window
val byId = Window.orderBy($"id".asc)
scala> input.withColumn("index", row_number over byId).show
17/10/16 08:27:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+-----+
| id|index|
+---+-----+
| 0| 1|
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
| 5| 6|
| 6| 7|
| 7| 8|
| 8| 9|
| 9| 10|
+---+-----+
但是要小心,因为它是一个窗口函数,需要一个有序的窗口,并将属于一个窗口分区的所有行移动到一个星火分区中,正如警告所指出的:
17/10/16 08:27:01警告WindowExec:没有为窗口操作定义的分区!将所有数据移动到单个分区,会导致严重的性能下降。
这意味着对于一个非常大的数据集,您可能会招致很长的GC,甚至由于OutOfMemoryError而无法完成。
monotonically_increasing_id()函数
还有另一个函数id
monotonically_increasing_id():列生成单调增加64位整数的列表达式.
请注意..。
生成的ID保证是单调增长和唯一的,但不是连续的。当前实现将分区ID放置在上31位中,将每个分区内的记录号放置在下33位中。假设数据帧的分区少于10亿个,每个分区的记录少于80亿条。
发布于 2017-10-16 06:28:16
如果要执行mySQL
查询,则需要使用标准JDBC。
Spark与DataFrame ou DataSet (Spark )相关。
How do I pass parameters to my SQL statements?
的主题不是mySql
,而是my SQL
https://stackoverflow.com/questions/46763927
复制相似问题