我在EMR中使用YARN作为资源管理器并在2个节点上运行spark作业。如果我的条件不满足,我需要故意使步骤失败,这样下一步就不会按照配置执行。为了实现这一点,在dynamoDB中插入日志消息后,我抛出了一个自定义异常。
它运行得很好,但是Dynamo中的记录被插入了两次。
下面是我的代码。
if(<condition>) {
<method call to insert in dynamo>
throw new SparkException(<msg>);
return;
}如果我删除了抛出异常的行,它可以正常工作,但步骤已经完成。
如何才能在不收到两次日志消息的情况下使步骤失败。
谢谢你的帮助。
致敬,Sorabh
发布于 2017-10-10 03:51:44
您的发电机消息被插入两次的原因可能是因为您的错误条件被两个不同的执行器命中和处理。Spark正在将要完成的工作分配给它的工人,而这些工人不分享任何知识。
我不确定是什么驱使你的需求让Spark步骤失败,但我建议在你的应用程序代码中跟踪失败的情况,而不是直接让spark死掉。换句话说,编写检测错误并将其传递回spark驱动程序的代码,然后适当地对其执行操作。
要做到这一点,一种方法是使用累加器来计算在处理数据时发生的任何错误。它看起来大概是这样的(我假设是scala和DataFrames,但你可以根据需要适应RDD和/或python ):
val accum = sc.longAccumulator("Error Counter")
def doProcessing(a: String, b: String): String = {
if(condition) {
accum.add(1)
null
}
else {
doComputation(a, b)
}
}
val doProcessingUdf = udf(doProcessing _)
df = df.withColumn("result", doProcessing($"a", $"b"))
df.write.format(..).save(..) // Accumulator value not computed until an action occurs!
if(accum.value > 0) {
// An error detected during computation! Do whatever needs to be done.
<insert dynamo message here>
}这种方法的一个好处是,如果你在Spark UI中寻找反馈,你将能够在Spark UI运行时看到累加器的值。作为参考,下面是关于累加器的文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
https://stackoverflow.com/questions/46419548
复制相似问题