Reading hadoop-snappy compressed JSON with PySpark

Asked by a Stack Overflow user on 2022-08-09 11:11:32
1 answer · 112 views · 0 followers · 0 votes

Goal

Read hadoop-snappy compressed JSON with PySpark.

Environment

MacBook Pro with M1, Python 3.9.5, PySpark 3.2.2

Steps to reproduce

Install

  • PySpark

pip install pyspark==3.2.2

  • Write a simple PySpark script to read the hadoop-snappy compressed JSON from S3

# app.py

from pyspark.sql import SparkSession

# Create (or reuse) a local Spark session
spark = SparkSession.builder.getOrCreate()

# Read the hadoop-snappy compressed JSON file from the local filesystem
data_frame = spark.read.json("file:///path/to/file.json.snappy")

data_frame.show()

  • Execute

python app.py

Output

22/08/09 12:52:01 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1]
java.lang.UnsatisfiedLinkError: 'int org.apache.hadoop.shaded.org.xerial.snappy.SnappyNative.rawUncompress(java.nio.ByteBuffer, int, int, java.nio.ByteBuffer, int)'
        at org.apache.hadoop.shaded.org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
        at org.apache.hadoop.shaded.org.xerial.snappy.Snappy.uncompress(Snappy.java:551)
        at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompressDirectBuf(SnappyDecompressor.java:267)
        at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompress(SnappyDecompressor.java:217)
        at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:88)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
        at java.base/java.io.InputStream.read(InputStream.java:205)
        at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:191)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
        at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:158)
        at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:198)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
        at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:69)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:191)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at scala.collection.Iterator.isEmpty(Iterator.scala:387)
        at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
        at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
        at scala.collection.TraversableOnce.reduceLeftOption(TraversableOnce.scala:249)
        at scala.collection.TraversableOnce.reduceLeftOption$(TraversableOnce.scala:248)
        at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1431)
        at scala.collection.TraversableOnce.reduceOption(TraversableOnce.scala:256)
        at scala.collection.TraversableOnce.reduceOption$(TraversableOnce.scala:256)
        at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1431)
        at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$1(JsonInferSchema.scala:80)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
22/08/09 12:52:01 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (192.168.1.43 executor driver): java.lang.UnsatisfiedLinkError: 'int org.apache.hadoop.shaded.org.xerial.snappy.SnappyNative.rawUncompress(java.nio.ByteBuffer, int, int, java.nio.ByteBuffer, int)'
        at org.apache.hadoop.shaded.org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
        at org.apache.hadoop.shaded.org.xerial.snappy.Snappy.uncompress(Snappy.java:551)
        at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompressDirectBuf(SnappyDecompressor.java:267)
        at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompress(SnappyDecompressor.java:217)
        at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:88)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
        at java.base/java.io.InputStream.read(InputStream.java:205)
        at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:191)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
        at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:158)
        at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:198)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
        at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:69)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:191)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at scala.collection.Iterator.isEmpty(Iterator.scala:387)
        at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
        at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
        at scala.collection.TraversableOnce.reduceLeftOption(TraversableOnce.scala:249)
        at scala.collection.TraversableOnce.reduceLeftOption$(TraversableOnce.scala:248)
        at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1431)
        at scala.collection.TraversableOnce.reduceOption(TraversableOnce.scala:256)
        at scala.collection.TraversableOnce.reduceOption$(TraversableOnce.scala:256)
        at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1431)
        at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$1(JsonInferSchema.scala:80)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

Expected

I expect to see the resulting DataFrame printed.

What am I missing? Do I have to install Hadoop, or can I make this work without it?
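As a side note, the pip-installed PySpark already bundles its own Hadoop client jars, so reading a local file should not require a separate Hadoop installation. A minimal sketch, relying on PySpark's internal _jvm gateway (an implementation detail, not a public API), to check which bundled Hadoop version the session is using:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Print the Hadoop version bundled with this PySpark installation
# (goes through the internal py4j gateway; not a stable public API)
print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())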


1 Answer

Stack Overflow user

Answered on 2022-08-09 12:12:12

Since you get java.lang.UnsatisfiedLinkError, I think the snappy jar cannot be found on the Spark cluster (which is strange); try downloading it when creating the session:

spark = (
    SparkSession.builder
    # Pull the snappy-java artifact from Maven Central when the session starts
    .config("spark.jars.packages", "org.xerial.snappy:snappy-java:1.1.8.4")
    .getOrCreate()
)
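The same dependency can equivalently be supplied on the command line instead of in code, for example via spark-submit's --packages flag (a sketch assuming the script above is saved as app.py):

spark-submit --packages org.xerial.snappy:snappy-java:1.1.8.4 app.py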
Votes: 0
The original content is from Stack Overflow.
Original link: https://stackoverflow.com/questions/73290868
