前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >orc文件格式对常用系统的支持

orc文件格式对常用系统的支持

作者头像
YG
发布2018-11-22 10:26:49
2.1K0
发布2018-11-22 10:26:49
举报
文章被收录于专栏:YG小书屋YG小书屋

1、Hive支持

创建表时指定orc格式即可:

代码语言:javascript
复制
create table tmp.orc_test(id bigint, name string, age int) stored as orc TBLPROPERTIES('orc.compress'='SNAPPY')

压缩格式有"SNAPPY"和 "ZLIB"两种,需要哪种格式指定即可。

2、SPARK支持

代码语言:javascript
复制
Spark读:
df  = spark.read.orc("/tmp/test/orc_data")  # 读出来的数据是一个dataframe

Spark写:
df.write.format("orc").save("/tmp/test/orc_data2")

3、Hadoop Streaming支持

3.1、读orc文件,输出text

代码语言:javascript
复制
hadoop jar /usr/local/hadoop-2.7.0//share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
-libjars /usr/local/hive-1.2.0/lib/hive-exec-1.2.0-SNAPSHOT.jar \
-mapper /bin/cat -reducer /bin/cat \
-input /tmp/test/orc_test1 \
-output /tmp/test/orc_streaming_test3 \
-inputformat org.apache.hadoop.hive.ql.io.orc.OrcInputFormat 

返回的数据:

代码语言:javascript
复制
null    {"name":"123","age":"456"}
null    {"name":"456","age":"789"}

3.2、读orc文件,写orc文件:

代码语言:javascript
复制
hadoop jar /usr/local/hadoop-2.7.0//share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
-libjars orc_maprd_test.jar \
-D orc.mapred.output.schema="struct<id:string,name:string,sex:string,age:string>" \
-input /tmp/test/orc_streaming_test \
-output /tmp/test/orc_streaming_test2 \
-inputformat org.apache.orc.mapred.OrcInputFormat \
-outputformat org.apache.orc.mapred.OrcOutputFormat \
-mapper is.orc.MyMapper -reducer is.orc.MyReducer 

pom.xml

代码语言:javascript
复制
<dependencies>
  <dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.0</version>
  </dependency>
</dependencies>

mapper:

代码语言:javascript
复制
package is.orc;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;
import java.util.Random;

class MyMapper implements Mapper<NullWritable,OrcStruct,LongWritable,Text> {

    Random random = new Random();

    public void close() { }

    public void map(NullWritable nullWritable, OrcStruct orcStruct, OutputCollector<LongWritable, Text> outputCollector, Reporter reporter) throws IOException {
        StringBuffer str = new StringBuffer();
        str.append(orcStruct.getFieldValue(0).toString() + "\t");
        str.append(orcStruct.getFieldValue(1).toString() + "\t");
        str.append(orcStruct.getFieldValue(2).toString() + "\t");
        str.append(orcStruct.getFieldValue(3).toString() );
        
        //不知道为什么Mapper的OutputKey只能用LongWritable,用随机数生成一个key,防止读orc文件后单reduce的情况
        LongWritable key = new LongWritable(random.nextInt(5)); 
        outputCollector.collect(key, new Text(str.toString()));
    }

    public void configure(JobConf jobConf) {
        jobConf.setMapOutputKeyClass(Writable.class);
        jobConf.setMapOutputValueClass(Text.class);
    }
}

Reducer:

代码语言:javascript
复制
package is.orc;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import java.io.IOException;
import java.util.Iterator;


class MyReducer implements Reducer<LongWritable, Text, NullWritable, OrcStruct> {
    //要创建的ORC文件中的字段类型
    private TypeDescription schema = TypeDescription.fromString(
            "struct<id:string," +
                    "name:string," +
                    "sex:string," +
                    "age:string>"
    );

    private OrcStruct pair = (OrcStruct)OrcStruct.createValue(schema);


