Goal
Read Hadoop Snappy-compressed JSON with PySpark.
Environment
MacBook Pro with M1, Python 3.9.5, PySpark 3.2.2
Steps to reproduce
Install:
pip install pyspark==3.2.2
# app.py
from pyspark.sql import SparkSession

# default session, no extra Hadoop/Snappy configuration
spark = SparkSession.builder.getOrCreate()

# read a Hadoop Snappy-compressed, line-delimited JSON file
data_frame = spark.read.json("file:///path/to/file.json.snappy")
data_frame.show()

Run:
python app.py
Output
22/08/09 12:52:01 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.UnsatisfiedLinkError: 'int org.apache.hadoop.shaded.org.xerial.snappy.SnappyNative.rawUncompress(java.nio.ByteBuffer, int, int, java.nio.ByteBuffer, int)'
at org.apache.hadoop.shaded.org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.apache.hadoop.shaded.org.xerial.snappy.Snappy.uncompress(Snappy.java:551)
at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompressDirectBuf(SnappyDecompressor.java:267)
at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompress(SnappyDecompressor.java:217)
at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:88)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
at java.base/java.io.InputStream.read(InputStream.java:205)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:191)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:158)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:198)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:69)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:191)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator.isEmpty(Iterator.scala:387)
at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
at scala.collection.TraversableOnce.reduceLeftOption(TraversableOnce.scala:249)
at scala.collection.TraversableOnce.reduceLeftOption$(TraversableOnce.scala:248)
at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1431)
at scala.collection.TraversableOnce.reduceOption(TraversableOnce.scala:256)
at scala.collection.TraversableOnce.reduceOption$(TraversableOnce.scala:256)
at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$1(JsonInferSchema.scala:80)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
22/08/09 12:52:01 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (192.168.1.43 executor driver): java.lang.UnsatisfiedLinkError: 'int org.apache.hadoop.shaded.org.xerial.snappy.SnappyNative.rawUncompress(java.nio.ByteBuffer, int, int, java.nio.ByteBuffer, int)'
    ... (stack trace identical to the one above)
Expected
I expect to see the resulting DataFrame printed.
What am I missing? Do I have to install Hadoop, or can this work without it?
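For what it's worth, a minimal sanity check along these lines would tell whether the JSON reader itself works (the uncompressed file name here is hypothetical):

# sanity_check.py -- hypothetical: read an uncompressed copy of the same data
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# if this succeeds while file.json.snappy fails, the problem is the native
# Snappy decompressor, not Spark's JSON parsing
spark.read.json("file:///path/to/file.json").show()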
Posted on 2022-08-09 12:12:12
Since you are getting java.lang.UnsatisfiedLinkError, I think the Snappy jar cannot be found on the Spark cluster (which is odd). Try pulling it in when you create the session:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # fetch snappy-java from Maven Central so the native Snappy bindings resolve
    .config("spark.jars.packages", "org.xerial.snappy:snappy-java:1.1.8.4")
    .getOrCreate()
)
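For completeness, a minimal sketch of the question's app.py with this config applied (same placeholder path as above; the package coordinates are the ones suggested here):

# app.py -- original repro with the suggested package added
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # resolve snappy-java 1.1.8.4 from Maven Central at session start
    .config("spark.jars.packages", "org.xerial.snappy:snappy-java:1.1.8.4")
    .getOrCreate()
)

data_frame = spark.read.json("file:///path/to/file.json.snappy")
data_frame.show()

Equivalently, the dependency can be supplied on the command line with spark-submit --packages org.xerial.snappy:snappy-java:1.1.8.4 app.py instead of setting it in code.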
https://stackoverflow.com/questions/73290868