我有一个列RESULT
,在每个列中都有长度为11的数字,其模式是:
RESULT: string (nullable = true)
现在,我想执行下面的操作,并更新一个新的列,这将增加一个额外的数字在最后。下面所示的示例用于第一个数字03600024145
注意事项:我不想把桌子的格式改成熊猫,但是我想用Pyspark做所有的事情。
036000241457
因此,如果将此逻辑应用于整个列,结果将变成UPDATED RESULT
。
为了进一步澄清逻辑:digit#UPC
有类似的python代码,但在第5步:python:创建检查数字函数中有一点不同。
发布于 2021-11-14 17:35:50
我们可以把逻辑转换成火花函数。
RESULT
列与check digit
连接起来。工作实例
import pyspark.sql.functions as F
from pyspark.sql import Column
from typing import List
df = spark.createDataFrame([("03600024145",), ("01010101010",)], ("RESULT",))
def sum_digits(c: Column, pos: List[int]):
sum_col = F.lit(0)
for p in pos:
sum_col = sum_col + F.substring(c, p, 1).cast("int")
return sum_col
def check_digit(c: Column) -> Column:
odd_sum = sum_digits(c, [1, 3, 5, 7, 9, 11])
even_sum = sum_digits(c, [2, 4, 6, 8, 10])
sum_result = (3 * odd_sum) + even_sum
modulo = sum_result % 10
return (10 - modulo) % 10
df.withColumn("UPDATED_RESULT", F.concat(F.col("RESULT"), check_digit(F.col("RESULT")))).show()
输出
+-----------+--------------+
| RESULT|UPDATED_RESULT|
+-----------+--------------+
|03600024145| 036000241457|
|01010101010| 010101010105|
+-----------+--------------+
发布于 2021-11-14 17:54:15
使用用户定义函数(udf)的解决方案。
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col
df = spark.createDataFrame([('03600024145',), ('01010101010',)], ['RESULT'])
@udf(StringType())
def add_check_digit(val):
odd = sum(int(i) for i in val[::2])
even = sum(int(i) for i in val[1::2])
check_val = (odd * 3 + even) % 10
return val + str((10 - check_val) % 10)
df = df.withColumn('UPDATED_RESULT', add_check_digit(col('RESULT')))
df.show()
+-----------+--------------+
| RESULT|UPDATED_RESULT|
+-----------+--------------+
|03600024145| 036000241457|
|01010101010| 010101010105|
+-----------+--------------+
发布于 2021-11-14 18:00:46
可以将列RESULT
拆分为一个数字数组,而不是使用一些高阶函数transform
和aggregate
,您可以计算连接到原始字符串的checkdigit
:
import pyspark.sql.functions as F
df1 = df.withColumn(
"digits",
F.expr("slice(split(RESULT, ''), 1, size(split(RESULT, '')) - 1)")
).withColumn(
"digits",
F.expr("transform(digits, (x, i) -> struct(int(x) as d, i+1 as i))")
).withColumn(
"odd_even",
F.expr(
"""aggregate(digits,
array(0, 0),
(acc, x) ->
IF (x.i%2 = 1,
array(acc[0] + x.d, acc[1]),
array(acc[0], acc[1] + x.d)
)
)""")
).withColumn(
"UPDATED RESULT",
F.concat(F.col("RESULT"), 10 - ((F.col("odd_even")[0] * 3 + F.col("odd_even")[1]) % 10))
).select(
"RESULT", "UPDATED RESULT"
)
df1.show(truncate=False)
#+-----------+--------------+
#|RESULT |UPDATED RESULT|
#+-----------+--------------+
#|03600024145|036000241457 |
#|01010101010|010101010105 |
#+-----------+--------------+
解释:
0 -> struct(0, 1)
)您可以显示所有中间列来理解逻辑。
https://stackoverflow.com/questions/69965226
复制相似问题