orc文件格式对常用系统的支持

1、Hive支持

创建表时指定orc格式即可:

create table tmp.orc_test(id bigint, name string, age int) stored as orc TBLPROPERTIES('orc.compress'='SNAPPY')

常用的压缩格式有"SNAPPY"和"ZLIB"两种(也可以设为"NONE"表示不压缩,不指定时默认为"ZLIB"),需要哪种格式指定即可。

2、SPARK支持

Spark读:
df  = spark.read.orc("/tmp/test/orc_data")  # 读出来的数据是一个dataframe

Spark写:
df.write.format("orc").save("/tmp/test/orc_data2")

3、Hadoop Streaming支持

3.1、读orc文件,输出text

hadoop jar /usr/local/hadoop-2.7.0//share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
-libjars /usr/local/hive-1.2.0/lib/hive-exec-1.2.0-SNAPSHOT.jar \
-mapper /bin/cat -reducer /bin/cat \
-input /tmp/test/orc_test1 \
-output /tmp/test/orc_streaming_test3 \
-inputformat org.apache.hadoop.hive.ql.io.orc.OrcInputFormat 

返回的数据:

null    {"name":"123","age":"456"}
null    {"name":"456","age":"789"}

3.2、读orc文件,写orc文件:

hadoop jar /usr/local/hadoop-2.7.0//share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
-libjars orc_maprd_test.jar \
-D orc.mapred.output.schema="struct<id:string,name:string,sex:string,age:string>" \
-input /tmp/test/orc_streaming_test \
-output /tmp/test/orc_streaming_test2 \
-inputformat org.apache.orc.mapred.OrcInputFormat \
-outputformat org.apache.orc.mapred.OrcOutputFormat \
-mapper is.orc.MyMapper -reducer is.orc.MyReducer 

pom.xml

<dependencies>
  <dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.0</version>
  </dependency>
</dependencies>

mapper:

package is.orc;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;
import java.util.Random;

/**
 * Hadoop Streaming mapper (old {@code mapred} API) that reads ORC rows and
 * emits each row as a single tab-separated {@link Text} line.
 *
 * <p>The first four fields of every {@link OrcStruct} are read by position and
 * joined with tab characters. The output key is a small random number.
 */
class MyMapper implements Mapper<NullWritable,OrcStruct,LongWritable,Text> {

    Random random = new Random();

    /** Nothing to release. */
    public void close() { }

    /**
     * Flattens one ORC row into a tab-separated line and emits it under a random key.
     *
     * @param nullWritable    input key (unused)
     * @param orcStruct       the ORC row; fields 0..3 are read by index
     * @param outputCollector receives the (random key, tab-separated line) pair
     * @param reporter        progress reporter (unused)
     * @throws IOException if the collector fails to accept the record
     */
    public void map(NullWritable nullWritable, OrcStruct orcStruct, OutputCollector<LongWritable, Text> outputCollector, Reporter reporter) throws IOException {
        StringBuilder line = new StringBuilder();
        for (int column = 0; column < 4; column++) {
            if (column > 0) {
                line.append("\t");
            }
            line.append(orcStruct.getFieldValue(column).toString());
        }

        // Author's note: it is unclear why the mapper's output key only works as
        // LongWritable here. A random key in [0, 5) is generated so that the rows
        // read from the ORC file do not all end up in a single reducer.
        int bucket = random.nextInt(5);
        outputCollector.collect(new LongWritable(bucket), new Text(line.toString()));
    }

    /**
     * Sets the map output key and value classes on the supplied job configuration.
     *
     * <p>NOTE(review): it is not clear that changing these settings from inside
     * {@code configure} has any effect on the running task - confirm.
     *
     * @param jobConf the job configuration passed in by the framework
     */
    public void configure(JobConf jobConf) {
        jobConf.setMapOutputKeyClass(Writable.class);
        jobConf.setMapOutputValueClass(Text.class);
    }
}

Reducer:

package is.orc;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import java.io.IOException;
import java.util.Iterator;


/**
 * Hadoop Streaming reducer (old {@code mapred} API) that turns tab-separated
 * text lines back into {@link OrcStruct} rows for an ORC output format.
 *
 * <p>Every value received for a key is converted and emitted. The key itself
 * is ignored.
 */
class MyReducer implements Reducer<LongWritable, Text, NullWritable, OrcStruct> {
    /** Number of tab-separated columns expected in every input line. */
    private static final int EXPECTED_FIELDS = 4;

