我有一个场景,其中一定数量的操作(包括group )必须应用于多个小文件(每个文件约300 on )。手术看起来是这样..。
df.groupBy(....).agg(....)
现在,要在多个文件上处理它,我可以使用通配符“/**/*..csv”来创建单个RDD,并将其划分为操作。但是,从操作上看,它是一个组,涉及大量的洗牌,如果文件是相互排斥的,这是不必要的。
我正在看的是,我可以在文件上创建独立的RDD并独立地操作它们的方法。
发布于 2015-08-10 10:00:13
它与其说是一个完整的解决方案,不如说是一个想法,我还没有对它进行测试。
您可以从将数据处理管道提取到函数开始。
def pipeline(f: String, n: Int) = {
sqlContext
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load(f)
.repartition(n)
.groupBy(...)
.agg(...)
.cache // Cache so we can force computation later
}如果您的文件很小,您可以调整n参数,以使用尽可能少的分区,以适应单个文件中的数据,并避免洗牌。这意味着您正在限制并发性,但我们稍后再讨论这个问题。
val n: Int = ??? 接下来,您必须获得输入文件的列表。这个步骤取决于数据源,但在大多数情况下,它或多或少是简单的:
val files: Array[String] = ???接下来,您可以使用pipeline函数映射上面的列表:
val rdds = files.map(f => pipeline(f, n))由于我们将并发限制在单个文件的级别上,所以我们希望通过提交多个作业来补偿。让我们添加一个简单的助手,强制计算并用Future包装它。
import scala.concurrent._
import ExecutionContext.Implicits.global
def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
df.rdd.foreach(_ => ()) // Force computation
df
}最后,我们可以在rdds上使用上面的助手
val result = Future.sequence(
rdds.map(rdd => pipelineToFuture(rdd)).toList
)根据您的需求,您可以添加onComplete回调或使用反应性流来收集结果。
发布于 2015-08-10 08:00:29
如果您有许多文件,而且每个文件都很小(在此之前,我将将300 as视为Spark的小文件),您可以尝试使用SparkContext.wholeTextFiles创建一个RDD,其中每个记录都是一个完整的文件。
发布于 2019-10-25 13:50:45
这样,我们就可以并行地编写多个RDD。
public class ParallelWriteSevice implements IApplicationEventListener {
private static final IprogramLogger logger = programLoggerFactory.getLogger(ParallelWriteSevice.class);
private static ExecutorService executorService=null;
private static List<Future<Boolean>> futures=new ArrayList<Future<Boolean>>();
public static void submit(Callable callable) {
if(executorService==null)
{
executorService=Executors.newFixedThreadPool(15);//Based on target tables increase this
}
futures.add(executorService.submit(callable));
}
public static boolean isWriteSucess() {
boolean writeFailureOccured = false;
try {
for (Future<Boolean> future : futures) {
try {
Boolean writeStatus = future.get();
if (writeStatus == false) {
writeFailureOccured = true;
}
} catch (Exception e) {
logger.error("Erorr - Scdeduled write failed " + e.getMessage(), e);
writeFailureOccured = true;
}
}
} finally {
resetFutures();
if (executorService != null)
executorService.shutdown();
executorService = null;
}
return !writeFailureOccured;
}
private static void resetFutures() {
logger.error("resetFutures called");
//futures.clear();
}
}https://stackoverflow.com/questions/31912858
复制相似问题