In Jupyter, I am running a PySpark script that writes a DataFrame out to CSV, like this:
df.coalesce(1).write.csv('Data1.csv', header='true')
After it has been running for about an hour, I get the following error:
Error: invalid status code from http://..... session isn't active.
My configuration is as follows:
spark.conf.set("spark.dynamicAllocation.enabled","true")
spark.conf.set("shuffle.service.enabled","true")
spark.conf.set("spark.dynamicAllocation.minExecutors",6)
spark.conf.set("spark.executor.heartbeatInterval","3600s")
spark.conf.set("spark.cores.max", "4")
spark.conf.set("spark.sql.tungsten.enabled", "true")
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.app.id", "Logs")
spark.conf.set("spark.io.compression.codec", "snappy")
spark.conf.set("spark.rdd.compress", "true")
spark.conf.set("spark.executor.instances", "6")
spark.conf.set("spark.executor.memory", '20g')
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("spark.driver.allowMultipleContexts", "true")
spark.conf.set("spark.master", "yarn")
spark.conf.set("spark.driver.memory", "20G")
spark.conf.set("spark.executor.instances", "32")
spark.conf.set("spark.executor.memory", "32G")
spark.conf.set("spark.driver.maxResultSize", "40G")
spark.conf.set("spark.executor.cores", "5")我检查了容器节点,错误是:
ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_e836_1556653519610_3661867_01_000005 on host: ylpd1205.kmdc.att.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
I am not able to figure out what the issue is.
Posted on 2019-09-23 16:01:09
Judging from the output, if the application is not ending in a failed state, this sounds like a Livy timeout error: your application is probably taking longer than the defined timeout for a Livy session (1 hour by default), so even though the Spark application itself succeeds, the notebook receives this error once the application runs longer than the Livy session's timeout.
If that is the case, here is how to address it:
1. Edit the /etc/livy/conf/livy.conf file (on the cluster's master node).
2. Set livy.server.session.timeout to a higher value, e.g. 8h (or larger, depending on your application).
3. Restart the Livy server: sudo restart livy-server
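For reference, a minimal sketch of what that change could look like (the 8h value is only an example; pick something comfortably longer than your job's runtime):
# /etc/livy/conf/livy.conf on the cluster's master node
livy.server.session.timeout = 8h
# restart Livy afterwards so the new timeout takes effect
sudo restart livy-server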
Posted on 2019-06-21 20:10:31
I am not very well versed in PySpark, but in Scala the solution would require something like the following.
First, we need a method that creates the header file:
// imports needed by the snippets below
import java.io.PrintWriter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}

def createHeaderFile(headerFilePath: String, colNames: Array[String]): Unit = {
  // build the full path of the header file
  val fileName = "dfheader.csv"
  val headerFileFullName = "%s/%s".format(headerFilePath, fileName)
  // write the header to HDFS as a single comma-separated line (no trailing comma)
  val hadoopConfig = new Configuration()
  val fileSystem = FileSystem.get(hadoopConfig)
  val output = fileSystem.create(new Path(headerFileFullName))
  val writer = new PrintWriter(output)
  writer.write(colNames.mkString(","))
  writer.write("\n")
  writer.close()
}
You also need a method that calls Hadoop to merge the part files written out by df.write:
def mergeOutputFiles(sourcePaths: String, destLocation: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  // if you have an Array[String] of source paths, iterate over them with a for loop; otherwise use the code below as-is
  // for (sourcePath <- sourcePaths) {
  // get the path under the destination where the partitioned files are temporarily stored
  val pathText = sourcePaths.split("/")
  val destPath = "%s/%s".format(destLocation, pathText.last)
  // merge the header file and the part files into a single file
  FileUtil.copyMerge(hdfs, new Path(sourcePaths), hdfs, new Path(destPath), true, hadoopConfig, null)
  // }
  // delete the temporary partitioned files once the merge is complete
  // (the temp folder is the parent of sourcePaths, i.e. destLocation plus the temp output folder)
  val tempFilesPath = new Path(sourcePaths).getParent
  hdfs.delete(tempFilesPath, true)
}
Below is the method that generates the output files, i.e. the df.write step where you pass in the large DataFrame that you want to write to HDFS:
def generateOutputFiles(processedDf: DataFrame, opPath: String, tempOutputFolder: String,
                        spark: SparkSession): String = {
  import spark.implicits._
  val fileName = "%s%sNameofyourCsvFile.csv".format(opPath, tempOutputFolder)
  // write the DataFrame as csv to the temp output directory and create the matching header file
  processedDf.write.mode("overwrite").csv(fileName)
  createHeaderFile(fileName, processedDf.columns)
  // keep the path of the partitioned output so it can be passed on for merging
  val outputFilePathList = fileName
  // if the output needs to be split into multiple files based on some parameter, change the
  // return type to Array[String], collect each fileName into the array inside a loop
  // (e.g. outputFilePathList(counter) = fileName; counter += 1) and return the array instead
  outputFilePathList
}
Finally, define the method that holds the actual processing logic for your data:
def processyourlogic(/* your parameters, if any */): DataFrame = {
  // your logic to do whatever needs to be done to your data
}
Assuming the method above returns a DataFrame, here is how you put everything together:
val yourbigDf = processyourlogic(/* your parameters */) // returns a DataFrame
yourbigDf.cache // cache it in case you need it more than once
val outputPathFinal = "location where you want your file to be saved"
val tempOutputFolderLocation = "temp/"
val partFiles = generateOutputFiles(yourbigDf, outputPathFinal, tempOutputFolderLocation, spark)
mergeOutputFiles(partFiles, outputPathFinal)
Let me know if you have any further questions about this. If the answer you are looking for is something different, that should be asked in the original question.
https://stackoverflow.com/questions/56691950