我正在尝试编写一个UDF来用地理位置信息来丰富IP列。我要用于丰富的数据以IP范围到国家/地区的形式存储在一个数据湖中。为了读取文件,我使用Java API,但我发现这非常慢。例如,阅读一百万行代码需要超过2分钟。读取整个文件需要几个小时,效率低得令人惊讶。下面是我用来读取文件的代码:
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.master(...).getOrCreate())
sc = spark._sc
hadoopConf = sc._jsc.hadoopConfiguration()
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
BufferedReader = sc._gateway.jvm.java.io.BufferedReader
InputStreamReader = sc._gateway.jvm.java.io.InputStreamReader
datalake_file_system = '...'
account_name = '...'
fs = FileSystem.get(URI("abfss://{}@{}.dfs.core.windows.net".format(datalake_file_system, account_name)), hadoopConf)
file_path = Path('...')
f = fs.open(file_path)
r = BufferedReader(InputStreamReader(f))
while True:
line = r.readLine()
fields = line.split(',')
if fields[0].startswith('start_ip'):
continue
# Load IP range to country mapping in a map
...有没有更快的方法来执行这样的操作?
我的UDF背后的想法是将IP范围/地理信息加载到映射中,然后在映射加载到内存中后在UDF中执行查找。在Spark中,可能还有其他更有效的方法来实现这一点。我想知道是否有更典型的方式来做这样的事情。原始信息在一个表中,我已经编写了代码来执行表之间的连接,但是由于合并涉及IP范围,直接连接的效率非常低。我已经使用了bucketing,它确实加快了相当多的速度,但我正在尝试使用UDF是否会更有效。
发布于 2021-05-11 20:49:13
我找到的解决此问题的最佳解决方案是创建一个Java UDF,它在map中执行延迟加载和查找。
https://stackoverflow.com/questions/66012343
复制相似问题