    // Field types of the ORC file to be created.
    private TypeDescription schema = TypeDescription.fromString(
            "struct<id:string," +
                    "name:string," +
                    "sex:string," +
                    "age:string>"
    );

    // Single row instance that is refilled and emitted for every record.
    private OrcStruct pair = (OrcStruct)OrcStruct.createValue(schema);


    /**
     * Converts every tab-separated line of the group into an ORC row and emits it.
     *
     * @param text            the grouping key (unused)
     * @param iterator        the tab-separated lines grouped under the key
     * @param outputCollector receives one ORC row per input line
     * @param reporter        progress reporter (unused)
     * @throws IOException if a line has fewer than four columns or the collector fails
     */
    public void reduce(LongWritable text, Iterator<Text> iterator, OutputCollector<NullWritable, OrcStruct> outputCollector, Reporter reporter) throws IOException {

        // Emit inside the loop: stopping after the first value would drop every
        // other record that shares this key.
        while (iterator.hasNext()) {
            String line = iterator.next().toString();
            // Limit -1 keeps trailing empty columns instead of silently dropping them.
            String[] lineSplit = line.split("\t", -1);
            if (lineSplit.length < EXPECTED_FIELDS) {
                throw new IOException("Expected " + EXPECTED_FIELDS
                        + " tab-separated fields but got " + lineSplit.length + ": " + line);
            }

            // NOTE(review): columns 0..3 are mapped to name, sex, age, id, which differs
            // from the id, name, sex, age order of the output schema - confirm this
            // matches the column order of the input data.
            pair.setFieldValue("name",new Text(lineSplit[0]));
            pair.setFieldValue("sex",new Text(lineSplit[1]));
            pair.setFieldValue("age",new Text(lineSplit[2]));
            pair.setFieldValue("id",new Text(lineSplit[3]));

            outputCollector.collect(NullWritable.get(),pair);
        }
    }

    /** Nothing to release. */
    public void close() throws IOException {

    }

    /** No configuration is read. */
    public void configure(JobConf jobConf) {

    }
}

4、MapReduce支持

读orc的mapper:

package is.orc;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.orc.mapred.OrcStruct;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * Mapper that reads ORC rows and emits each row as one tab-separated
 * {@link Text} key with a {@link NullWritable} value.
 *
 * <p>The first four fields of the row are read by positional index.
 */
public class OrcFileReadMapper extends Mapper<NullWritable, OrcStruct, Text, NullWritable> {

    /** Number of leading fields copied from each ORC row. */
    private static final int FIELD_COUNT = 4;

    /** Placeholder written when field 0 is null. */
    private static final String NULL_FIRST_FIELD = "-1";

    // Reused for every record to avoid allocating a new Text per row.
    private final Text outputKey = new Text();

    /**
     * Joins the first four fields of the row with tabs and writes the result as the key.
     *
     * <p>A null field 0 is written as {@code -1}. A null in any other field is
     * written as an empty string instead of failing the task.
     *
     * @param key     input key (unused)
     * @param value   the ORC row; fields are read by index (they could also be read by name)
     * @param context the task context that receives the output record
     * @throws IOException          if writing the record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(NullWritable key, OrcStruct value, Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < FIELD_COUNT; i++) {
            if (i > 0) {
                sb.append('\t');
            }
            Object field = value.getFieldValue(i);
            if (field != null) {
                sb.append(field.toString());
            } else if (i == 0) {
                sb.append(NULL_FIRST_FIELD);
            }
            // A null in fields 1..3 contributes an empty column.
        }

        outputKey.set(sb.toString());
        context.write(outputKey, NullWritable.get());
    }
}

写orc的reducer:

package is.orc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;

/**
 * Reducer that converts tab-separated text keys into {@link OrcStruct} rows
 * with the schema {@code struct<id:string,name:string,sex:string,age:string>}.
 */
public class OrcFileWriteReducer extends Reducer<Text,NullWritable,NullWritable,OrcStruct> {

    /** Number of tab-separated columns expected in every key. */
    private static final int EXPECTED_FIELDS = 4;

    // Field types of the ORC file to be created.
    private TypeDescription schema = TypeDescription.fromString(
            "struct<id:string," +
                    "name:string," +
                    "sex:string," +
                    "age:string>"
    );

