
Core Source Code Analysis of Hadoop, the Pioneering Component of Big Data

Author: 老周聊架构
Published: 2025-11-20 10:57:36

I. Preparing to Read the Source Code

1. Download the official Apache Hadoop 2.9.2 source code

The Spring source-code import procedure can be used as a reference; the only difference is that the Spring source builds with Gradle, whereas the Hadoop source here is built with Maven.

2. Import the source code into IDEA

Start IDEA and choose the import option on the welcome screen.


Wait for the downloads and dependency resolution to finish, and the source import is complete.


II. NameNode Startup Flow

Start the HDFS cluster with the following command:

start-dfs.sh

This command starts the HDFS NameNode and DataNodes. The NameNode is started mainly through the org.apache.hadoop.hdfs.server.namenode.NameNode class.

We focus on what the NameNode does during startup (technical details that stray from the main path are not examined in depth).

For analyzing the startup flow, two pieces of code deserve attention:

public class NameNode extends ReconfigurableBase implements
    NameNodeStatusMXBean {
  // This static block initializes some HDFS configuration settings.
  static {
    // Stepping into init() you find an empty method with no operations.
    // The real work happens in the static block of HdfsConfiguration (below).
    HdfsConfiguration.init();
  }
  ...
}
public class HdfsConfiguration extends Configuration {
  static {
    addDeprecatedKeys();

    // adds the default resources
    Configuration.addDefaultResource("hdfs-default.xml");
    Configuration.addDefaultResource("hdfs-site.xml");
  }
  ...
}

NameNode#main

// main method
public static void main(String argv[]) throws Exception {
  // If the arguments are a help request, print the usage message and exit.
  if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
    System.exit(0);
  }

  try {
    // Print the formatted startup message and register a hook that logs node shutdown.
    StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
    // Create the NameNode.
    NameNode namenode = createNameNode(argv, null);
    if (namenode != null) {
      // Block until the NameNode terminates.
      namenode.join();
    }
  } catch (Throwable e) {
    // Exception handling.
    LOG.error("Failed to start namenode.", e);
    terminate(1, e);
  }
}
// Focus on createNameNode
public static NameNode createNameNode(String argv[], Configuration conf)
      throws IOException {
    LOG.info("createNameNode " + Arrays.asList(argv));
    if (conf == null)
      conf = new HdfsConfiguration();
    // Parse out some generic args into Configuration.
    GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
    argv = hParser.getRemainingArgs();
    // Parse the rest, NN specific args.
    // Parse the startup options.
    StartupOption startOpt = parseArguments(argv);
    if (startOpt == null) {
      printUsage(System.err);
      return null;
    }
    setStartupOption(conf, startOpt);

    switch (startOpt) {
      case FORMAT: {
        boolean aborted = format(conf, startOpt.getForceFormat(),
            startOpt.getInteractiveFormat());
        terminate(aborted ? 1 : 0);
        return null; // avoid javac warning
      }
      case GENCLUSTERID: {
        System.err.println("Generating new cluster id:");
        System.out.println(NNStorage.newClusterID());
        terminate(0);
        return null;
      }
      case FINALIZE: {
        System.err.println("Use of the argument '" + StartupOption.FINALIZE +
            "' is no longer supported. To finalize an upgrade, start the NN " +
            " and then run `hdfs dfsadmin -finalizeUpgrade'");
        terminate(1);
        return null; // avoid javac warning
      }
      case ROLLBACK: {
        boolean aborted = doRollback(conf, true);
        terminate(aborted ? 1 : 0);
        return null; // avoid warning
      }
      case BOOTSTRAPSTANDBY: {
        String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
        int rc = BootstrapStandby.run(toolArgs, conf);
        terminate(rc);
        return null; // avoid warning
      }
      case INITIALIZESHAREDEDITS: {
        boolean aborted = initializeSharedEdits(conf,
            startOpt.getForceFormat(),
            startOpt.getInteractiveFormat());
        terminate(aborted ? 1 : 0);
        return null; // avoid warning
      }
      case BACKUP:
      case CHECKPOINT: {
        NamenodeRole role = startOpt.toNodeRole();
        DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
        return new BackupNode(conf, role);
      }
      case RECOVER: {
        NameNode.doRecovery(startOpt, conf);
        return null;
      }
      case METADATAVERSION: {
        printMetadataVersion(conf);
        terminate(0);
        return null; // avoid javac warning
      }
      case UPGRADEONLY: {
        DefaultMetricsSystem.initialize("NameNode");
        new NameNode(conf);
        terminate(0);
        return null;
      }
      default: { // A normal startup enters this branch.
        // Initialize the metrics system.
        DefaultMetricsSystem.initialize("NameNode");
        // Return a new NameNode.
        return new NameNode(conf);
      }
    }
}
// NameNode constructor
public NameNode(Configuration conf) throws IOException {
 this(conf, NamenodeRole.NAMENODE);
}
protected NameNode(Configuration conf, NamenodeRole role)
    throws IOException {
  super(conf);
  this.tracer = new Tracer.Builder("NameNode").
      conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf)).
      build();
  this.tracerConfigurationManager =
      new TracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);
  this.role = role;
  // Set NameNode#clientNamenodeAddress, e.g. "hdfs://localhost:9000".
  setClientNamenodeAddress(conf);
  String nsId = getNameServiceId(conf);
  String namenodeId = HAUtil.getNameNodeId(conf, nsId);
  // HA-related.
  this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
  state = createHAState(getStartupOption(conf));
  this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
  this.haContext = createHAContext();
  try {
    initializeGenericKeys(conf, nsId, namenodeId);
    // Do the actual initialization work.
    initialize(getConf());
    // HA-related.
    try {
      haContext.writeLock();
      state.prepareToEnterState(haContext);
      state.enterState(haContext);
    } finally {
      haContext.writeUnlock();
    }
  } catch (IOException e) {
    this.stopAtException(e);
    throw e;
  } catch (HadoopIllegalArgumentException e) {
    this.stopAtException(e);
    throw e;
  }
  this.started.set(true);
}

Let's continue into initialize(getConf()).

Even though HA is not enabled locally (haEnabled = false), the NameNode still holds an HAState, and that HAState is active.
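Why the state is already active can be seen in NameNode#createHAState, which in the 2.x code line looks approximately like the following (paraphrased rather than quoted, so verify against your 2.9.2 checkout): when HA is disabled, or the startup option is an upgrade, the NameNode goes straight to the ACTIVE state.

protected HAState createHAState(StartupOption startOpt) {
  // No standby phase when HA is off (or during an upgrade): go straight to ACTIVE.
  if (!haEnabled || startOpt == StartupOption.UPGRADE
      || startOpt == StartupOption.UPGRADEONLY) {
    return ACTIVE_STATE;
  } else {
    return STANDBY_STATE;
  }
}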

// Do the actual initialization work.
protected void initialize(Configuration conf) throws IOException {
  if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
    String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
    if (intervals != null) {
      conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
        intervals);
    }
  }

  UserGroupInformation.setConfiguration(conf);
  loginAsNameNodeUser(conf);

  // Initialize metrics.
  NameNode.initMetrics(conf, this.getRole());
  StartupProgressMetrics.register(startupProgress);

  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(conf);
  pauseMonitor.start();
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  // Start the HTTP server.
  if (NamenodeRole.NAMENODE == role) {
    startHttpServer(conf);
  }

  // Load the fsimage and edit log from the namenode directory and initialize
  // FSNamesystem, FSDirectory, LeaseManager, etc.
  loadNamesystem(conf);

  // Create the RPC server. It wraps NameNodeRpcServer's clientRpcServer and supports
  // protocols such as ClientNamenodeProtocol and DatanodeProtocolPB.
  rpcServer = createRpcServer(conf);

  initReconfigurableBackoffKey();

  if (clientNamenodeAddress == null) {
    // This is expected for MiniDFSCluster. Set it now using
    // the RPC server's bind address.
    clientNamenodeAddress =
        NetUtils.getHostPortString(getNameNodeAddress());
    LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
        + " this namenode/service.");
  }
  if (NamenodeRole.NAMENODE == role) {
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }

  // Start several threads that carry out very important work.
  startCommonServices(conf);
  startMetricsLogger(conf);
}

NameNode#startCommonServices

private void startCommonServices(Configuration conf) throws IOException {
  // Create the NameNodeResourceChecker, activate the BlockManager, etc.
  namesystem.startCommonServices(conf, haContext);
  registerNNSMXBean();
  // Roles other than NamenodeRole.NAMENODE start their HttpServer here.
  if (NamenodeRole.NAMENODE != role) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
  // Start the RPC server.
  rpcServer.start();
  try {
    plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
        ServicePlugin.class);
  } catch (RuntimeException e) {
    String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
    LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +
        pluginsValue, e);
    throw e;
  }
  // Start each plugin.
  for (ServicePlugin p: plugins) {
    try {
      p.start(this);
    } catch (Throwable t) {
      LOG.warn("ServicePlugin " + p + " could not be started", t);
    }
  }
  LOG.info(getRole() + " RPC up at: " + getNameNodeAddress());
  if (rpcServer.getServiceRpcAddress() != null) {
    LOG.info(getRole() + " service RPC up at: "
        + rpcServer.getServiceRpcAddress());
  }
}

FSNamesystem#startCommonServices

void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    // Create the NameNodeResourceChecker and run one check immediately.
    nnResourceChecker = new NameNodeResourceChecker(conf);
    checkAvailableResources();
    assert !blockManager.isPopulatingReplQueues();
    // Record some startup-progress information.
    StartupProgress prog = NameNode.getStartupProgress();
    prog.beginPhase(Phase.SAFEMODE);
    long completeBlocksTotal = getCompleteBlocksTotal();
    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);
    // Record the total number of complete blocks and activate the BlockManager.
    blockManager.activate(conf, completeBlocksTotal);
  } finally {
    writeUnlock("startCommonServices");
  }

  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
    dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress != null) ?
      serviceAddress.getHostName() : "";
}
public void activate(Configuration conf, long blockTotal) {
  // Start the PendingReplicationMonitor.
  pendingReplications.start();
  // Activate the DatanodeManager: start the DecommissionManager monitor and the HeartbeatManager monitor.
  datanodeManager.activate(conf);
  this.replicationThread.setName("ReplicationMonitor");
  // Start the BlockManager's ReplicationMonitor thread.
  this.replicationThread.start();
  this.blockReportThread.start();
  mxBeanName = MBeans.register("NameNode", "BlockStats", this);
  bmSafeMode.activate(blockTotal);
}

The NameNode's primary responsibility is managing file metadata and the file-to-block mapping. Accordingly, its startup flow is concerned with the worker threads that communicate with clients and DataNodes, the file metadata management machinery, and the block management machinery. Of these, the RpcServer is chiefly responsible for communicating with clients and DataNodes, and FSDirectory is chiefly responsible for managing file metadata.

III. DataNode Startup Flow

The DataNode's main class is DataNode, so first locate DataNode.main().

public class DataNode extends ReconfigurableBase
    implements InterDatanodeProtocol, ClientDatanodeProtocol,
        TraceAdminProtocol, DataNodeMXBean, ReconfigurationProtocol {
  public static final Logger LOG = LoggerFactory.getLogger(DataNode.class);

  static {
    HdfsConfiguration.init();
  }

  ...

  public static void main(String args[]) {
    if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
      System.exit(0);
    }
    secureMain(args, null);
  }

  public static void secureMain(String args[], SecureResources resources) {
    int errorCode = 0;
    try {
      // Print the startup message.
      StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
      // Do the main work of creating the DataNode.
      DataNode datanode = createDataNode(args, null, resources);
      if (datanode != null) {
        datanode.join();
      } else {
        errorCode = 1;
      }
    } catch (Throwable e) {
      LOG.error("Exception in secureMain", e);
      terminate(1, e);
    } finally {
      // We need to terminate the process here because either shutdown was called
      // or some disk related conditions like volumes tolerated or volumes required
      // condition was not met. Also, In secure mode, control will go to Jsvc
      // and Datanode process hangs if it does not exit.
      LOG.warn("Exiting Datanode");
      terminate(errorCode);
    }
  }
}
public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // Do most of the initialization work and start some of the worker threads.
  DataNode dn = instantiateDataNode(args, conf, resources);
  if (dn != null) {
    // Start the remaining worker threads.
    dn.runDatanodeDaemon();
  }
  return dn;
}
public void runDatanodeDaemon() throws IOException {
  // blockPoolManager.startAll() is also invoked during DataNode.instantiateDataNode() (see below).
  blockPoolManager.startAll();

  // start dataXceiveServer
  dataXceiverServer.start();
  if (localDataXceiverServer != null) {
    localDataXceiverServer.start();
  }
  ipcServer.setTracer(tracer);
  ipcServer.start();
  startPlugins(getConf());
}
public static DataNode instantiateDataNode(String args[],
 Configuration conf) throws IOException {
  return instantiateDataNode(args, conf, null);
}

public static DataNode instantiateDataNode(String args [], Configuration conf,
    SecureResources resources) throws IOException {
  if (conf == null)
    conf = new HdfsConfiguration();

  if (args != null) {
    // parse generic hadoop options
    GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
    args = hParser.getRemainingArgs();
  }

  if (!parseArguments(args, conf)) {
    printUsage(System.err);
    return null;
  }
  Collection<StorageLocation> dataLocations = getStorageLocations(conf);
  UserGroupInformation.setConfiguration(conf);
  SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
      DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, getHostName(conf));
  return makeInstance(dataLocations, conf, resources);
}
// DataNode.makeInstance() begins creating the DataNode.
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  List<StorageLocation> locations;
  StorageLocationChecker storageLocationChecker =
      new StorageLocationChecker(conf, new Timer());
  try {
    // Check the permissions of the data directories.
    locations = storageLocationChecker.check(conf, dataDirs);
  } catch (InterruptedException ie) {
    throw new IOException("Failed to instantiate DataNode", ie);
  }
  DefaultMetricsSystem.initialize("DataNode");

  assert locations.size() > 0 : "number of data directories should be > 0";
  return new DataNode(conf, locations, storageLocationChecker, resources);
}
DataNode(final Configuration conf,
         final List<StorageLocation> dataDirs,
         final StorageLocationChecker storageLocationChecker,
         final SecureResources resources) throws IOException {
super(conf);
this.tracer = createTracer(conf);
this.tracerConfigurationManager =
      new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
this.fileIoProvider = new FileIoProvider(conf, this);
this.blockScanner = new BlockScanner(this);
this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
      DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);

this.usersWithLocalPathAccess = Arrays.asList(
      conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
this.connectToDnViaHostname = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
      DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
this.getHdfsBlockLocationsEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
      DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
      DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
this.isPermissionEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
      DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
this.pipelineSupportECN = conf.getBoolean(
      DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
      DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);

  confVersion = "core-" +
      conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
      ",hdfs-" +
      conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");

this.volumeChecker = new DatasetVolumeChecker(conf, new Timer());

// Determine whether we should try to pass file descriptors to clients.
if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
            HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT)) {
    String reason = DomainSocket.getLoadingFailureReason();
    if (reason != null) {
      LOG.warn("File descriptor passing is disabled because " + reason);
      this.fileDescriptorPassingDisabledReason = reason;
    } else {
      LOG.info("File descriptor passing is enabled.");
      this.fileDescriptorPassingDisabledReason = null;
    }
  } else {
    this.fileDescriptorPassingDisabledReason =
        "File descriptor passing was not configured.";
    LOG.debug(this.fileDescriptorPassingDisabledReason);
  }

try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is " + hostName);
    startDataNode(dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
  final int dncCacheMaxSize =
      conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
          DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT);
  datanodeNetworkCounts =
      CacheBuilder.newBuilder()
          .maximumSize(dncCacheMaxSize)
          .build(new CacheLoader<String, Map<String, Long>>() {
            @Override
            public Map<String, Long> load(String key) throws Exception {
              final Map<String, Long> ret = new HashMap<String, Long>();
              ret.put("networkErrors", 0L);
              return ret;
            }
          });

  initOOBTimeout();
this.storageLocationChecker = storageLocationChecker;
}
void startDataNode(List<StorageLocation> dataDirectories,
                   SecureResources resources
                   ) throws IOException {

// settings global for all BPs in the Data Node
this.secureResources = resources;
synchronized (this) {
    this.dataDirs = dataDirectories;
  }
this.dnConf = new DNConf(this);
  checkSecureConfig(dnConf, getConf(), resources);

if (dnConf.maxLockedMemory > 0) {
    if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
      throw new RuntimeException(String.format(
          "Cannot start datanode because the configured max locked memory" +
          " size (%s) is greater than zero and native code is not available.",
          DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
    }
    if (Path.WINDOWS) {
      NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
    } else {
      long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
      if (dnConf.maxLockedMemory > ulimit) {
        throw new RuntimeException(String.format(
          "Cannot start datanode because the configured max locked memory" +
          " size (%s) of %d bytes is more than the datanode's available" +
          " RLIMIT_MEMLOCK ulimit of %d bytes.",
          DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
          dnConf.maxLockedMemory,
          ulimit));
      }
    }
  }
  LOG.info("Starting DataNode with maxLockedMemory = " +
      dnConf.maxLockedMemory);

int volFailuresTolerated = dnConf.getVolFailuresTolerated();
int volsConfigured = dnConf.getVolsConfigured();
if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
    throw new DiskErrorException("Invalid value configured for "
        + "dfs.datanode.failed.volumes.tolerated - " + volFailuresTolerated
        + ". Value configured is either less than 0 or >= "
        + "to the number of configured volumes (" + volsConfigured + ").");
  }

// Initialize DataStorage.
  storage = new DataStorage();

// global DN settings
// Register with JMX.
  registerMXBean();
// Initialize the DataXceiver server (streaming data transfer); it is started in DataNode#runDatanodeDaemon().
  initDataXceiver();
// Start the InfoServer (web UI).
  startInfoServer();
// Start the JvmPauseMonitor (monitors JVM pauses; queryable via JMX).
  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(getConf());
  pauseMonitor.start();

// BlockPoolTokenSecretManager is required to create ipc server.
this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

// Login is done by now. Set the DN user name.
  dnUserName = UserGroupInformation.getCurrentUser().getUserName();
  LOG.info("dnUserName = " + dnUserName);
  LOG.info("supergroup = " + supergroup);
// Initialize the IpcServer (RPC); it is started in DataNode#runDatanodeDaemon().
  initIpcServer();

  metrics = DataNodeMetrics.create(getConf(), getDisplayName());
  peerMetrics = dnConf.peerStatsEnabled ?
      DataNodePeerMetrics.create(getDisplayName()) : null;
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  blockRecoveryWorker = new BlockRecoveryWorker(this);

// Initialize following the namespace (nameservice) / namenode structure.
  blockPoolManager = new BlockPoolManager(this);
  blockPoolManager.refreshNamenodes(getConf());

// Create the ReadaheadPool from the DataNode context so we can
// exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();
  saslClient = new SaslDataTransferClient(dnConf.getConf(),
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  startMetricsLogger();

if (dnConf.diskStatsEnabled) {
    diskMetrics = new DataNodeDiskMetrics(this,
        dnConf.outliersReportIntervalMs);
  }
}

BlockPoolManager#refreshNamenodes()

// BlockPoolManager abstracts the block storage service the DataNode provides.
// It is organized by namespace (nameservice) and namenode.
// Besides being called proactively during initialization, a refresh can also be
// ordered by a nameservice through the DataNode heartbeat.
void refreshNamenodes(Configuration conf)
    throws IOException {
  LOG.info("Refresh request received for nameservices: " + conf.get
          (DFSConfigKeys.DFS_NAMESERVICES));

  Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
          .getNNServiceRpcAddressesForCluster(conf);
  Map<String, Map<String, InetSocketAddress>> newLifelineAddressMap = DFSUtil
          .getNNLifelineRpcAddressesForCluster(conf);

synchronized (refreshNamenodesLock) {
    doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
  }
}

BlockPoolManager#doRefreshNamenodes()

private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap,
    Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
    throws IOException {
assert Thread.holdsLock(refreshNamenodesLock);

  Set<String> toRefresh = Sets.newLinkedHashSet();
  Set<String> toAdd = Sets.newLinkedHashSet();
  Set<String> toRemove;

synchronized (this) {
    // Step 1. For each of the new nameservices, figure out whether
    // it's an update of the set of NNs for an existing NS,
    // or an entirely new nameservice.
    for (String nameserviceId : addrMap.keySet()) {
      if (bpByNameserviceId.containsKey(nameserviceId)) {
        toRefresh.add(nameserviceId);
      } else {
        toAdd.add(nameserviceId);
      }
    }
    
    // Step 2. Any nameservices we currently have but are no longer present
    // need to be removed.
    toRemove = Sets.newHashSet(Sets.difference(
        bpByNameserviceId.keySet(), addrMap.keySet()));
    
    assert toRefresh.size() + toAdd.size() ==
      addrMap.size() :
        "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
        "  toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
        "  toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);

    
    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {
      LOG.info("Starting BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toAdd));
    
      for (String nsToAdd : toAdd) {
        Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToAdd);
        Map<String, InetSocketAddress> nnIdToLifelineAddr =
            lifelineAddrMap.get(nsToAdd);
        ArrayList<InetSocketAddress> addrs =
            Lists.newArrayListWithCapacity(nnIdToAddr.size());
        ArrayList<InetSocketAddress> lifelineAddrs =
            Lists.newArrayListWithCapacity(nnIdToAddr.size());
        for (String nnId : nnIdToAddr.keySet()) {
          addrs.add(nnIdToAddr.get(nnId));
          lifelineAddrs.add(nnIdToLifelineAddr != null ?
              nnIdToLifelineAddr.get(nnId) : null);
        }
        // Create the corresponding BPOfferService for each nameservice.
        BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    // Then start all BPOfferServices via startAll().
    startAll();
  }

// Step 4. Shut down old nameservices. This happens outside
// of the synchronized(this) lock since they need to call
// back to .remove() from another thread
if (!toRemove.isEmpty()) {
    LOG.info("Stopping BPOfferServices for nameservices: " +
        Joiner.on(",").useForNull("<default>").join(toRemove));
    
    for (String nsToRemove : toRemove) {
      BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
      bpos.stop();
      bpos.join();
      // they will call remove on their own
    }
  }

// Step 5. Update nameservices whose NN list has changed
if (!toRefresh.isEmpty()) {
    LOG.info("Refreshing list of NNs for nameservices: " +
        Joiner.on(",").useForNull("<default>").join(toRefresh));
    
    for (String nsToRefresh : toRefresh) {
      final BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
      Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToRefresh);
      Map<String, InetSocketAddress> nnIdToLifelineAddr =
          lifelineAddrMap.get(nsToRefresh);
      final ArrayList<InetSocketAddress> addrs =
          Lists.newArrayListWithCapacity(nnIdToAddr.size());
      final ArrayList<InetSocketAddress> lifelineAddrs =
          Lists.newArrayListWithCapacity(nnIdToAddr.size());
      for (String nnId : nnIdToAddr.keySet()) {
        addrs.add(nnIdToAddr.get(nnId));
        lifelineAddrs.add(nnIdToLifelineAddr != null ?
            nnIdToLifelineAddr.get(nnId) : null);
      }
      try {
        UserGroupInformation.getLoginUser()
            .doAs(new PrivilegedExceptionAction<Object>() {
              @Override
              public Object run() throws Exception {
                bpos.refreshNNList(addrs, lifelineAddrs);
                return null;
              }
            });
      } catch (InterruptedException ex) {
        IOException ioe = new IOException();
        ioe.initCause(ex.getCause());
        throw ioe;
      }
    }
  }
}

BlockPoolManager#createBPOS()

protected BPOfferService createBPOS(
    final String nameserviceId,
    List<InetSocketAddress> nnAddrs,
    List<InetSocketAddress> lifelineNnAddrs) {
  return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn);
}
BPOfferService(
    final String nameserviceId,
    List<InetSocketAddress> nnAddrs,
    List<InetSocketAddress> lifelineNnAddrs,
    DataNode dn) {
  Preconditions.checkArgument(!nnAddrs.isEmpty(),
      "Must pass at least one NN.");
  Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(),
      "Must pass same number of NN addresses and lifeline addresses.");
this.nameserviceId = nameserviceId;
this.dn = dn;

for (int i = 0; i < nnAddrs.size(); ++i) {
    this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
        lifelineNnAddrs.get(i), this));
  }
}

Next, let's look at startAll(), which is called from BlockPoolManager#doRefreshNamenodes().

// BlockPoolManager#startAll() starts all BPOfferServices (in effect, all BPServiceActors).
synchronized void startAll() throws IOException {
try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            for (BPOfferService bpos : offerServices) {
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    IOException ioe = new IOException();
    ioe.initCause(ex.getCause());
    throw ioe;
  }
}

Finally, let's look at DataNode#initBlockPool().

The main DataNode startup flow launches several kinds of worker threads, including the InfoServer, the JvmPauseMonitor, and the BPServiceActor threads. The most important of these is BPServiceActor: it is the thread that actually communicates with the NameNode on the DataNode's behalf (a simplified sketch of this pattern follows the initBlockPool code below).

void initBlockPool(BPOfferService bpos) throws IOException {
  NamespaceInfo nsInfo = bpos.getNamespaceInfo();
  if (nsInfo == null) {
    throw new IOException("NamespaceInfo not found: Block pool " + bpos
        + " should have retrieved namespace info before initBlockPool.");
  }

  setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());

  // Register the new block pool with the BP manager.
  // Register the block pool with the BlockPoolManager.
  blockPoolManager.addBlockPool(bpos);

  // In the case that this is the first block pool to connect, initialize
  // the dataset, block scanners, etc.
  // Do the first round of storage-structure initialization.
  initStorage(nsInfo);

  // Exclude failed disks before initializing the block pools to avoid startup
  // failures.
  // Check for disk damage.
  checkDiskError();

  // Add the block pool to FsDatasetImpl and continue initializing the storage structures.
  data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
  blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
  // Start the directory scanner.
  initDirectoryScanner(getConf());
}
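
The BPServiceActor source is not walked through here, but conceptually each actor thread registers its DataNode with one NameNode and then loops, sending heartbeats (and periodically block reports) and executing whatever commands the NameNode returns. The sketch below is only a simplified, self-contained illustration of that register-then-heartbeat pattern; the class and member names are hypothetical and this is not the actual BPServiceActor implementation.

// Simplified illustration of the BPServiceActor pattern: one thread per NameNode
// that registers once and then keeps heartbeating. All names here are hypothetical.
public class HeartbeatActor implements Runnable {

  /** Minimal stand-in for the RPC proxy a real actor would hold. */
  interface NameNodeClient {
    void register(String datanodeId) throws Exception;
    void sendHeartbeat(String datanodeId) throws Exception;
  }

  private final String datanodeId;
  private final NameNodeClient nn;
  private final long heartbeatIntervalMs;
  private volatile boolean running = true;

  HeartbeatActor(String datanodeId, NameNodeClient nn, long heartbeatIntervalMs) {
    this.datanodeId = datanodeId;
    this.nn = nn;
    this.heartbeatIntervalMs = heartbeatIntervalMs;
  }

  @Override
  public void run() {
    try {
      // Step 1: handshake/registration with this NameNode.
      nn.register(datanodeId);
      // Step 2: heartbeat loop. The real actor also processes the commands
      // returned by the NameNode and schedules incremental/full block reports.
      while (running) {
        nn.sendHeartbeat(datanodeId);
        Thread.sleep(heartbeatIntervalMs);
      }
    } catch (Exception e) {
      // The real actor retries and re-registers instead of giving up.
      e.printStackTrace();
    }
  }

  void stop() {
    running = false;
  }
}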

IV. How the NameNode Supports Highly Concurrent Access (the Double-Buffer Mechanism)

What problems does highly concurrent access to the NameNode run into?

From studying HDFS's metadata management mechanism, we know that every time a client asks the NameNode to modify a piece of metadata (for example, requesting to upload a file), an edits log entry must be written, which involves two steps:

  • Write it to the local disk (the edits file).
  • Ship it over the network to the JournalNode cluster (in a Hadoop HA cluster, studied together with ZooKeeper).

The hard parts of high concurrency are mainly the thread safety of the data and the efficiency of each operation.

Regarding thread safety:

The NameNode follows a few principles when writing the edits log:

  • Every edit written to the edits log must carry a globally, sequentially increasing transaction ID (txid for short), so that the ordering of individual edits can be identified.
  • To guarantee that every edit's txid increases monotonically, a synchronization lock is required. Whenever a thread modifies metadata and needs to write an edit, it must queue up, acquire the lock, and only then generate the next increasing txid, which becomes the sequence number of the edit it is about to write.

The resulting problem:

If, every time, the txid is generated and the edits log is written to disk inside the same locked code block, then the combination of holding a synchronization lock and performing a disk write is extremely time-consuming.
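
To make the bottleneck concrete, here is a minimal sketch of that naive approach (hypothetical names, not HDFS code): because txid generation and the disk write share one lock, every writer thread ends up queueing behind every disk sync.

import java.io.FileOutputStream;
import java.io.IOException;

// Naive edit log: the txid assignment AND the disk write happen inside the
// same lock, so all writer threads serialize on each fsync.
public class NaiveEditLog {
  private long txid = 0;
  private final FileOutputStream out;

  public NaiveEditLog(String path) throws IOException {
    this.out = new FileOutputStream(path, true);   // append mode
  }

  public synchronized void logEdit(byte[] edit) throws IOException {
    txid++;              // ordering is guaranteed by the lock...
    out.write(edit);     // ...but so is the slow disk write,
    out.getFD().sync();  // so throughput collapses under load.
  }
}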

The HDFS optimization

The problem stems from the fact that writing an edit serializes threads while they queue to generate the auto-incrementing txid, and that the disk write itself is slow. HDFS's solution:

  • For the serialization: use segmented locking.
  • For the disk writes: use a double buffer.

Segmented locking

Each thread in turn acquires the lock for the first time, generates a sequentially increasing txid, writes its edit into region 1 of the in-memory double buffer, and then immediately releases the lock. In that gap, subsequent threads can in turn immediately acquire the lock and write their own edits into the memory buffer.

The double buffer

The program allocates two identical regions of memory. One is bufCurrent, into which newly produced data is written directly; the other is bufReady. Once bufCurrent has accumulated data up to a certain threshold, the two regions are exchanged: region 1 and region 2 of the double buffer are simply swapped. This ensures that client write requests only ever operate on memory rather than synchronously writing to disk.
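
Putting the two ideas together, the sketch below is a self-contained illustration (hypothetical names, not the actual FSEditLog code). The lock is held only long enough to assign a txid and append the edit to the in-memory bufCurrent; the syncing thread then swaps bufCurrent and bufReady under a brief lock and flushes bufReady to disk outside the lock, so new writers are never blocked by disk I/O.

import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

// Double-buffered edit log sketch: writers append to bufCurrent under a short
// lock; the sync swaps the two buffers and writes bufReady to disk OUTSIDE the lock.
public class DoubleBufferEditLog {
  private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
  private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();
  private long txid = 0;                 // last txid handed out
  private long syncedTxid = 0;           // last txid known to be durable on disk
  private boolean syncInProgress = false;
  private final FileOutputStream out;

  public DoubleBufferEditLog(String path) throws IOException {
    this.out = new FileOutputStream(path, true);   // append mode
  }

  /** Segment 1: assign a txid and buffer the edit in memory, then drop the lock. */
  public void logEdit(byte[] edit) throws IOException, InterruptedException {
    long myTxid;
    synchronized (this) {
      myTxid = ++txid;
      bufCurrent.write(edit, 0, edit.length);
    }                                     // no disk I/O while the lock is held
    logSync(myTxid);
  }

  /** Segment 2: swap the buffers under the lock, then flush outside it. */
  private void logSync(long myTxid) throws IOException, InterruptedException {
    long flushUpTo;
    synchronized (this) {
      while (syncInProgress) {
        if (syncedTxid >= myTxid) {
          return;                         // another thread already made my edit durable
        }
        wait();                           // wait for the in-flight flush to finish
      }
      if (syncedTxid >= myTxid) {
        return;
      }
      // Cheap swap: bufCurrent and bufReady exchange roles.
      ByteArrayOutputStream tmp = bufCurrent;
      bufCurrent = bufReady;
      bufReady = tmp;
      flushUpTo = txid;                   // everything up to here is now in bufReady
      syncInProgress = true;
    }
    // The expensive part runs WITHOUT the lock, so new edits keep streaming
    // into bufCurrent while this batch is written and synced.
    bufReady.writeTo(out);
    out.getFD().sync();
    synchronized (this) {
      syncedTxid = flushUpTo;
      bufReady.reset();
      syncInProgress = false;
      notifyAll();                        // wake writers waiting on this batch
    }
  }
}

For simplicity the sketch syncs after every edit; the real edit log batches edits and only schedules a sync once the current buffer fills up, which is exactly the needsSync / isAutoSyncScheduled logic visible in logEdit() below.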


For the double-buffer source code, go to FSEditLog#logEdit().

void logEdit(final FSEditLogOp op) {
  // Flag indicating whether a sync is needed.
  boolean needsSync = false;
  synchronized (this) {
    assert isOpenForWrite() :
      "bad state: " + state;

    // wait if an automatic sync is scheduled
    // If another thread has already scheduled an automatic sync, wait (in 1-second intervals).
    waitIfAutoSyncScheduled();

    // check if it is time to schedule an automatic sync
    needsSync = doEditTransaction(op);
    if (needsSync) {
      // bufCurrent is full, so schedule a double-buffer flush.
      isAutoSyncScheduled = true;
    }
  }

  // Sync the log if an automatic sync is required.
  if (needsSync) {
    // Flush the buffered edits to disk.
    logSync();
  }
}
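
Broadly, the real FSEditLog follows the same pattern as the sketch above: doEditTransaction() assigns the txid and writes the edit into the current in-memory buffer while the lock is held, and logSync() swaps the current and ready buffers under the lock (setReadyToFlush() on the underlying double buffer) before flushing the ready buffer to the local edits file and the JournalNodes outside the lock; waitIfAutoSyncScheduled() and isAutoSyncScheduled throttle writers while a flush is pending.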

Feel free to follow my WeChat official account, 老周聊架构, where I share technical content on AI, big data, cloud native, IoT, and related fields.
