HDFS:常用客户端API及IO操作

简洁代码的配置

将core-site.xml复制到根目录下,配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<!-- 指定HDFS中NameNode的地址 -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://pdc:9000</value>
    </property>

    <!-- 指定hadoop运行时产生文件的存储目录 -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-2.7.2/data/tmp</value>
    </property>
</configuration>

代码模板

    @Test
    public void mkdirAtHDFS() throws Exception{
        //1.创建配置信息对象
        Configuration configuration = new Configuration();
        //2.创建文件系统,如果配置上面的core-site.xml,则传configuration即可
        FileSystem fs = FileSystem.get(new URI("hdfs://pdc:9000"),configuration, "root");   
        //3.调用API
        fs.xxx(xxx);
    }

1.获取文件系统

FileSystem fs = FileSystem.get(configuration);

2.文件上传

API:

// 创建要上传文件所在的本地路径
Path src = new Path("e:/pdc.txt");
//  创建要上传到hdfs的目标路径
Path dst = new Path("hdfs://pdc:9000/user/pdc");
//  拷贝文件,并删除源文件
fs.copyFromLocalFile(true,src, dst);

注:不用hadoop的API进行流操作是因为运算可能会基于Spark,更快,用io就不用依赖于hadoop IO:

//创建输入流
FileInputStream inStream = new FileInputStream(new File("e:/hello.txt"));
//获取输出路径
String putFileName = "hdfs://pdc:9000/user/pdc/hello1.txt";
Path writePath = new Path(putFileName);
//创建输出流
FSDataOutputStream outStream = fs.create(writePath);
// 5 流对接
try{
    IOUtils.copyBytes(inStream, outStream, 4096, false);
}catch(Exception e){
    e.printStackTrace();
}finally{
    IOUtils.closeStream(inStream);
    IOUtils.closeStream(outStream);
}

3.文件下载

API:

fs.copyToLocalFile(false, new Path("hdfs://pdc:9000/user/atguigu/hello.txt"), new Path("e:/hellocopy.txt"), true);

IO:

//获取读取文件路径
String filename = "hdfs://pdc:9000/user/pdc/hello1.txt";
//创建读取path
Path readPath = new Path(filename);
// 4 创建输入流
FSDataInputStream inStream = fs.open(readPath);
//流对接输出到控制台
try{
    IOUtils.copyBytes(inStream, System.out, 4096, false);
}catch(Exception e){
    e.printStackTrace();
}finally{
    IOUtils.closeStream(inStream);
}

4.创建目录

fs.mkdirs(new Path("hdfs://pdc:9000/user/pdc/output"));

5.删除文件夹

fs.delete(new Path("hdfs://pdc:9000/user/root/output"), true);

6.修改文件名

fs.rename(new Path("hdfs://pdc:9000/user/root/hello.txt"), new Path("hdfs://pdc:9000/user/root/hellonihao.txt"));

7.查看文件详情

RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
//注意返回的是迭代器
while (listFiles.hasNext()) {
    LocatedFileStatus fileStatus = listFiles.next();
        
    System.out.println(fileStatus.getPath().getName());
    System.out.println(fileStatus.getBlockSize());
    System.out.println(fileStatus.getPermission());
    System.out.println(fileStatus.getLen());
        
    BlockLocation[] blockLocations = fileStatus.getBlockLocations();
        
    for (BlockLocation bl : blockLocations) {
            
        System.out.println("block-offset:" + bl.getOffset());
            
        String[] hosts = bl.getHosts();
            
        for (String host : hosts) {
            System.out.println(host);
        }
    }
}

8.查看文件夹

//获取查询路径下的文件状态信息
FileStatus[] listStatus = fs.listStatus(new Path("/"));

//遍历所有文件状态
for (FileStatus status : listStatus) {
    if (status.isFile()) {
        System.out.println("f--" + status.getPath().getName());
    } else {
        System.out.println("d--" + status.getPath().getName());
    }
}

9.IO流定位文件读取

下载第一块

@Test
//定位下载第一块内容
public void readFileSeek1() throws Exception {

    //创建配置信息对象
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://pdc:9000"), configuration, "root");
    //获取输入流路径
    Path path = new Path("hdfs://pdc:9000/user/root/tmp/hadoop-2.7.2.tar.gz");
    //打开输入流
    FSDataInputStream fis = fs.open(path);
    // 4 创建输出流
    FileOutputStream fos = new FileOutputStream("e:/hadoop-2.7.2.tar.gz.part1");
    // 5 流对接
    byte[] buf = new byte[1024];
    for (int i = 0; i < 128 * 1024; i++) {
        fis.read(buf);
        fos.write(buf);
    }
    //关闭流
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
}

下载第二块

@Test
// 定位下载第二块内容
public void readFileSeek2() throws Exception{
    //创建配置信息对象
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://pdc:9000"), configuration, "root");
    //获取输入流路径
    Path path = new Path("hdfs://pdc:9000/user/pdc/tmp/hadoop-2.7.2.tar.gz");
    //打开输入流
    FSDataInputStream fis = fs.open(path);
    //创建输出流
    FileOutputStream fos = new FileOutputStream("e:/hadoop-2.7.2.tar.gz.part2");
    //定位偏移量(第二块的首位)
    fis.seek(1024 * 1024 * 128);
    //流对接
    IOUtils.copyBytes(fis, fos, 1024);
    //关闭流
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
}

合并文件 cmd中:

type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券