我的以下代码是使用spark从hive表中读取数据。该表中有1亿条记录。当我在Rdd中选择这么多记录并尝试执行result.show()时,它给出了严重的问题异常。
我基本上是想通过从这个表中选择几列来插入其他表中的记录,以获得1亿条记录集。
下面是我的代码:
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val hiveContext = new org.apache.spark.sql
我正在尝试执行一个带有卡住的Spark的动作。相应的执行器抛出以下异常: 2019-03-06 11:18:16 ERROR Inbox:91 - Ignoring error
java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readUTF(DataInputStream.java:609)
at java.io.DataInputStream.readUTF(DataInputStream.java:564)
at
我使用Spark1.6.0和Scala。
我想将DataFrame保存为压缩的CSV格式。
到目前为止(假设我已经将df和sc作为SparkContext)如下:
//set the conf to the codec I want
sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "true")
sc.getConf.set(
我正在尝试运行中给出的Spark / Python的Logistic回归示例,并且已经成功地使用了Spark1.6和Python2.7。
现在我必须将它移到Spark2.1和Python3.5( 3.6是不兼容的),我正在使用Ubuntu16.04中的木星笔记本
这段代码工作正常
# Evaluate the model on training data
labelsAndPreds = modelInput.map(lambda p: (p.label, LRmodel.predict(p.features)))
print(labelsAndPreds.count())
print(lab
我试图从Kafka (用Java语言)中读到一个主题,但这个例外总是在启动:
kafka.common.UnknownCodecException: 3 is an unknown compression codec
at kafka.message.CompressionCodec$.getCompressionCodec(CompressionCodec.scala:26)
at kafka.message.Message.compressionCodec(Message.scala:213)
at kafka.message.ByteBufferMessageS
作为问题的后续,当我尝试在我的单个节点机器上使用Spark2.1.1 over (Hadoop2.8.0)时,我得到了一个新的错误。如果我用
spark-shell
一开始就没有问题。在使用通常的start-dfs.sh和start-yarn.sh启动Hadoop之后,如果我使用
spark-shell --master yarn
我得到以下错误:
17/06/10 12:00:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classe
当我对拼图文件进行计数时,我得到了下面的错误, java.lang.NoSuchMethodError: org.apache.parquet.schema.Types$MessageTypeBuilder.addFields([Lorg/apache/parquet/schema/Type;)Lorg/apache/parquet/schema/Types$GroupBuilder;
at org.apache.spark.sql.execution.datasources.parquet.CatalystReadSupport$.clipParquetSchema(Catalys
我正在使用Kafka运行一个结构化的流应用程序。我发现如果由于某种原因系统停机了几天...检查点变得陈旧,并且在Kafka中找不到与检查点对应的偏移量。我如何让Spark结构化流媒体应用选择最后一个可用的偏移量,并从那里开始。我尝试将偏移量重置设置为较早/最新,但系统崩溃,出现以下错误:
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {MyTopic-574=6559828}
at
HI编码器,我又回来了。我正在尝试使用scala代码中的hive上下文从一个dataframe创建一个HIve表,我可以在sqlContext中这样做,但是当涉及到HiveContext时,它会抛出这个错误
[error] /home/mapr/avroProject/src/main/scala/AvroConsumer.scala:75: object HiveContext in package hive cannot be accessed in package org.apa che.spa
如何在应用转换之前将每个火花输入流中的数据集组合为一个。我用的是火花-2.0.0
val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new SQLContext(sc)
val lines = ssc.textFileStream("input")
lines.foreachRDD { rdd =>
val count = rdd.count()
if (count > 0) {
val dataSet = sqlContext.read.json(rdd)
我试图用IntellijIDEA中的线性回归在Spark中建立一个模型。
在拟合模型之前,我应该创建一个具有VectorAssembler列的feature。
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
//creating features column
val assembler = new VectorAssembler()
.setInputCols(Array("col4","col5","col6
使用addPyFiles()似乎不会添加所需的文件来触发作业节点( spark是新手,所以这里可能缺少一些基本的用法知识)。
尝试使用pyspark运行脚本时,发现未找到某些模块可供导入的错误。以前从未使用过spark,但其他帖子(来自有问题的包和)建议通过sparkContext.addPyFiles(mymodulefiles.zip)压缩模块并添加到spark作业中,但仍收到错误。相关的代码片段是...
from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transforme