    // Single row instance that is refilled and written for every record.
    private OrcStruct pair = (OrcStruct)OrcStruct.createValue(schema);

    /**
     * Splits the key into four columns and writes it as an ORC row, once per grouped value.
     *
     * @param key     a tab-separated line holding id, name, sex and age in that order
     * @param values  one entry per occurrence of the line in the map output
     * @param context the task context that receives the ORC rows
     * @throws IOException          if the line has fewer than four columns or writing fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        String line = key.toString();
        // No trim(): it would strip a leading or trailing tab and shift the columns
        // when the first or last field is empty. Limit -1 keeps trailing empty columns.
        String[] lineSplit = line.split("\t", -1);
        if (lineSplit.length < EXPECTED_FIELDS) {
            throw new IOException("Expected " + EXPECTED_FIELDS
                    + " tab-separated fields but got " + lineSplit.length + ": " + line);
        }

        pair.setFieldValue("id",new Text(lineSplit[0]));
        pair.setFieldValue("name",new Text(lineSplit[1]));
        pair.setFieldValue("sex",new Text(lineSplit[2]));
        pair.setFieldValue("age",new Text(lineSplit[3]));

        // Identical lines are grouped under one key; write one row per occurrence
        // so that duplicate input rows are not silently dropped.
        for (NullWritable ignored : values) {
            context.write(NullWritable.get(),pair);
        }
    }
}

job配置:

package is.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcInputFormat;
import org.apache.orc.mapreduce.OrcOutputFormat;

import java.io.IOException;

/**
 * @author lyf
 * @since 2018/06/16
 */
public class OrcFileWriteJob extends Configured implements Tool {

    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConf();

        conf.set("orc.mapred.output.schema","struct<id:string,name:string,sex:string,age:string>");
        String input = "/dws/dd_read_d_v2/dt=20180809/000000_0";
        String output = "/tmp/test/test_mr_orc";

        Job job = Job.getInstance(conf);

        job.setJarByClass(OrcFileWriteJob.class);
        job.setMapperClass(OrcFileReadMapper.class);
        job.setReducerClass(OrcFileWriteReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);

        job.setInputFormatClass(OrcInputFormat.class);
        job.setOutputFormatClass(OrcOutputFormat.class);

        FileInputFormat.addInputPath(job,new Path(input));
        FileOutputFormat.setOutputPath(job,new Path(output));

        boolean rt = job.waitForCompletion(true);
        return rt?0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int retnum = ToolRunner.run(conf,new OrcFileWriteJob(),args);
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏数据结构与算法

BZOJ2434: [Noi2011]阿狸的打字机(AC自动机 树状数组)

1133
来自专栏叁金大数据

C#调用C++ Dll

现在项目基本都是旁边C++的哥们做好dll扔给我,然后我调用。好久之前晚上down了一份c#调用c++dll的方法,出处早已经遗忘。闲来无事,放上来好了。原作者...

3402
来自专栏码匠的流水账

聊聊flink的RichParallelSourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/func...

1372
来自专栏码匠的流水账

FluxSink实例及解析

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxSink.java

1292
来自专栏函数式编程语言及工具

SDP(6):分布式数据库运算环境- Cassandra-Engine

    现代信息系统应该是避不开大数据处理的。作为一个通用的系统集成工具也必须具备大数据存储和读取能力。cassandra是一种分布式的数据库,具备了分布式数据...

3274
来自专栏智能大石头

C++返回值优化RVO

返回值优化,是一种属于编译器的技术,它通过转换源代码和对象的创建来加快源代码的执行速度。RVO = return value optimization。 测试...

2959
来自专栏张善友的专栏

如何结合IbatisNet的LIST遍历实现模糊查询

我仿照Java的Spring+Ibatis+Struct用Castle+IBatisNet+Asp.net的开发框架的DAO的基类:BaseSqlMapDao内...

2109
来自专栏浪淘沙

MapReduce操作实例

要点:有一个combiner方法,可以在执行完map时调用,从而对数据进行先一步的处理,降低Reduce的IO压力。

2163
来自专栏后端沉思录

自定义钉钉机器人报警

按照钉钉的文档来开发,创建机器人后,即可获取Webhook地址,整个过程还是很简单的,以上只是提供了一个思路.

2812
来自专栏码匠的流水账

聊聊flink的RichParallelSourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/func...

2101

扫码关注云+社区