专栏首页扎心了老铁Hadoop通过HCatalog编写Mapreduce任务访问hive库中schema数据

Hadoop通过HCatalog编写Mapreduce任务访问hive库中schema数据

1、dirver

package com.kangaroo.hadoop.drive;

import java.util.Map;
import java.util.Properties;

import com.kangaroo.hadoop.mapper.AggregateMapper;
import com.kangaroo.hadoop.reducer.AggregateReducer;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.kangaroo.hadoop.utils.PropertiesUtil;


public class DriveMain extends Configured implements Tool {

    private static final Logger logger = LoggerFactory.getLogger(DriveMain.class);
    private Configuration conf;
    private PropertiesUtil propUtil;

    public DriveMain() {
        this.conf = new Configuration();
        this.propUtil = new PropertiesUtil("configure.properties");
    }

    public int run(String[] args) throws Exception {
        try {
            logger.info("MapReduce Job Beginning.");
            String dbName = args[0];
            String tableName = args[1];
            String partition = args[2];
            String sumField = args[3];
            String outPath = args[4];
            String partFilter = partitionFormat(partition);
            logger.info("[Params] dbName:{}; tableName:{}, partition:{}, sumField:{}, outPath:{}, partFilter:{}",
                    dbName, tableName, partition, sumField, outPath, partFilter);
            this.conf.set("sumField", sumField);
            this.setMapRedConfiguration();
            Job job = this.setJobConfiguration(this.conf);
            HCatInputFormat.setInput(job, dbName, tableName, partFilter);
            logger.info("setInput successfully.");
            FileOutputFormat.setOutputPath(job, new Path(outPath));
            logger.info("setOutput successfully.");
            return (job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception ex) {
            logger.error(ex.getMessage());
            throw ex;
        }
    }

    private Job setJobConfiguration(Configuration conf) throws Exception {
        try {
            logger.info("enter setJobConfiguration");
            Job job = Job.getInstance(conf);
            job.setJarByClass(DriveMain.class);
            job.setInputFormatClass(HCatInputFormat.class);
            job.setMapperClass(AggregateMapper.class);
            job.setReducerClass(AggregateReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setNumReduceTasks(1);
            logger.info("setJobConfiguration successfully.");
            return job;
        } catch (Exception ex) {
            logger.error("setJobConfiguration: " + ex.getMessage());
            throw new Exception(ex);
        }
    }

    private void setMapRedConfiguration() {
        try {
            Properties properties = propUtil.getProperties();
            logger.info("Load MapReduce Configuration Successfully.");
            for (Map.Entry entry : properties.entrySet()) {
                if (entry.getKey().toString().startsWith("mapred")) {
                    conf.set(entry.getKey().toString(), entry.getValue().toString());
                    logger.info("[MR][Config] key:{}, value:{}", entry.getKey().toString(), entry.getValue().toString());
                }
            }
            logger.info("[MR][Config] Set MapReduce Configuration Successfully.");
        } catch (Exception e) {

        }

    }


    private String partitionFormat(String partition) {
        String format = "";
        if(!partition.contains("pt") && ! partition.contains("dt")) {
            String[] items = partition.split("/");
            String[] keys = {"year","month","day", "hour"};
            for(int i=0; i<items.length; i++) {
                if (i == items.length-1) {
                    format += keys[i] + "='" + items[i] + "'";
                } else {
                    format += keys[i] + "='" + items[i] + "' and ";
                }
            }
        } else {
            format = partition;
        }
        return format;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new DriveMain(), args);
        System.exit(exitCode);
    }

}

2、Mapper

package com.kangaroo.hadoop.mapper;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@SuppressWarnings("rawtypes")
public class AggregateMapper extends Mapper<WritableComparable, HCatRecord, Text, Text> {

    private static final Logger logger = LoggerFactory.getLogger(AggregateMapper.class);

    private HCatSchema schema;
    private Text outKey;
    private Text outValue;
    private IntWritable one;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        outKey = new Text();
        outValue = new Text();
        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
    }

