
Using MapReduce to clean and preprocess a given dataset, store the data in HBase, and implement CRUD operation interfaces

Maynor
Published 2022-04-14 14:13:45

For the given dataset, use MapReduce to implement data cleaning and preprocessing, write a program that stores the data in HBase, and implement CRUD (create, read, update, delete) operation interfaces; in addition, use Hive to compute basic statistics over the MapReduce-cleaned data. Design requirements:

  1. Based on the characteristics of the data, design a task scenario and use MapReduce to implement the cleaning and preprocessing. (10 points)
  2. Use the HDFS Java API to write a program that uploads both the raw data and the preprocessed data to the distributed file system.

Dataset: https://pan.baidu.com/s/1rnUJn5ld45HpLhzbwYIM1A
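The first program, step0, is the MapReduce cleaning job: the mapper emits each input record as a key, so the shuffle groups identical records together, and the reducer writes each distinct record exactly once. The net effect is deduplication (plus dropping blank lines), which serves as the preprocessing step for this dataset.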

Code (Java):
package com.company.HDFS;
 
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class step0 {
	final static String INPUT_PATH="hdfs://192.168.88.100/data";
	final static String OUTPUT_PATH="hdfs://192.168.88.100/output";
	public static void main(String[] args) throws Exception {
		Configuration configuration = new Configuration();
		// Remove the output directory if it already exists so the job can be rerun
		FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
		if (fileSystem.exists(new Path(OUTPUT_PATH))) {
			fileSystem.delete(new Path(OUTPUT_PATH), true);
		}
		// Job.getInstance replaces the deprecated new Job(...) constructor
		Job job = Job.getInstance(configuration, "step0");
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		job.setJarByClass(step0.class);
		job.setMapperClass(ReMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setReducerClass(ReReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
 
 
 
    // Reducer: identical records arrive grouped under the same key, so writing
    // the key once per group keeps a single copy of each distinct record
    public static class ReReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
 
    // Mapper: emit each input line as the key so duplicates meet at the same reducer
    public static class ReMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Basic cleaning rule: skip blank or whitespace-only lines
            if (value.toString().trim().isEmpty()) {
                return;
            }
            context.write(value, NullWritable.get());
        }
    }
}
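Once packaged into a JAR, the job runs with the usual hadoop jar command (e.g. hadoop jar step0.jar com.company.HDFS.step0; the jar name here is just a placeholder). The second program, step1, uses the HDFS Java API: demo_03 recursively lists every file under / together with its length, permission, group, and block locations, and testCopyFromLocalFile uploads the local iris.data file to /input on the cluster, covering requirement 2.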
Code (Java):
package com.company.HDFS;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class step1 {

    /**
     * List all files in HDFS with their details
     */
    @Test
    public void demo_03() {

        try {
            // 1. Get the file system, connecting as user "root"
            Configuration configuration = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.88.100:8020"), configuration, "root");

            // 2. Recursively list the files under the root directory
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);



            while (listFiles.hasNext()) {
                LocatedFileStatus status = listFiles.next();

                // Print file details
                // File name
                System.out.println(status.getPath().getName());
                // Length in bytes
                System.out.println(status.getLen());
                // Permission
                System.out.println(status.getPermission());
                // Group
                System.out.println(status.getGroup());

                // Get the block locations of the file
                BlockLocation[] blockLocations = status.getBlockLocations();

                for (BlockLocation blockLocation : blockLocations) {

                    // Hosts that store this block
                    String[] hosts = blockLocation.getHosts();

                    for (String host : hosts) {
                        System.out.println(host);
                    }
                }

                System.out.println("-----------分割线----------");
            }

            // 3. Release resources
            fs.close();
        } catch (Exception ex) {
            // Don't swallow failures silently
            ex.printStackTrace();
        }
    }


    /**
     * Upload a local file to HDFS
     */
    @Test
    public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {

        // 1. Get the file system; set the replication factor to 2 for the uploaded file
        Configuration configuration = new Configuration();
        configuration.set("dfs.replication", "2");
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.88.100:8020"), configuration, "root");

        // 2. Upload the file (local Windows path -> HDFS /input)
        fs.copyFromLocalFile(new Path("J:\\the_efforts_paid_offf\\HDFS_HBase_HiveApi\\src\\main\\java\\com\\company\\datas\\iris.data"), new Path("hdfs://192.168.88.100/input"));

        // 3. Release resources
        fs.close();

        System.out.println("over");
    }
}
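The third program, step2, covers the HBase side: createTable() creates a demo table with an info column family, put() inserts a row (in HBase an update is simply another Put to the same row key, so put() doubles as the update interface), scan() reads the table back, get() looks up a single row, and delete() removes a row. The table-definition classes used here (HTableDescriptor, HColumnDescriptor) are the HBase 1.x API; on HBase 2.x they are replaced by TableDescriptorBuilder and ColumnFamilyDescriptorBuilder.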
Code (Java):
package com.company.HDFS;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;

import java.io.IOException;

/**
 * @author ChinaManor
 * #Description HBase Java API examples
 * #Date: 2021/12/19 18:10
 */
public class step2 {
    /**
     * @Description: createTable(): create a table with one column family
     */

    @Test
    public void createTable() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181");
        // Establish the connection
        Connection conn = ConnectionFactory.createConnection(conf);
        // Get the table admin
        Admin admin = conn.getAdmin();
        // Define the table
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("demo"));
        // Define the column family
        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("info");
        // Add the column family to the table definition
        hTableDescriptor.addFamily(hColumnDescriptor);
        // Create the table
        admin.createTable(hTableDescriptor);
        admin.close();
        conn.close();


    }

    /**
     * @Description: put(): insert a row into the HBase table
     */

    @Test
    public void put(){
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","node1:2181");

        try {
            // Establish the connection
            Connection conn = ConnectionFactory.createConnection(conf);
            // Get the table
            Table table = conn.getTable(TableName.valueOf("demo"));
            // Instantiate Put with the row key
            Put put = new Put("rk001".getBytes());
            // Specify column family, column qualifier, and value
            put.addColumn("info".getBytes(),"name".getBytes(),"zhangsan".getBytes());
            table.put(put);
            table.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * @Description: scan(): read back every row in the table
     */

    @Test
    public  void scan() throws IOException {
        Configuration conf=HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181");

        // Establish the connection
        Connection conn = ConnectionFactory.createConnection(conf);

        // Get the table
        Table table = conn.getTable(TableName.valueOf("demo"));

        // Initialize the Scan instance
        Scan scan = new Scan();

        // Restrict the scan to the info:name column
        scan.addColumn("info".getBytes(), "name".getBytes());
        // Run the scan
        ResultScanner rss = table.getScanner(scan);
        // Iterate over the results
        for(Result rs:rss){
            String valStr=Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            System.out.println(valStr);
        }


        // Close the table and connection
        table.close();
        conn.close();

    }
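
    /**
     * @Description: get(): look up a single row by row key.
     * Not in the original post; added as a minimal sketch to round out the
     * CRUD interface. Assumes the "demo" table and the row inserted by put().
     */

    @Test
    public void get() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181");

        // Establish the connection
        Connection conn = ConnectionFactory.createConnection(conf);
        // Get the table
        Table table = conn.getTable(TableName.valueOf("demo"));

        // Instantiate Get with the row key and fetch the row
        Get get = new Get("rk001".getBytes());
        Result result = table.get(get);
        System.out.println(Bytes.toString(result.getValue("info".getBytes(), "name".getBytes())));

        // Close the table and connection
        table.close();
        conn.close();
    }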

    /**
     * @Description: delete(): delete a row from the table
     */

    @Test
    public  void delete() throws IOException {
        Configuration conf=HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181");

        // Establish the connection
        Connection conn = ConnectionFactory.createConnection(conf);

        // Get the table
        Table table = conn.getTable(TableName.valueOf("demo"));

        // Instantiate Delete with the row key ("rk001", matching the row written by put())
        Delete del = new Delete("rk001".getBytes());
        // Execute the delete
        table.delete(del);


        // Close the table and connection
        table.close();
        conn.close();
    }
}
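The design brief also asks for basic statistics over the cleaned data with Hive, which the code above does not cover. Below is a minimal sketch using the Hive JDBC driver; it assumes HiveServer2 is reachable at node1:10000 and that the cleaned output has been loaded into a hypothetical table iris(sepal_length, sepal_width, petal_length, petal_width, species), so adjust the address, credentials, and schema to your environment.

Code (Java):
package com.company.HDFS;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class step3 {
    public static void main(String[] args) throws Exception {
        // Hive JDBC driver; requires hive-jdbc (and its dependencies) on the classpath
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // Assumed HiveServer2 address and user; adjust to your cluster
        Connection conn = DriverManager.getConnection("jdbc:hive2://node1:10000/default", "root", "");
        Statement stmt = conn.createStatement();
        // Basic per-class statistics over the hypothetical iris table
        ResultSet rs = stmt.executeQuery(
                "SELECT species, COUNT(*) AS cnt, AVG(sepal_length) AS avg_sepal_length " +
                "FROM iris GROUP BY species");
        while (rs.next()) {
            System.out.println(rs.getString(1) + "\t" + rs.getLong(2) + "\t" + rs.getDouble(3));
        }
        rs.close();
        stmt.close();
        conn.close();
    }
}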