专栏首页扎心了老铁使用HDFS客户端java api读取hadoop集群上的信息

使用HDFS客户端java api读取hadoop集群上的信息

本文介绍使用hdfs java api的配置方法。

1、先解决依赖,pom

<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
            <scope>provided</scope>
        </dependency>

2、配置文件,存放hdfs集群配置信息,基本都是来源于core-site.xml和hdfs-site.xml,可以根据hdfs集群client端配置文件里的信息进行填写

#============== hadoop ===================
hdfs.fs.defaultFS=hdfs://mycluster-tj
hdfs.ha.zookeeper.quorum=XXXX-apache00.XX01,XXXX-apache01.XX01,XXXX-apache02.XX01
hdfs.dfs.nameservices=XXXX
hdfs.dfs.ha.namenodes.mycluster-tj=XX1,XX2
hdfs.dfs.namenode.rpc-address.mycluster-tj.nn1=XXXX-apachenn01.XX01:8020
hdfs.dfs.namenode.rpc-address.mycluster-tj.nn2=XXXX-apachenn02.XX01:8020

3、java client api

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;


public class HadoopClient {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private FileSystem fs;
    private String defaultFS;
    private String zKQuorum;
    private String nameServices;
    private String nameNodes;
    private String rpcAddressNN1;
    private String rpcAddressNN2;

    public void setDefaultFS(String defaultFS) {
        this.defaultFS = defaultFS;
    }
    public String getDefaultFS() {
        return defaultFS;
    }
    public void setZKQuorum(String zKQuorum) {
        this.zKQuorum = zKQuorum;
    }
    public String getzKQuorum() {
        return zKQuorum;
    }
    public void setNameServices(String nameServices) {
        this.nameServices = nameServices;
    }
    public String getNameServices() {
        return nameServices;
    }
    public void setNameNodes(String nameNodes) {
        this.nameNodes = nameNodes;
    }
    public String getNameNodes() {
        return nameNodes;
    }
    public void setRpcAddressNN1(String rpcAddressNN1) {
        this.rpcAddressNN1 = rpcAddressNN1;
    }
    public String getRpcAddressNN1() {
        return rpcAddressNN1;
    }
    public void setRpcAddressNN2(String rpcAddressNN2) {
        this.rpcAddressNN2 = rpcAddressNN2;
    }
    public String getRpcAddressNN2() {
        return rpcAddressNN2;
    }

    public void init() {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", defaultFS);
            conf.set("ha.zookeeper.quorum", zKQuorum);
            conf.set("dfs.nameservice", nameServices);
            conf.set("dfs.ha.namenodes.mycluster-tj", nameNodes);
            conf.set("dfs.namenode.rpc-address.mycluster-tj.nn1", rpcAddressNN1);
            conf.set("dfs.namenode.rpc-address.mycluster-tj.nn2", rpcAddressNN2);
            fs = FileSystem.get(new URI(defaultFS), conf);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public void stop() {
        try {
            fs.close();
        } catch(Exception e) {

        }
    }

    public boolean exists(String path) {
        boolean isExists = false;
        try {
            Path hdfsPath = new Path(path);
            isExists = fs.exists(hdfsPath);
        } catch (Exception ex) {
            logger.error("exists error: {}", ex.getMessage());
        }
        return isExists;
    }

    public String getModificationTime(String path) throws IOException {
        String modifyTime = null;
        try {
            Path hdfsPath = new Path(path);
            FileStatus fileStatus = fs.getFileStatus(hdfsPath);
            long modifyTimestamp = fileStatus.getModificationTime();
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
            Date date = new Date(modifyTimestamp);
            modifyTime = simpleDateFormat.format(date);
        } catch(Exception ex) {
            logger.error("getModificationTime error: {}", ex.getMessage());
        }
        return modifyTime;
    }


}

4、configuration

import com.xiaoju.dqa.prometheus.client.hadoop.HadoopClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HadoopConfiguration {
    @Value("${hdfs.fs.defaultFS}")
    private String defaultFS;
    @Value("${hdfs.ha.zookeeper.quorum}")
    private String zKQuorum;
    @Value("${hdfs.dfs.nameservices}")
    private String nameServices;
    @Value("${hdfs.dfs.ha.namenodes.mycluster-tj}")
    private String nameNodes;
    @Value("${hdfs.dfs.namenode.rpc-address.mycluster-tj.nn1}")
    private String rpcAddressNN1;
    @Value("${hdfs.dfs.namenode.rpc-address.mycluster-tj.nn2}")
    private String rpcAddressNN2;

    @Bean(initMethod = "init", destroyMethod = "stop")
    public HadoopClient hadoopClient() {
        HadoopClient hadoopClient = new HadoopClient();
        hadoopClient.setDefaultFS(defaultFS);
        hadoopClient.setZKQuorum(zKQuorum);
        hadoopClient.setNameServices(nameServices);
        hadoopClient.setNameNodes(nameNodes);
        hadoopClient.setRpcAddressNN1(rpcAddressNN1);
        hadoopClient.setRpcAddressNN2(rpcAddressNN2);
        return hadoopClient;
    }
}

今天被一个问题坑的要死了,回来补这篇文章。

如果你要访问的集群采用了viewfs方式管理数据,按照本文上面的方法链接集群是有问题。会导致由URI和nameservices解析成功的namenode才可以访问,而其他的访问不了!!!

如果你想解决这个问题,在api部分你要去掉URI部分和nameservices配置,直接使用集群客户端hdfs-site.xml和core-site.xml

应该是这样的。

package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;


public class HadoopClient {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private FileSystem fs;