    @Override
    protected void map(WritableComparable key, HCatRecord value, Context context) throws IOException, InterruptedException {
        String sumField = context.getConfiguration().get("sumField");
        Map<String, String> recordMap = new HashMap<String, String>();
        for (String fieldName : schema.getFieldNames()) {
            logger.info("fieldName={}", fieldName);
            String fieldValue = value.get(fieldName, schema).toString();
            logger.info("fieldName={}, fieldValue={}", fieldName, fieldValue);
            recordMap.put(fieldName, fieldValue);
            logger.info("recordMap={}", recordMap.toString());
        }
        outKey.set(recordMap.get(sumField));
        outValue.set("1");
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(outKey, outValue);
    }
}

3、Reducer

package com.kangaroo.hadoop.reducer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

@SuppressWarnings("rawtypes")
public class AggregateReducer extends Reducer<Text, Text, Text, Text> {
    protected static final Logger logger = LoggerFactory.getLogger(AggregateReducer.class);
    HCatSchema schema;
    Text outKey;
    Text outValue;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException {
        outKey.set(key);
        int sum = 0;
        for (Text value : values) {
            sum += Integer.parseInt(value.toString());
        }
        outValue.set(String.valueOf(sum));
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(outKey, outValue);
    }
}

4、propertyUtil

package com.kangaroo.hadoop.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;


public class PropertiesUtil {
    private String filePath;

    public PropertiesUtil() {
        this.filePath = "configure.properties";
    }

    public PropertiesUtil(String filePath) {
        this.filePath = filePath;
    }

    public Properties getProperties() throws IOException {
        Properties prop;
        InputStream inStream = null;
        try {
            inStream = PropertiesUtil.class.getClassLoader()
                    .getResourceAsStream(this.filePath);
            prop = new Properties();
            prop.load(inStream);

            return prop;
        } finally {
            if (inStream != null)
                inStream.close();
        }
    }
}

5、配置

mapred.job.queue.name=root.XXX
mapred.jar=./XXX.jar
mapred.map.tasks=300
mapred.reduce.tasks=100
#mapred.map.capacity=1
#mapred.reduce.capacity=1
mapred.job.priority=HIGH
mapred.job.name=XXX

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • springboot与thrift集成实现服务端和客户端

    我们这里用一个简单的小功能来演示一下如何使用springboot集成thrift 这个功能是,判断hdfs路径存在。 1、先解决依赖 <dependencie...

    用户1225216
  • springboot scheduled并发配置

    本文介绍如何使用springboot的sheduled实现任务的定时调度,并将调度的任务实现为并发的方式。 1、定时调度配置scheduled 1)注册定时任务...

    用户1225216
  • SpringMVC拦截器Interceptor

    SpringMVC拦截器(Interceptor)实现对每一个请求处理前后进行相关的业务处理,类似与servlet中的Filter。 SpringMVC 中的I...

    用户1225216
  • Scala Http请求工具类

    import java.io.IOException import java.util import org.apache.http.client.Clie...

    实时计算
  • 这篇带你深入理解SpringBoot中的自动装配(好文精读)

    SpringBoot的自动装配是拆箱即用的基础,也是微服务化的前提。其实它并不那么神秘,我在这之前已经写过最基本的实现了,大家可以参考这篇文章。这次主要的议题是...

    Java团长
  • freeswitch笔记(9)-esl outbound中如何放音采集按键?

    关于这个功能,esl-client 上给出的源码示例极具误导性,根本跑不起来,见: https://github.com/esl-client/esl-clie...

    菩提树下的杨过
  • Quartz2.x动态Job工具类 原

    有些需求,需要动态启动一个定时器,然后在一定条件下再停止。比如通过rest控制jenkins做发布,当发起一个构建后,就需要定时去查询构建状态,在构建完成后再停...

    尚浩宇
  • 微信公众号开发-素材/消息管理接口

    本文是 微信公众号开发者模式介绍及接入 的后续,如没看过前文的话,可能看本文会有些懵逼。本文主要介绍微信公众平台的素材、消息管理接口的开发。由于个人的订阅号是没...

    端碗吹水
  • Java任务调度框架Quartz教程实例

    http://blog.csdn.net/yuebinghaoyuan/article/details/9045471

    bear_fish
  • Spring Security技术栈开发企业级认证与授权(十四)使用Spring Social集成QQ登录验证方式

    在前一篇文章中介绍到,Spring Social封装了OAuth协议的标准步骤,我们只需要配置第三方应用的认证服务器地址即可,就可以获取到访问令牌Access ...

    itlemon

扫码关注云+社区

领取腾讯云代金券