    public void reduce(LongWritable text, Iterator<Text> iterator, OutputCollector<NullWritable, OrcStruct> outputCollector, Reporter reporter) throws IOException {

        while (iterator.hasNext()) {
            String[] lineSplit = iterator.next().toString().split("\t");
            pair.setFieldValue("name",new Text(lineSplit[0]));
            pair.setFieldValue("sex",new Text(lineSplit[1]));
            pair.setFieldValue("age",new Text(lineSplit[2]));
            pair.setFieldValue("id",new Text(lineSplit[3]));
            break;
        }

        outputCollector.collect(NullWritable.get(),pair);
    }

    public void close() throws IOException {

    }

    public void configure(JobConf jobConf) {

    }
}

4、MapReduce支持

读orc的mapper:

代码语言:javascript
复制
package is.orc;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.orc.mapred.OrcStruct;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class OrcFileReadMapper extends Mapper<NullWritable, OrcStruct, Text, NullWritable> {

    private Text outputKey = new Text();

    @Override
    protected void map(NullWritable key, OrcStruct value, Context context) throws IOException, InterruptedException {
        StringBuffer sb= new StringBuffer();
        if (value.getFieldValue(0) == null){
            sb.append("-1\t");
        }else{
            sb.append(value.getFieldValue(0).toString() + "\t");      //通过下标索引获取数据
        }


        sb.append(value.getFieldValue(1).toString()+ "\t");
        sb.append(value.getFieldValue(2).toString()+ "\t");
        sb.append(value.getFieldValue(3).toString());      //也可以通过字段名获取数据

        outputKey = new Text(sb.toString());

        context.write(outputKey, NullWritable.get());
    }
}

写orc的reduer:

代码语言:javascript
复制
package is.orc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;

public class OrcFileWriteReducer extends Reducer<Text,NullWritable,NullWritable,OrcStruct> {

    //要创建的ORC文件中的字段类型
    private TypeDescription schema = TypeDescription.fromString(
            "struct<id:string," +
                    "name:string," +
                    "sex:string," +
                    "age:string>"
    );

    private OrcStruct pair = (OrcStruct)OrcStruct.createValue(schema);

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        String line = key.toString();
        String[] lineSplit = line.trim().split("\t");

        pair.setFieldValue("id",new Text(lineSplit[0]));
        pair.setFieldValue("name",new Text(lineSplit[1]));
        pair.setFieldValue("sex",new Text(lineSplit[2]));
        pair.setFieldValue("age",new Text(lineSplit[3]));

        context.write(NullWritable.get(),pair);
    }
}

job配置:

代码语言:javascript
复制
package is.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcInputFormat;
import org.apache.orc.mapreduce.OrcOutputFormat;

import java.io.IOException;

/**
 * @author lyf
 * @since 2018/06/16
 */
public class OrcFileWriteJob extends Configured implements Tool {

    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConf();

        conf.set("orc.mapred.output.schema","struct<id:string,name:string,sex:string,age:string>");
        String input = "/dws/dd_read_d_v2/dt=20180809/000000_0";
        String output = "/tmp/test/test_mr_orc";

        Job job = Job.getInstance(conf);

        job.setJarByClass(OrcFileWriteJob.class);
        job.setMapperClass(OrcFileReadMapper.class);
        job.setReducerClass(OrcFileWriteReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);

        job.setInputFormatClass(OrcInputFormat.class);
        job.setOutputFormatClass(OrcOutputFormat.class);

        FileInputFormat.addInputPath(job,new Path(input));
        FileOutputFormat.setOutputPath(job,new Path(output));

        boolean rt = job.waitForCompletion(true);
        return rt?0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int retnum = ToolRunner.run(conf,new OrcFileWriteJob(),args);
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.10.24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、Hive支持
  • 2、SPARK支持
  • 3、Hadoop Streaming支持
    • 3.1、读orc文件,输出text
      • 3.2、读orc文件,写orc文件:
        • 4、MapReduce支持
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档