首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >作为独立的RDD并行处理多个文件

作为独立的RDD并行处理多个文件
EN

Stack Overflow用户
提问于 2015-08-10 06:27:40
回答 3查看 7.3K关注 0票数 6

我有一个场景,其中一定数量的操作(包括group )必须应用于多个小文件(每个文件约300 on )。手术看起来是这样..。

df.groupBy(....).agg(....)

现在,要在多个文件上处理它,我可以使用通配符“/**/*..csv”来创建单个RDD,并将其划分为操作。但是,从操作上看,它是一个组,涉及大量的洗牌,如果文件是相互排斥的,这是不必要的。

我正在看的是,我可以在文件上创建独立的RDD并独立地操作它们的方法。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2015-08-10 10:00:13

它与其说是一个完整的解决方案,不如说是一个想法,我还没有对它进行测试。

您可以从将数据处理管道提取到函数开始。

代码语言:javascript
复制
def pipeline(f: String, n: Int) = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}

如果您的文件很小,您可以调整n参数,以使用尽可能少的分区,以适应单个文件中的数据,并避免洗牌。这意味着您正在限制并发性,但我们稍后再讨论这个问题。

代码语言:javascript
复制
val n: Int = ??? 

接下来,您必须获得输入文件的列表。这个步骤取决于数据源,但在大多数情况下,它或多或少是简单的:

代码语言:javascript
复制
val files: Array[String] = ???

接下来,您可以使用pipeline函数映射上面的列表:

代码语言:javascript
复制
val rdds = files.map(f => pipeline(f, n))

由于我们将并发限制在单个文件的级别上,所以我们希望通过提交多个作业来补偿。让我们添加一个简单的助手,强制计算并用Future包装它。

代码语言:javascript
复制
import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
    df.rdd.foreach(_ => ()) // Force computation
    df
}

最后,我们可以在rdds上使用上面的助手

代码语言:javascript
复制
val result = Future.sequence(
   rdds.map(rdd => pipelineToFuture(rdd)).toList
)

根据您的需求,您可以添加onComplete回调或使用反应性流来收集结果。

票数 8
EN

Stack Overflow用户

发布于 2015-08-10 08:00:29

如果您有许多文件,而且每个文件都很小(在此之前,我将将300 as视为Spark的小文件),您可以尝试使用SparkContext.wholeTextFiles创建一个RDD,其中每个记录都是一个完整的文件。

票数 1
EN

Stack Overflow用户

发布于 2019-10-25 13:50:45

这样,我们就可以并行地编写多个RDD。

代码语言:javascript
复制
public class ParallelWriteSevice implements IApplicationEventListener {

    private static final IprogramLogger logger = programLoggerFactory.getLogger(ParallelWriteSevice.class);

    private static ExecutorService executorService=null;
    private static List<Future<Boolean>> futures=new ArrayList<Future<Boolean>>();

    public static void submit(Callable callable) {
        if(executorService==null)
        {
            executorService=Executors.newFixedThreadPool(15);//Based on target tables increase this
        }

        futures.add(executorService.submit(callable));
    }

    public static boolean isWriteSucess() {
        boolean writeFailureOccured = false;
        try {
            for (Future<Boolean> future : futures) {
                try {
                    Boolean writeStatus = future.get();
                    if (writeStatus == false) {
                        writeFailureOccured = true;
                    }
                } catch (Exception e) {
                    logger.error("Erorr - Scdeduled write failed " + e.getMessage(), e);
                    writeFailureOccured = true;
                }
            }
        } finally {
            resetFutures();         
              if (executorService != null) 
                  executorService.shutdown();
              executorService = null;

        }
        return !writeFailureOccured;
    }

    private static void resetFutures() {
            logger.error("resetFutures called");
            //futures.clear();
    }




}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31912858

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档