我想编写我自己的RecordReader,它将上下文作为一个完整的段落返回,而不是像在TextInputFormat中那样返回一行。
我尝试了以下函数,但肯定是它的way-off
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
        key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
        value = new Text();
    }
    value.clear();
    final Text endline = new Text("\n");
    int newSize = 0;
        Text v = new Text();
        while (v!= endline) {
            value.append(v.getBytes(),0, v.getLength());
            value.append(endline.getBytes(),0, endline.getLength());
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }
        }
    if (newSize == 0) {
        key = null;
        value = null;
        return false;
    } else {
        return true;
    }
}发布于 2016-01-08 01:06:58
实际上,您不需要花精力编写自己的RecordReader。相反,只需扩展TextInputFormat并更改分隔符。下面是TextInputFormat的库代码,只是更改了分隔符:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import com.google.common.base.Charsets;
public class ParagraphInputFormat
extends TextInputFormat {
    private static final String PARAGRAPH_DELIMITER = "\r\n\r\n";
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
    @Override
    public RecordReader<LongWritable, Text>
    createRecordReader(InputSplit split, TaskAttemptContext context) {
        String delimiter = PARAGRAPH_DELIMITER;
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        }
        return new LineRecordReader(recordDelimiterBytes);
    }
}https://stackoverflow.com/questions/15593601
复制相似问题