当我每天运行Hadoop作业时,我希望覆盖/重用现有的输出目录。实际上,输出目录将存储每天作业运行结果的汇总输出。如果我指定相同的输出目录,它会给出错误“输出目录已经存在”。
如何绕过此验证?
发布于 2011-10-10 21:53:58
如果在运行作业之前删除目录呢?
您可以通过shell完成此操作:
hadoop fs -rmr /path/to/your/output/
或者通过Java API:
// configuration should contain reference to your namenode
FileSystem fs = FileSystem.get(new Configuration());
// true stands for recursively deleting the folder you gave
fs.delete(new Path("/path/to/your/output"), true);
发布于 2011-10-10 22:10:51
Jungblut的答案是你的直接解决方案。因为我从来不相信自动化的过程会删除东西(我个人),所以我会建议一种替代方案:
我建议您将作业的输出名称设置为动态的,包括作业运行的时间,而不是尝试覆盖。
就像"/path/to/your/output-2011-10-09-23-04/
“一样。这样,您可以保留您的旧作业输出,以防您需要重新访问。在我的运行10+日常作业的系统中,我们将输出结构化为:/output/job1/2011/10/09/job1out/part-r-xxxxx
、/output/job1/2011/10/10/job1out/part-r-xxxxx
等。
发布于 2014-08-05 15:55:29
Hadoop的TextInputFormat
(我猜您正在使用它)不允许覆盖现有目录。可能是为了找出你错误地删除了一些你(和你的集群)非常努力工作的东西的痛苦。
但是,如果您确定希望作业覆盖您的输出文件夹,我认为最干净的方法是稍微更改一下TextOutputFormat
:
public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V>
{
public RecordWriter<K, V>
getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException
{
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get("mapred.textoutputformat.separator","\t");
CompressionCodec codec = null;
String extension = "";
if (isCompressed)
{
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, true);
if (!isCompressed)
{
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
}
else
{
return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
}
}
}
现在,您正在使用overwrite=true创建FSDataOutputStream
(fs.create(file, true)
)。
https://stackoverflow.com/questions/7713316
复制相似问题