    public void init() {
        try {
            Configuration conf = new Configuration();
            conf.addResource("core-site.xml");
            conf.addResource("hdfs-site.xml");
            conf.addResource("mount-table.xml");
            fs = FileSystem.get(conf);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public void stop() {
        try {
            fs.close();
        } catch(Exception e) {

        }
    }

    public boolean exists(String path) {
        boolean isExists = true;
        try {
            Path hdfsPath = new Path(path);
            isExists = fs.exists(hdfsPath);
        } catch (Exception e) {
            logger.error("[HDFS]判断文件是否存在失败", e);
        }
        return isExists;
    }

    public String getModificationTime(String path) throws IOException {
        String modifyTime = null;
        try {
            Path hdfsPath = new Path(path);
            FileStatus fileStatus = fs.getFileStatus(hdfsPath);
            long modifyTimestamp = fileStatus.getModificationTime();
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
            Date date = new Date(modifyTimestamp);
            modifyTime = simpleDateFormat.format(date);
        } catch(Exception e) {
            logger.error("[HDFS]获取最近修改时间失败", e);
        }
        return modifyTime;
    }

    public long getPathSize(String path) throws IOException {
        long size = -1L;
        try {
            Path hdfsPath = new Path(path);
            size = fs.getContentSummary(hdfsPath).getLength();
        } catch (Exception e) {
            logger.error("[HDFS]获取路径大小失败", e);
        }
        return size;

    }

}

config中也不需要传任何参数了

package com.xiaoju.dqa.jazz.hadoop.configuration;

import com.xiaoju.dqa.jazz.hadoop.client.HadoopClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HadoopConfig {

    @Bean(initMethod = "init", destroyMethod = "stop")
    public HadoopClient hadoopClient() {
        HadoopClient hadoopClient = new HadoopClient();
        return hadoopClient;
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • commons-pool与commons-pool2连接池(Hadoop连接池)

    commons-pool和commons-pool2是用来建立对象池的框架,提供了一些将对象池化必须要实现的接口和一些默认动作。对象池化之后可以通过pool的概...

    用户1225216
  • springboot mybatis优雅的添加多数据源

    springboot的原则是简化配置,本文试图不通过xml配置,使用configuration配置数据源,并进行简单的数据访问。 并且配置了多数据源,在开发过程...

    用户1225216
  • 大数据算法设计模式(2) - 左外链接(leftOuterJoin) spark实现

    左外链接(leftOuterJoin) spark实现 package com.kangaroo.studio.algorithms.join; impor...

    用户1225216
  • 使用 C# (.NET Core) 实现命令设计模式 (Command Pattern)

    本文的概念内容来自深入浅出设计模式一书. 项目需求 ? 有这样一个可编程的新型遥控器, 它有7个可编程插槽, 每个插槽可连接不同的家用电器设备. 每个插槽对应两...

    solenovex
  • 魔改 TypeAdapterFactory

    感慨:Retrofit2 虽好,但是,有时候总感觉 Java 这门语言还是美中不足啊!

    HelloVass
  • 数据处理经验总结·大数据文件处理参考值

    4g文本文件,56GB系统内存,20GB堆内存。 全部先读入List<String[]>,一行对应一个String[],读入阶段CPU使用100%,然后所有Li...

    陈黎栋
  • Spring官网阅读(十一)ApplicationContext详细介绍(上)

    从上图中可以发现,ApplicationContext接口继承了很多接口,这些接口我们可以将其分为五类:

    程序员DMZ
  • 使用Redis实现延时任务(一)

    最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案。这篇文章记录了调研的过程,以及初步方案的实现。

    Throwable
  • 「超级 AI」的种子?复杂到人类难以评价的问题,可以教会一个 AI

    AI 科技评论按:正如我们仍在自然语言处理的漫漫征途上摸索,AI 安全的课题也仍然没有得到系统的解决。作为前沿探索的积极分子,OpenAI 也不断提出新的思路,...

    AI科技评论
  • TDD练功房之FizzBuzz

    题目内容 有一名体育老师,在某次离下课还有五分钟时,决定玩一个报数游戏。此时有100名学生在上课,游戏的规则如下:

    用户1130025

扫码关注云+社区

领取腾讯云代金券