前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Hadoop--HDFS API编程封装

Hadoop--HDFS API编程封装

作者头像
汤高
发布2018-01-11 16:46:10
9780
发布2018-01-11 16:46:10
举报
文章被收录于专栏:积累沉淀积累沉淀

HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件、删除文件、读取文件内容等操作。下面记录一下使用JAVA API对HDFS中的文件进行操作的过程。

  对分HDFS中的文件操作主要涉及一下几个类:

  Configuration类:该类的对象封转了客户端或者服务器的配置。

  FileSystem类:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作。FileSystem fs = FileSystem.get(conf);通过FileSystem的静态方法get获得该对象。

  FSDataInputStream和FSDataOutputStream:这两个类是HDFS中的输入输出流。分别通过FileSystem的open方法和create方法获得。

代码语言:javascript
复制
package hdfsday01;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class HdfsDemo {

    //创建新文件
    public static void CreateFile(String url,String path){
        try {
            //该类的对象封转了客户端或者服务器的配置。
            Configuration config = new Configuration();
            URI uri = new URI("hdfs://192.168.52.140:9000");
            //该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作。
            FileSystem fs = FileSystem.get(uri, config);
            //目标路径
            Path p = new Path("hdfs://192.168.52.140:9000/dirs");
            fs.create(p, new Progressable() {

                @Override
                public void progress() {
                    System.out.print(">>");
                }
            });

            System.out.println("create dir successful!!!");

        } catch (URISyntaxException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //判断文件是否存在于Hdfs
    public static boolean IfFileExits(String FileName)throws Exception 
    {
         Configuration conf = new Configuration();
          FileSystem fs = FileSystem.get(URI.create(FileName),conf);
          Path path = new Path(FileName);
          boolean isExists = fs.exists(path);
          return isExists;
    }

    //删除文件
    public static void delete(String url,String filePath) throws IOException{
        try {
            Configuration conf = new Configuration();
            URI uri = new URI(url);
            FileSystem fs = FileSystem.get(uri,conf);
            Path path = new Path(filePath);
            boolean isok = fs.deleteOnExit(path);
            if(isok){
                System.out.println("delete ok!");
            }else{
                System.out.println("delete failure");
            }
            fs.close();
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    //上传本地文件
    public static void uploadFile(String url,String src,String dst) throws IOException{
        try {
            Configuration conf = new Configuration();
            URI uri = new URI(url);
            FileSystem fs = FileSystem.get(uri,conf);
            Path srcPath = new Path(src); //原路径
            Path dstPath = new Path(dst); //目标路径
            //调用文件系统的文件复制函数,前面参数是指是否删除原文件,true为删除,默认为false
            fs.copyFromLocalFile(false,srcPath, dstPath);
            //打印文件路径
            System.out.println("Upload to "+conf.get("fs.default.name"));
            System.out.println("------------list files------------"+"\n");
            //列出文件
            FileStatus [] fileStatus = fs.listStatus(dstPath);
            for (FileStatus file : fileStatus) 
            {
                System.out.println(file.getPath());
            }
            fs.close();
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    //从hdfs下载文件
    public static void downFile(String url,String src,String dst) throws IOException{
        try {
            Configuration conf = new Configuration();
            URI uri = new URI(url);
            FileSystem fs = FileSystem.get(uri,conf);
            Path srcPath = new Path(src); //原路径
            Path dstPath = new Path(dst); //目标路径
            //调用文件系统的文件复制函数,前面参数是指是否删除原文件,true为删除,默认为false
            fs.copyToLocalFile(false,srcPath, dstPath);
            //打印文件路径
            System.out.println("Upload to "+conf.get("fs.default.name"));
            System.out.println("------------list files------------"+"\n");
            FileStatus [] fileStatus = fs.listStatus(dstPath);
            for (FileStatus file : fileStatus) 
            {
                System.out.println(file.getPath());
            }
            fs.close();
        } catch (IllegalArgumentException e) {
        } catch (URISyntaxException e) {
        }
    }


    //文件重命名
    public static void rename(String url,String oldName,String newName) throws IOException{
        try {
            Configuration conf = new Configuration();
            URI uri = new URI(url);
            FileSystem fs = FileSystem.get(uri,conf);
            Path oldPath = new Path(oldName);
            Path newPath = new Path(newName);
            boolean isok = fs.rename(oldPath, newPath);
            if(isok){
                System.out.println("rename ok!");
            }else{
                System.out.println("rename failure");
            }
            fs.close();
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    //创建目录
    public static void mkdir(String url,String path) throws IOException{
        try {
            Configuration conf = new Configuration();
            URI uri = new URI(url);
            FileSystem fs = FileSystem.get(uri,conf);
            Path srcPath = new Path(path);
            boolean isok = fs.mkdirs(srcPath);
            if(isok){
                System.out.println("create dir ok!");
            }else{
                System.out.println("create dir failure");
            }
            fs.close();
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    //读取文件的内容
    public static void readFile(String url,String filePath) throws IOException{
        try {
            Configuration conf = new Configuration();
            URI uri = new URI(url);
            FileSystem fs = FileSystem.get(uri,conf);
            Path srcPath = new Path(filePath);
            InputStream in = null;
            try {
                in = fs.open(srcPath);
                IOUtils.copyBytes(in, System.out, 4096, false); //复制到标准输出流
            } finally {
                IOUtils.closeStream(in);
            }
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

  //显示Hdfs一组路径的文件信息
    public static void ListStatus(String args[]) throws Exception 
    {

        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri),conf);
        Path[] paths = new Path[args.length];
                for(int i=0;i<paths.length;i++)
                {
                    paths[i] = new Path(args[i]);
                }
        FileStatus[] status =fs.listStatus(paths);
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for(Path p: listedPaths){
            System.out.println(p);
        }

    }


    //查找文件所在的数据块
    public static void GetBlockInfo(String url,String filename) throws Exception{   
        Configuration conf= new Configuration();
        URI uri = new URI(url);
        FileSystem fs = FileSystem.get(uri,conf);
        Path path = new Path(filename);
        FileStatus fileStatus = fs.getFileStatus(path);                                                  
        BlockLocation[] blkloc=fs.getFileBlockLocations(fileStatus,0,fileStatus.getLen()); //查找文件所在数据块
        for(BlockLocation loc: blkloc){
            for(int i=0;i < loc.getHosts().length;i++)
                System.out.println(loc.getHosts()[i]);
             }
         }
    }

 public static void main(String[] args) throws Exception {
        String url="hdfs://192.168.52.140:9000";
        //CreateFile("hdfs://192.168.52.140:9000","hdfs://192.168.52.140:9000/dirs2");
        //delete("hdfs://192.168.52.140:9000","hdfs://192.168.52.140:9000/dirs");
        //rename(url,"hdfs://192.168.52.140:9000/dir","hdfs://192.168.52.140:9000/newdir");
    //  mkdir(url,"hdfs://192.168.52.140:9000/mydir");
        String relativelyPath =System.getProperty("user.dir")+"/src/log4j.properties";
        String downPath =System.getProperty("user.dir")+"/";
        //uploadFile(url,relativelyPath,"hdfs://192.168.52.140:9000/mydir");

        //readFile(url,"hdfs://192.168.52.140:9000/mydir/log4j.properties");
        //downFile(url,"hdfs://192.168.52.140:9000/mydir/log4j.properties","D:/");
        //System.out.println(IfFileExits("hdfs://192.168.52.140:9000/mydir/log4j.properties"));

        GetBlockInfo(url,"hdfs://192.168.52.140:9000/mydir/log4j.properties");

    }
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-04-23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云 HDFS
云 HDFS(Cloud HDFS,CHDFS)为您提供标准 HDFS 访问协议,您无需更改现有代码,即可使用高可用、高可靠、多维度安全、分层命名空间的分布式文件系统。 只需几分钟,您就可以在云端创建和挂载 CHDFS,来实现您大数据存储需求。随着业务需求的变化,您可以实时扩展或缩减存储资源,CHDFS 存储空间无上限,满足您海量大数据存储与分析业务需求。此外,通过 CHDFS,您可以实现计算与存储分离,极大发挥计算资源灵活性,同时实现存储数据永久保存,降低您大数据分析资源成本。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档