首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何容错Flink将数据以gzip压缩的形式下沉到hdfs?

如何容错Flink将数据以gzip压缩的形式下沉到hdfs?
EN

Stack Overflow用户
提问于 2019-06-06 17:31:27
回答 1查看 538关注 0票数 3

我们希望通过Flink的BucketingSink或StreamingFileSink将压缩数据写入HDFS。我已经编写了自己的Writer,如果没有失败发生,它工作得很好。但是,当它遇到故障并从检查点重新启动时,它将生成有效长度的文件(hadoop< 2.7)或截断该文件。不幸的是,gzip是二进制文件,文件末尾有尾部。因此,在我的例子中,简单的截断不起作用。有什么想法可以为压缩hdfs接收器启用一次语义吗?

这是我的作者的代码:

代码语言:javascript
运行
复制
public class HdfsCompressStringWriter extends StreamWriterBaseV2<JSONObject> {

private static final long serialVersionUID = 2L;

/**
 * The {@code CompressFSDataOutputStream} for the current part file.
 */
private transient GZIPOutputStream compressionOutputStream;

public HdfsCompressStringWriter() {}

@Override
public void open(FileSystem fs, Path path) throws IOException {
    super.open(fs, path);
    this.setSyncOnFlush(true);
    compressionOutputStream = new GZIPOutputStream(this.getStream(), true);
}

public void close() throws IOException {
    if (compressionOutputStream != null) {
        compressionOutputStream.close();
        compressionOutputStream = null;
    }
    resetStream();
}

@Override
public void write(JSONObject element) throws IOException {
    if (element == null || !element.containsKey("body")) {
        return;
    }
    String content = element.getString("body") + "\n";
    compressionOutputStream.write(content.getBytes());
    compressionOutputStream.flush();
}

@Override
public Writer<JSONObject> duplicate() {
    return new HdfsCompressStringWriter();
}

}

EN

回答 1

Stack Overflow用户

发布于 2019-06-07 20:08:51

我建议为StreamingFileSink实现一个BulkWriter,它通过GZIPOutputStream压缩元素。代码可能如下所示:

代码语言:javascript
运行
复制
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(1000);

    final DataStream<Integer> input = env.addSource(new InfinitySource());

    final StreamingFileSink<Integer> streamingFileSink = StreamingFileSink.<Integer>forBulkFormat(new Path("output"), new GzipBulkWriterFactory<>()).build();
    input.addSink(streamingFileSink);

    env.execute();
}

private static class GzipBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    @Override
    public BulkWriter<T> create(FSDataOutputStream fsDataOutputStream) throws IOException {
        final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(fsDataOutputStream, true);
        return new GzipBulkWriter<>(new ObjectOutputStream(gzipOutputStream), gzipOutputStream);
    }
}

private static class GzipBulkWriter<T> implements BulkWriter<T> {

    private final GZIPOutputStream gzipOutputStream;
    private final ObjectOutputStream objectOutputStream;

    public GzipBulkWriter(ObjectOutputStream objectOutputStream, GZIPOutputStream gzipOutputStream) {
        this.gzipOutputStream = gzipOutputStream;
        this.objectOutputStream = objectOutputStream;
    }

    @Override
    public void addElement(T t) throws IOException {
        objectOutputStream.writeObject(t);
    }

    @Override
    public void flush() throws IOException {
        objectOutputStream.flush();
    }

    @Override
    public void finish() throws IOException {
        objectOutputStream.flush();
        gzipOutputStream.finish();
    }
}
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56474706

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档