orc文件格式对常用系统的支持

1、Hive支持

创建表时指定orc格式即可:

create table tmp.orc_test(id bigint, name string, age int) stored as orc TBLPROPERTIES('orc.compress'='SNAPPY')

压缩格式有"SNAPPY"和 "ZLIB"两种,需要哪种格式指定即可。

2、SPARK支持

Spark读:
df  = spark.read.orc("/tmp/test/orc_data")  # 读出来的数据是一个dataframe

Spark写:
df.write.format("orc").save("/tmp/test/orc_data2")

3、Hadoop Streaming支持

3.1、读orc文件,输出text

hadoop jar /usr/local/hadoop-2.7.0//share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
-libjars /usr/local/hive-1.2.0/lib/hive-exec-1.2.0-SNAPSHOT.jar \
-mapper /bin/cat -reducer /bin/cat \
-input /tmp/test/orc_test1 \
-output /tmp/test/orc_streaming_test3 \
-inputformat org.apache.hadoop.hive.ql.io.orc.OrcInputFormat 

返回的数据:

null    {"name":"123","age":"456"}
null    {"name":"456","age":"789"}

3.2、读orc文件,写orc文件:

hadoop jar /usr/local/hadoop-2.7.0//share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
-libjars orc_maprd_test.jar \
-D orc.mapred.output.schema="struct<id:string,name:string,sex:string,age:string>" \
-input /tmp/test/orc_streaming_test \
-output /tmp/test/orc_streaming_test2 \
-inputformat org.apache.orc.mapred.OrcInputFormat \
-outputformat org.apache.orc.mapred.OrcOutputFormat \
-mapper is.orc.MyMapper -reducer is.orc.MyReducer 

pom.xml

<dependencies>
  <dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.0</version>
  </dependency>
</dependencies>

mapper:

package is.orc;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;
import java.util.Random;

class MyMapper implements Mapper<NullWritable,OrcStruct,LongWritable,Text> {

    Random random = new Random();

    public void close() { }

    public void map(NullWritable nullWritable, OrcStruct orcStruct, OutputCollector<LongWritable, Text> outputCollector, Reporter reporter) throws IOException {
        StringBuffer str = new StringBuffer();
        str.append(orcStruct.getFieldValue(0).toString() + "\t");
        str.append(orcStruct.getFieldValue(1).toString() + "\t");
        str.append(orcStruct.getFieldValue(2).toString() + "\t");
        str.append(orcStruct.getFieldValue(3).toString() );
        
        //不知道为什么Mapper的OutputKey只能用LongWritable,用随机数生成一个key,防止读orc文件后单reduce的情况
        LongWritable key = new LongWritable(random.nextInt(5)); 
        outputCollector.collect(key, new Text(str.toString()));
    }

    public void configure(JobConf jobConf) {
        jobConf.setMapOutputKeyClass(Writable.class);
        jobConf.setMapOutputValueClass(Text.class);
    }
}

Reducer:

package is.orc;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import java.io.IOException;
import java.util.Iterator;


class MyReducer implements Reducer<LongWritable, Text, NullWritable, OrcStruct> {
    //要创建的ORC文件中的字段类型
    private TypeDescription schema = TypeDescription.fromString(
            "struct<id:string," +
                    "name:string," +
                    "sex:string," +
                    "age:string>"
    );

    private OrcStruct pair = (OrcStruct)OrcStruct.createValue(schema);


    public void reduce(LongWritable text, Iterator<Text> iterator, OutputCollector<NullWritable, OrcStruct> outputCollector, Reporter reporter) throws IOException {

        while (iterator.hasNext()) {
            String[] lineSplit = iterator.next().toString().split("\t");
            pair.setFieldValue("name",new Text(lineSplit[0]));
            pair.setFieldValue("sex",new Text(lineSplit[1]));
            pair.setFieldValue("age",new Text(lineSplit[2]));
            pair.setFieldValue("id",new Text(lineSplit[3]));
            break;
        }

        outputCollector.collect(NullWritable.get(),pair);
    }

    public void close() throws IOException {

    }

    public void configure(JobConf jobConf) {

    }
}

4、MapReduce支持

读orc的mapper:

package is.orc;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.orc.mapred.OrcStruct;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class OrcFileReadMapper extends Mapper<NullWritable, OrcStruct, Text, NullWritable> {

    private Text outputKey = new Text();

    @Override
    protected void map(NullWritable key, OrcStruct value, Context context) throws IOException, InterruptedException {
        StringBuffer sb= new StringBuffer();
        if (value.getFieldValue(0) == null){
            sb.append("-1\t");
        }else{
            sb.append(value.getFieldValue(0).toString() + "\t");      //通过下标索引获取数据
        }


        sb.append(value.getFieldValue(1).toString()+ "\t");
        sb.append(value.getFieldValue(2).toString()+ "\t");
        sb.append(value.getFieldValue(3).toString());      //也可以通过字段名获取数据

        outputKey = new Text(sb.toString());

        context.write(outputKey, NullWritable.get());
    }
}

写orc的reduer:

package is.orc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;

public class OrcFileWriteReducer extends Reducer<Text,NullWritable,NullWritable,OrcStruct> {

    //要创建的ORC文件中的字段类型
    private TypeDescription schema = TypeDescription.fromString(
            "struct<id:string," +
                    "name:string," +
                    "sex:string," +
                    "age:string>"
    );

    private OrcStruct pair = (OrcStruct)OrcStruct.createValue(schema);

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        String line = key.toString();
        String[] lineSplit = line.trim().split("\t");

        pair.setFieldValue("id",new Text(lineSplit[0]));
        pair.setFieldValue("name",new Text(lineSplit[1]));
        pair.setFieldValue("sex",new Text(lineSplit[2]));
        pair.setFieldValue("age",new Text(lineSplit[3]));

        context.write(NullWritable.get(),pair);
    }
}

job配置:

package is.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcInputFormat;
import org.apache.orc.mapreduce.OrcOutputFormat;

import java.io.IOException;

/**
 * @author lyf
 * @since 2018/06/16
 */
public class OrcFileWriteJob extends Configured implements Tool {

    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConf();

        conf.set("orc.mapred.output.schema","struct<id:string,name:string,sex:string,age:string>");
        String input = "/dws/dd_read_d_v2/dt=20180809/000000_0";
        String output = "/tmp/test/test_mr_orc";

        Job job = Job.getInstance(conf);

        job.setJarByClass(OrcFileWriteJob.class);
        job.setMapperClass(OrcFileReadMapper.class);
        job.setReducerClass(OrcFileWriteReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);

        job.setInputFormatClass(OrcInputFormat.class);
        job.setOutputFormatClass(OrcOutputFormat.class);

        FileInputFormat.addInputPath(job,new Path(input));
        FileOutputFormat.setOutputPath(job,new Path(output));

        boolean rt = job.waitForCompletion(true);
        return rt?0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int retnum = ToolRunner.run(conf,new OrcFileWriteJob(),args);
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏PPV课数据科学社区

【学习】七天搞定SAS(二):基本操作(判断、运算、基本函数)

? 今天开始注重变量操作。 SAS生成新变量 SAS支持基本的加减乘除,值得一提的是它的**代表指数,而不是^。* Modify homegarden dat...

4784
来自专栏码匠的流水账

FluxSink实例及解析

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxSink.java

1712
来自专栏码匠的流水账

聊聊flink的RichParallelSourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/func...

1702
来自专栏浪淘沙

MapReduce操作实例

要点:有一个combiner方法,可以在执行完map时调用,从而对数据进行先一步的处理,降低Reduce的IO压力。

3203
来自专栏葡萄城控件技术团队

枚举的多语言显示

关于枚举类型的多语言显示,其实就是Globalization的问题。解决方案当然不止一种,这里介绍一种可用性和扩展性的比较好的通用方法。 显然这里自己去实现自定...

2945
来自专栏码匠的流水账

聊聊flink的RichParallelSourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/func...

2981
来自专栏数据结构与算法

BZOJ2434: [Noi2011]阿狸的打字机(AC自动机 树状数组)

1183
来自专栏积累沉淀

研究MapReduce源码之实现自定义LineRecordReader完成多行读取文件内容

TextInputFormat是Hadoop默认的数据输入格式,但是它只能一行一行的读记录,如果要读取多行怎么办? 很简单 自己写一个输入格式,然后写一个对...

2159
来自专栏码匠的流水账

reactor-netty中TcpClient的create过程

本文主要研究一下reactor-netty中TcpClient的create的过程

2701
来自专栏码匠的流水账

聊聊storm worker的executor与task

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java

1072

扫码关注云+社区

领取腾讯云代金券