1. Download the official Apache Hadoop 2.9.2 source code
The workflow for reading the Spring source code can serve as a reference; the only difference is that Spring builds with Gradle, while the Hadoop source here builds with Maven.
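If you run one command-line build first, IDEA only has to index afterwards. A typical invocation, following the BUILDING.txt shipped in the source tree (which assumes JDK 8, Maven 3, and ProtocolBuffers 2.5.0 are installed), looks like this:

mvn install -DskipTests -Dmaven.javadoc.skip=true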
2. Import the source code into IDEA
Start IDEA and choose the import option on the welcome screen.
(Screenshots of the import wizard are omitted here.)
Wait for the dependency download and resolution to finish, and the source import succeeds!

Starting the HDFS cluster from the command line
start-dfs.sh
This command starts HDFS's NameNode and DataNode processes; the NameNode entry point is the org.apache.hadoop.hdfs.server.namenode.NameNode class.
We focus on what the NameNode does during startup (technical details off the main path are not explored in depth).
For the startup flow, two pieces of code matter most:
public class NameNode extends ReconfigurableBase implements
    NameNodeStatusMXBean {
  // This static block initializes HDFS configuration defaults
  static {
    // Stepping in, the method body looks empty. A no-op? Not quite: merely
    // referencing the class triggers HdfsConfiguration's own static block.
    HdfsConfiguration.init();
  }
  ...
}
public class HdfsConfiguration extends Configuration {
  static {
    addDeprecatedKeys();
    // adds the default resources
    Configuration.addDefaultResource("hdfs-default.xml");
    Configuration.addDefaultResource("hdfs-site.xml");
  }
  ...
}
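The empty-looking init() is a standard Java class-initialization idiom: the first reference to HdfsConfiguration loads the class and runs its static block exactly once. A minimal, self-contained illustration (the class names here are invented for the demo):

// Minimal illustration of the empty-init() idiom; Defaults is a made-up class.
class Defaults {
  static {
    // Runs exactly once, when the class is first loaded.
    System.out.println("default resources registered");
  }
  // Deliberately empty: calling it just forces the class to load.
  public static void init() {}
}

public class InitDemo {
  public static void main(String[] args) {
    Defaults.init(); // prints "default resources registered"
    Defaults.init(); // the static block does NOT run again
  }
}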
NameNode#main
// The main method
public static void main(String argv[]) throws Exception {
  // If the arguments ask for help, print the usage message and exit.
  if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
    System.exit(0);
  }
  try {
    // Print a formatted startup message and register a shutdown hook
    // (logs a message when the node shuts down)
    StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
    // Create the namenode
    NameNode namenode = createNameNode(argv, null);
    if (namenode != null) {
      // Block until the NameNode stops (waits on its RPC server threads)
      namenode.join();
    }
  } catch (Throwable e) {
    // Error handling
    LOG.error("Failed to start namenode.", e);
    terminate(1, e);
  }
}
// Now follow createNameNode
public static NameNode createNameNode(String argv[], Configuration conf)
    throws IOException {
  LOG.info("createNameNode " + Arrays.asList(argv));
  if (conf == null)
    conf = new HdfsConfiguration();
  // Parse out some generic args into Configuration.
  GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
  argv = hParser.getRemainingArgs();
  // Parse the rest, NN specific args.
  // Parse the startup options
  StartupOption startOpt = parseArguments(argv);
  if (startOpt == null) {
    printUsage(System.err);
    return null;
  }
  setStartupOption(conf, startOpt);
  switch (startOpt) {
  case FORMAT: {
    boolean aborted = format(conf, startOpt.getForceFormat(),
        startOpt.getInteractiveFormat());
    terminate(aborted ? 1 : 0);
    return null; // avoid javac warning
  }
  case GENCLUSTERID: {
    System.err.println("Generating new cluster id:");
    System.out.println(NNStorage.newClusterID());
    terminate(0);
    return null;
  }
  case FINALIZE: {
    System.err.println("Use of the argument '" + StartupOption.FINALIZE +
        "' is no longer supported. To finalize an upgrade, start the NN " +
        " and then run `hdfs dfsadmin -finalizeUpgrade'");
    terminate(1);
    return null; // avoid javac warning
  }
  case ROLLBACK: {
    boolean aborted = doRollback(conf, true);
    terminate(aborted ? 1 : 0);
    return null; // avoid warning
  }
  case BOOTSTRAPSTANDBY: {
    String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
    int rc = BootstrapStandby.run(toolArgs, conf);
    terminate(rc);
    return null; // avoid warning
  }
  case INITIALIZESHAREDEDITS: {
    boolean aborted = initializeSharedEdits(conf,
        startOpt.getForceFormat(),
        startOpt.getInteractiveFormat());
    terminate(aborted ? 1 : 0);
    return null; // avoid warning
  }
  case BACKUP:
  case CHECKPOINT: {
    NamenodeRole role = startOpt.toNodeRole();
    DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
    return new BackupNode(conf, role);
  }
  case RECOVER: {
    NameNode.doRecovery(startOpt, conf);
    return null;
  }
  case METADATAVERSION: {
    printMetadataVersion(conf);
    terminate(0);
    return null; // avoid javac warning
  }
  case UPGRADEONLY: {
    DefaultMetricsSystem.initialize("NameNode");
    new NameNode(conf);
    terminate(0);
    return null;
  }
  default: { // A normal startup takes this branch
    // Initialize the metrics system
    DefaultMetricsSystem.initialize("NameNode");
    // Return a new NameNode
    return new NameNode(conf);
  }
  }
}
// The NameNode constructors
public NameNode(Configuration conf) throws IOException {
  this(conf, NamenodeRole.NAMENODE);
}

protected NameNode(Configuration conf, NamenodeRole role)
    throws IOException {
  super(conf);
  this.tracer = new Tracer.Builder("NameNode").
      conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf)).
      build();
  this.tracerConfigurationManager =
      new TracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);
  this.role = role;
  // Set NameNode#clientNamenodeAddress from the configuration,
  // e.g. "hdfs://localhost:9000" in a pseudo-distributed setup
  setClientNamenodeAddress(conf);
  String nsId = getNameServiceId(conf);
  String namenodeId = HAUtil.getNameNodeId(conf, nsId);
  // HA-related
  this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
  state = createHAState(getStartupOption(conf));
  this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
  this.haContext = createHAContext();
  try {
    initializeGenericKeys(conf, nsId, namenodeId);
    // Performs the actual initialization work
    initialize(getConf());
    // HA-related
    try {
      haContext.writeLock();
      state.prepareToEnterState(haContext);
      state.enterState(haContext);
    } finally {
      haContext.writeUnlock();
    }
  } catch (IOException e) {
    this.stopAtException(e);
    throw e;
  } catch (HadoopIllegalArgumentException e) {
    this.stopAtException(e);
    throw e;
  }
  this.started.set(true);
}
Next, let's continue with initialize(getConf()).
Note that even when HA is not enabled locally (haEnabled = false), the namenode still holds an HAState, and that HAState is active.
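Why active? The decision comes from NameNode#createHAState, which in 2.9.2 essentially reads as follows (lightly paraphrased):

// Without HA (or when starting for UPGRADE/UPGRADEONLY) the NameNode goes
// straight to ACTIVE_STATE; with HA enabled it starts in STANDBY_STATE and
// waits for a failover controller to promote it.
protected HAState createHAState(StartupOption startOpt) {
  if (!haEnabled || startOpt == StartupOption.UPGRADE
      || startOpt == StartupOption.UPGRADEONLY) {
    return ACTIVE_STATE;
  } else {
    return STANDBY_STATE;
  }
}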
// Performs the actual initialization work
protected void initialize(Configuration conf) throws IOException {
  if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
    String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
    if (intervals != null) {
      conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
          intervals);
    }
  }
  UserGroupInformation.setConfiguration(conf);
  loginAsNameNodeUser(conf);
  // Initialize metrics
  NameNode.initMetrics(conf, this.getRole());
  StartupProgressMetrics.register(startupProgress);
  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(conf);
  pauseMonitor.start();
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
  // Start the HTTP server
  if (NamenodeRole.NAMENODE == role) {
    startHttpServer(conf);
  }
  // Load the fsimage and edit log from the namenode directories;
  // initializes FSNamesystem, FSDirectory, LeaseManager, etc.
  loadNamesystem(conf);
  // Create the RPC server: NameNodeRpcServer wraps clientRpcServer and
  // serves ClientNamenodeProtocol, DatanodeProtocolPB, and other protocols
  rpcServer = createRpcServer(conf);
  initReconfigurableBackoffKey();
  if (clientNamenodeAddress == null) {
    // This is expected for MiniDFSCluster. Set it now using
    // the RPC server's bind address.
    clientNamenodeAddress =
        NetUtils.getHostPortString(getNameNodeAddress());
    LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
        + " this namenode/service.");
  }
  if (NamenodeRole.NAMENODE == role) {
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
  // Start the threads that carry out several critical background tasks
  startCommonServices(conf);
  startMetricsLogger(conf);
}
NameNode#startCommonServices
private void startCommonServices(Configuration conf) throws IOException {
  // Create the NameNodeResourceChecker, activate the BlockManager, etc.
  namesystem.startCommonServices(conf, haContext);
  registerNNSMXBean();
  // Roles other than NamenodeRole.NAMENODE start their HTTP server here
  if (NamenodeRole.NAMENODE != role) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
  // Start the RPC server
  rpcServer.start();
  try {
    plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
        ServicePlugin.class);
  } catch (RuntimeException e) {
    String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
    LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +
        pluginsValue, e);
    throw e;
  }
  // Start each plugin
  for (ServicePlugin p : plugins) {
    try {
      p.start(this);
    } catch (Throwable t) {
      LOG.warn("ServicePlugin " + p + " could not be started", t);
    }
  }
  LOG.info(getRole() + " RPC up at: " + getNameNodeAddress());
  if (rpcServer.getServiceRpcAddress() != null) {
    LOG.info(getRole() + " service RPC up at: "
        + rpcServer.getServiceRpcAddress());
  }
}
FSNamesystem#startCommonServices
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    // Create a NameNodeResourceChecker and check resources right away
    nnResourceChecker = new NameNodeResourceChecker(conf);
    checkAvailableResources();
    assert !blockManager.isPopulatingReplQueues();
    // Record startup-progress information
    StartupProgress prog = NameNode.getStartupProgress();
    prog.beginPhase(Phase.SAFEMODE);
    long completeBlocksTotal = getCompleteBlocksTotal();
    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);
    // Pass the total number of complete blocks and activate the BlockManager
    blockManager.activate(conf, completeBlocksTotal);
  } finally {
    writeUnlock("startCommonServices");
  }
  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
    dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress != null) ?
      serviceAddress.getHostName() : "";
}
public void activate(Configuration conf, long blockTotal) {
  // Start the PendingReplicationMonitor
  pendingReplications.start();
  // Activate the DatanodeManager: starts the DecommissionManager monitor
  // and the HeartbeatManager monitor
  datanodeManager.activate(conf);
  this.replicationThread.setName("ReplicationMonitor");
  // Start the BlockManager's ReplicationMonitor thread
  this.replicationThread.start();
  this.blockReportThread.start();
  mxBeanName = MBeans.register("NameNode", "BlockStats", this);
  bmSafeMode.activate(blockTotal);
}
The namenode's main responsibility is managing file metadata and the file-to-block mapping. Accordingly, when reading its startup flow, focus on the worker threads that communicate with clients and datanodes, the file-metadata management machinery, and the block management machinery. Of these, the RpcServer handles communication with clients and datanodes, while the FSDirectory manages file metadata.
The datanode's main class is DataNode, so start from DataNode.main():
public class DataNode extends ReconfigurableBase
    implements InterDatanodeProtocol, ClientDatanodeProtocol,
    TraceAdminProtocol, DataNodeMXBean, ReconfigurationProtocol {
  public static final Logger LOG = LoggerFactory.getLogger(DataNode.class);
  static {
    HdfsConfiguration.init();
  }
  ...
  public static void main(String args[]) {
    if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
      System.exit(0);
    }
    secureMain(args, null);
  }

  public static void secureMain(String args[], SecureResources resources) {
    int errorCode = 0;
    try {
      // Print the startup message
      StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
      // Do the main work of creating the datanode
      DataNode datanode = createDataNode(args, null, resources);
      if (datanode != null) {
        datanode.join();
      } else {
        errorCode = 1;
      }
    } catch (Throwable e) {
      LOG.error("Exception in secureMain", e);
      terminate(1, e);
    } finally {
      // We need to terminate the process here because either shutdown was called
      // or some disk related conditions like volumes tolerated or volumes required
      // condition was not met. Also, In secure mode, control will go to Jsvc
      // and Datanode process hangs if it does not exit.
      LOG.warn("Exiting Datanode");
      terminate(errorCode);
    }
  }
}
public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // Does most of the initialization and starts some of the worker threads
  DataNode dn = instantiateDataNode(args, conf, resources);
  if (dn != null) {
    // Start the remaining worker threads
    dn.runDatanodeDaemon();
  }
  return dn;
}
public void runDatanodeDaemon() throws IOException {
  // blockPoolManager.startAll() is also invoked while
  // DataNode.instantiateDataNode() runs (see below)
  blockPoolManager.startAll();
  // Start the dataXceiverServer
  dataXceiverServer.start();
  if (localDataXceiverServer != null) {
    localDataXceiverServer.start();
  }
  ipcServer.setTracer(tracer);
  ipcServer.start();
  startPlugins(getConf());
}
public static DataNode instantiateDataNode(String args[],
    Configuration conf) throws IOException {
  return instantiateDataNode(args, conf, null);
}

public static DataNode instantiateDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  if (conf == null)
    conf = new HdfsConfiguration();
  if (args != null) {
    // parse generic hadoop options
    GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
    args = hParser.getRemainingArgs();
  }
  if (!parseArguments(args, conf)) {
    printUsage(System.err);
    return null;
  }
  Collection<StorageLocation> dataLocations = getStorageLocations(conf);
  UserGroupInformation.setConfiguration(conf);
  SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
      DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, getHostName(conf));
  return makeInstance(dataLocations, conf, resources);
}
// DataNode.makeInstance() begins creating the DataNode
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  List<StorageLocation> locations;
  StorageLocationChecker storageLocationChecker =
      new StorageLocationChecker(conf, new Timer());
  try {
    // Check the data directories (permissions, health)
    locations = storageLocationChecker.check(conf, dataDirs);
  } catch (InterruptedException ie) {
    throw new IOException("Failed to instantiate DataNode", ie);
  }
  DefaultMetricsSystem.initialize("DataNode");
  assert locations.size() > 0 : "number of data directories should be > 0";
  return new DataNode(conf, locations, storageLocationChecker, resources);
}
DataNode(final Configuration conf,
    final List<StorageLocation> dataDirs,
    final StorageLocationChecker storageLocationChecker,
    final SecureResources resources) throws IOException {
  super(conf);
  this.tracer = createTracer(conf);
  this.tracerConfigurationManager =
      new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
  this.fileIoProvider = new FileIoProvider(conf, this);
  this.blockScanner = new BlockScanner(this);
  this.lastDiskErrorCheck = 0;
  this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
      DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
  this.usersWithLocalPathAccess = Arrays.asList(
      conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
  this.connectToDnViaHostname = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
      DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
  this.getHdfsBlockLocationsEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
      DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
  this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
      DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
  this.isPermissionEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
      DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
  this.pipelineSupportECN = conf.getBoolean(
      DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
      DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);
  confVersion = "core-" +
      conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
      ",hdfs-" +
      conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
  this.volumeChecker = new DatasetVolumeChecker(conf, new Timer());
  // Determine whether we should try to pass file descriptors to clients.
  if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
      HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT)) {
    String reason = DomainSocket.getLoadingFailureReason();
    if (reason != null) {
      LOG.warn("File descriptor passing is disabled because " + reason);
      this.fileDescriptorPassingDisabledReason = reason;
    } else {
      LOG.info("File descriptor passing is enabled.");
      this.fileDescriptorPassingDisabledReason = null;
    }
  } else {
    this.fileDescriptorPassingDisabledReason =
        "File descriptor passing was not configured.";
    LOG.debug(this.fileDescriptorPassingDisabledReason);
  }
  try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is " + hostName);
    startDataNode(dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
  final int dncCacheMaxSize =
      conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
          DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT);
  datanodeNetworkCounts =
      CacheBuilder.newBuilder()
          .maximumSize(dncCacheMaxSize)
          .build(new CacheLoader<String, Map<String, Long>>() {
            @Override
            public Map<String, Long> load(String key) throws Exception {
              final Map<String, Long> ret = new HashMap<String, Long>();
              ret.put("networkErrors", 0L);
              return ret;
            }
          });
  initOOBTimeout();
  this.storageLocationChecker = storageLocationChecker;
}
void startDataNode(List<StorageLocation> dataDirectories,
    SecureResources resources) throws IOException {
  // settings global for all BPs in the Data Node
  this.secureResources = resources;
  synchronized (this) {
    this.dataDirs = dataDirectories;
  }
  this.dnConf = new DNConf(this);
  checkSecureConfig(dnConf, getConf(), resources);
  if (dnConf.maxLockedMemory > 0) {
    if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
      throw new RuntimeException(String.format(
          "Cannot start datanode because the configured max locked memory" +
          " size (%s) is greater than zero and native code is not available.",
          DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
    }
    if (Path.WINDOWS) {
      NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
    } else {
      long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
      if (dnConf.maxLockedMemory > ulimit) {
        throw new RuntimeException(String.format(
            "Cannot start datanode because the configured max locked memory" +
            " size (%s) of %d bytes is more than the datanode's available" +
            " RLIMIT_MEMLOCK ulimit of %d bytes.",
            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
            dnConf.maxLockedMemory,
            ulimit));
      }
    }
  }
  LOG.info("Starting DataNode with maxLockedMemory = " +
      dnConf.maxLockedMemory);
  int volFailuresTolerated = dnConf.getVolFailuresTolerated();
  int volsConfigured = dnConf.getVolsConfigured();
  if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
    throw new DiskErrorException("Invalid value configured for "
        + "dfs.datanode.failed.volumes.tolerated - " + volFailuresTolerated
        + ". Value configured is either less than 0 or >= "
        + "to the number of configured volumes (" + volsConfigured + ").");
  }
  // Initialize DataStorage
  storage = new DataStorage();
  // global DN settings
  // Register the JMX MBean
  registerMXBean();
  // Initialize the DataXceiver (streaming data transfer); it is started
  // later in DataNode#runDatanodeDaemon()
  initDataXceiver();
  // Start the InfoServer (web UI)
  startInfoServer();
  // Start the JvmPauseMonitor (watches for JVM pauses; queryable via JMX)
  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(getConf());
  pauseMonitor.start();
  // BlockPoolTokenSecretManager is required to create ipc server.
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
  // Login is done by now. Set the DN user name.
  dnUserName = UserGroupInformation.getCurrentUser().getUserName();
  LOG.info("dnUserName = " + dnUserName);
  LOG.info("supergroup = " + supergroup);
  // Initialize the IPC server (RPC); it is started later in
  // DataNode#runDatanodeDaemon()
  initIpcServer();
  metrics = DataNodeMetrics.create(getConf(), getDisplayName());
  peerMetrics = dnConf.peerStatsEnabled ?
      DataNodePeerMetrics.create(getDisplayName()) : null;
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
  blockRecoveryWorker = new BlockRecoveryWorker(this);
  // Initialize per-namespace (nameservice) / per-namenode structures
  blockPoolManager = new BlockPoolManager(this);
  blockPoolManager.refreshNamenodes(getConf());
  // Create the ReadaheadPool from the DataNode context so we can
  // exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();
  saslClient = new SaslDataTransferClient(dnConf.getConf(),
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  startMetricsLogger();
  if (dnConf.diskStatsEnabled) {
    diskMetrics = new DataNodeDiskMetrics(this,
        dnConf.outliersReportIntervalMs);
  }
}
BlockPoolManager#refreshNamenodes()
// BlockPoolManager abstracts the block-storage service the datanode offers,
// organized by namespace (nameservice) and then by namenode.
// Besides the call made during initialization, a refresh can also be
// triggered by a namenode command delivered via the datanode heartbeat.
void refreshNamenodes(Configuration conf)
    throws IOException {
  LOG.info("Refresh request received for nameservices: " + conf.get
      (DFSConfigKeys.DFS_NAMESERVICES));
  Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
      .getNNServiceRpcAddressesForCluster(conf);
  Map<String, Map<String, InetSocketAddress>> newLifelineAddressMap = DFSUtil
      .getNNLifelineRpcAddressesForCluster(conf);
  synchronized (refreshNamenodesLock) {
    doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
  }
}
BlockPoolManager#doRefreshNamenodes()
private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap,
    Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
    throws IOException {
  assert Thread.holdsLock(refreshNamenodesLock);
  Set<String> toRefresh = Sets.newLinkedHashSet();
  Set<String> toAdd = Sets.newLinkedHashSet();
  Set<String> toRemove;
  synchronized (this) {
    // Step 1. For each of the new nameservices, figure out whether
    // it's an update of the set of NNs for an existing NS,
    // or an entirely new nameservice.
    for (String nameserviceId : addrMap.keySet()) {
      if (bpByNameserviceId.containsKey(nameserviceId)) {
        toRefresh.add(nameserviceId);
      } else {
        toAdd.add(nameserviceId);
      }
    }
    // Step 2. Any nameservices we currently have but are no longer present
    // need to be removed.
    toRemove = Sets.newHashSet(Sets.difference(
        bpByNameserviceId.keySet(), addrMap.keySet()));
    assert toRefresh.size() + toAdd.size() ==
        addrMap.size() :
        "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
        " toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
        " toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);
    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {
      LOG.info("Starting BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toAdd));
      for (String nsToAdd : toAdd) {
        Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToAdd);
        Map<String, InetSocketAddress> nnIdToLifelineAddr =
            lifelineAddrMap.get(nsToAdd);
        ArrayList<InetSocketAddress> addrs =
            Lists.newArrayListWithCapacity(nnIdToAddr.size());
        ArrayList<InetSocketAddress> lifelineAddrs =
            Lists.newArrayListWithCapacity(nnIdToAddr.size());
        for (String nnId : nnIdToAddr.keySet()) {
          addrs.add(nnIdToAddr.get(nnId));
          lifelineAddrs.add(nnIdToLifelineAddr != null ?
              nnIdToLifelineAddr.get(nnId) : null);
        }
        // Create a BPOfferService for each new nameservice
        BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    // Then start all BPOfferServices via startAll()
    startAll();
  }
  // Step 4. Shut down old nameservices. This happens outside
  // of the synchronized(this) lock since they need to call
  // back to .remove() from another thread
  if (!toRemove.isEmpty()) {
    LOG.info("Stopping BPOfferServices for nameservices: " +
        Joiner.on(",").useForNull("<default>").join(toRemove));
    for (String nsToRemove : toRemove) {
      BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
      bpos.stop();
      bpos.join();
      // they will call remove on their own
    }
  }
  // Step 5. Update nameservices whose NN list has changed
  if (!toRefresh.isEmpty()) {
    LOG.info("Refreshing list of NNs for nameservices: " +
        Joiner.on(",").useForNull("<default>").join(toRefresh));
    for (String nsToRefresh : toRefresh) {
      final BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
      Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToRefresh);
      Map<String, InetSocketAddress> nnIdToLifelineAddr =
          lifelineAddrMap.get(nsToRefresh);
      final ArrayList<InetSocketAddress> addrs =
          Lists.newArrayListWithCapacity(nnIdToAddr.size());
      final ArrayList<InetSocketAddress> lifelineAddrs =
          Lists.newArrayListWithCapacity(nnIdToAddr.size());
      for (String nnId : nnIdToAddr.keySet()) {
        addrs.add(nnIdToAddr.get(nnId));
        lifelineAddrs.add(nnIdToLifelineAddr != null ?
            nnIdToLifelineAddr.get(nnId) : null);
      }
      try {
        UserGroupInformation.getLoginUser()
            .doAs(new PrivilegedExceptionAction<Object>() {
              @Override
              public Object run() throws Exception {
                bpos.refreshNNList(addrs, lifelineAddrs);
                return null;
              }
            });
      } catch (InterruptedException ex) {
        IOException ioe = new IOException();
        ioe.initCause(ex.getCause());
        throw ioe;
      }
    }
  }
}
BlockPoolManager#createBPOS()
protected BPOfferService createBPOS(
    final String nameserviceId,
    List<InetSocketAddress> nnAddrs,
    List<InetSocketAddress> lifelineNnAddrs) {
  return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn);
}

BPOfferService(
    final String nameserviceId,
    List<InetSocketAddress> nnAddrs,
    List<InetSocketAddress> lifelineNnAddrs,
    DataNode dn) {
  Preconditions.checkArgument(!nnAddrs.isEmpty(),
      "Must pass at least one NN.");
  Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(),
      "Must pass same number of NN addresses and lifeline addresses.");
  this.nameserviceId = nameserviceId;
  this.dn = dn;
  for (int i = 0; i < nnAddrs.size(); ++i) {
    this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
        lifelineNnAddrs.get(i), this));
  }
}
Next, the startAll() call made inside BlockPoolManager#doRefreshNamenodes():
// BlockPoolManager#startAll() starts every BPOfferService (in practice, every BPServiceActor).
synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            for (BPOfferService bpos : offerServices) {
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    IOException ioe = new IOException();
    ioe.initCause(ex.getCause());
    throw ioe;
  }
}
Finally, DataNode#initBlockPool().
The datanode startup path spawns several kinds of worker threads, including the InfoServer, the JvmPauseMonitor, and the BPServiceActors. The most important is the BPServiceActor thread: it is the BPServiceActor that actually communicates with the namenode on the datanode's behalf (a simplified sketch of its loop follows the method below).
void initBlockPool(BPOfferService bpos) throws IOException {
  NamespaceInfo nsInfo = bpos.getNamespaceInfo();
  if (nsInfo == null) {
    throw new IOException("NamespaceInfo not found: Block pool " + bpos
        + " should have retrieved namespace info before initBlockPool.");
  }
  setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
  // Register the new block pool with the BP manager.
  blockPoolManager.addBlockPool(bpos);
  // In the case that this is the first block pool to connect, initialize
  // the dataset, block scanners, etc.
  // First-pass initialization of the storage structures
  initStorage(nsInfo);
  // Exclude failed disks before initializing the block pools to avoid startup
  // failures.
  checkDiskError();
  // Add the block pool to FsDatasetImpl and continue initializing storage
  data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
  blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
  // Start the directory scanner
  initDirectoryScanner(getConf());
}
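To make the BPServiceActor's job concrete, here is a heavily simplified, self-contained sketch of its lifecycle. Every name below is invented for illustration; the real BPServiceActor adds retry logic, lifeline RPCs, and processing of the commands the namenode returns:

// Hypothetical mini version of a BPServiceActor-style thread: one actor
// per namenode, doing handshake -> register -> heartbeat/report loop.
class MiniServiceActor implements Runnable {
  private final String nnAddress;          // the namenode this actor serves
  private final long heartbeatIntervalMs;  // dfs.heartbeat.interval-style pacing

  MiniServiceActor(String nnAddress, long heartbeatIntervalMs) {
    this.nnAddress = nnAddress;
    this.heartbeatIntervalMs = heartbeatIntervalMs;
  }

  @Override
  public void run() {
    handshake();  // fetch NamespaceInfo, verify layout/software versions
    register();   // registerDatanode-style RPC: join the block pool
    while (!Thread.currentThread().isInterrupted()) {
      sendHeartbeat();        // storage reports out, namenode commands back
      maybeSendBlockReport(); // full/incremental block reports on schedule
      try {
        Thread.sleep(heartbeatIntervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // shut the actor down
      }
    }
  }

  private void handshake() { /* omitted */ }
  private void register() { /* omitted */ }
  private void sendHeartbeat() { /* omitted */ }
  private void maybeSendBlockReport() { /* omitted */ }
}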
What problems does highly concurrent access to the NameNode run into?
From HDFS's metadata management mechanism we know that every client request that modifies a piece of metadata (say, a request to upload a file) must write an edits log entry, and that involves two steps: writing the local edits file, and writing to the JournalNodes cluster (in a Hadoop HA cluster; best studied together with ZooKeeper). The hard parts under high concurrency are thread safety of the shared state and the efficiency of every single operation.
For thread safety, the NameNode follows a few rules when writing edits: every entry gets a globally unique, monotonically increasing transaction id (txid), and entries must reach the log in txid order.
The resulting problem: if each request generates its txid and then writes the edits file to disk inside one locked code block, combining a synchronization lock with a disk write on every operation is extremely slow.
HDFS's optimization
The root cause is that writing edits means serialized queueing to generate an auto-incrementing txid plus a time-consuming disk write. HDFS's solution has two parts:
Segmented locking
Each thread in turn takes the lock (first acquisition), generates its sequentially increasing txid, writes its edit into region 1 of the in-memory double buffer, and immediately releases the lock. In that gap, the next thread can grab the lock right away and write its own edit into the memory buffer.
Double buffering
The program allocates two identical regions of memory. One is bufCurrent, and newly produced edits are written straight into it; the other is bufReady. Once the data in bufCurrent reaches a threshold, the two regions are exchanged (region 1 swaps with region 2). This guarantees that the path serving client write requests only ever touches memory and never synchronously writes to disk.
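Before diving into the real source, here is a minimal, hypothetical sketch of the segmented-lock plus double-buffer idea. Every name in it is invented; the real implementation lives in FSEditLog and EditsDoubleBuffer, examined next:

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of segmented locking over a double buffer.
// All names are invented; this is not Hadoop's FSEditLog.
class DoubleBufferedLog {
  private long txid = 0;                                // guarded by "this"
  private List<String> bufCurrent = new ArrayList<>();  // receives new edits
  private List<String> bufReady = new ArrayList<>();    // drained to disk
  private boolean flushInProgress = false;

  // Lock segment 1: a short critical section that only assigns the txid
  // and appends to memory -- no disk I/O while holding the lock.
  void logEdit(String op) {
    synchronized (this) {
      long myTxid = ++txid;              // sequential, globally increasing
      bufCurrent.add(myTxid + ":" + op);
    }
    // Lock released: other threads may append while we trigger a flush.
    flushIfNeeded();
  }

  // Lock segment 2: swap the buffers under the lock, flush outside it.
  private void flushIfNeeded() {
    List<String> toFlush;
    synchronized (this) {
      if (flushInProgress || bufCurrent.isEmpty()) {
        return; // a fuller version would wait until its txid is flushed
      }
      flushInProgress = true;
      toFlush = bufCurrent;   // exchange region 1 and region 2
      bufCurrent = bufReady;
      bufReady = toFlush;
    }
    try {
      writeToDisk(toFlush);   // the slow part runs without the lock
      toFlush.clear();
    } finally {
      synchronized (this) {
        flushInProgress = false;
      }
    }
  }

  private void writeToDisk(List<String> edits) {
    // stand-in for writing and fsync-ing the edits file
  }
}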

For the double-buffer source analysis, find FSEditLog#logEdit():
void logEdit(final FSEditLogOp op) {
  // whether this thread needs to trigger a sync
  boolean needsSync = false;
  synchronized (this) {
    assert isOpenForWrite() :
        "bad state: " + state;
    // wait if an automatic sync is scheduled
    // If another thread has already scheduled an automatic sync, wait
    // (polls in one-second intervals)
    waitIfAutoSyncScheduled();
    // check if it is time to schedule an automatic sync
    needsSync = doEditTransaction(op);
    if (needsSync) {
      // bufCurrent is full; schedule a double-buffer flush
      isAutoSyncScheduled = true;
    }
  }
  // Sync the log if an automatic sync is required.
  if (needsSync) {
    // Flush the ready buffer to disk
    logSync();
  }
}
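The other half of the picture is logSync(): while holding the FSEditLog lock it calls editLogStream.setReadyToFlush(), which swaps bufCurrent and bufReady, and only after leaving the critical section does it flush the ready buffer to disk, so new edits keep streaming into memory during the disk write. The swap itself (EditsDoubleBuffer#setReadyToFlush, lightly paraphrased here) is just a constant-time pointer exchange:

// Lightly paraphrased from EditsDoubleBuffer#setReadyToFlush()
public void setReadyToFlush() {
  assert isFlushed() : "previous data not flushed yet";
  TxnBuffer tmp = bufReady;
  bufReady = bufCurrent;
  bufCurrent = tmp;
}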
Feel free to follow my WeChat official account 【老周聊架构】 for technical write-ups on AI, big data, cloud native, IoT, and related fields.