對namenode啟動時的相關操作及相關類有一個大體了解,后續深入研究時,再對本文進行補充
?
>實現類
HDFS 啟動腳本為 $HADOOP_HOME/sbin/start-dfs.sh ,查看 start-dfs.sh 可以看出, namenode 是通過 bin /hdfs 命令來啟動
$
vi
start-dfs.
sh
# namenodes
NAMENODES
=$($HADOOP_PREFIX/bin/hdfs getconf -
namenodes)
echo
"
Starting namenodes on [$NAMENODES]
"
"
$HADOOP_PREFIX/sbin/hadoop-daemons.sh
"
\
--config
"
$HADOOP_CONF_DIR
"
\
--hostnames
"
$NAMENODES
"
\
--script
"
$bin/hdfs
"
start namenode $nameStartOpt
#
---------------------------------------------------------
?
查看 $HADOOP_HOME/bin/hdfs ,可以找到 namenode 啟動所調用的 java 類。
$
vi
bin/
hdfs:
if
[
"
$COMMAND
"
=
"
namenode
"
] ;
then
CLASS
=
'
org.apache.hadoop.hdfs.server.namenode.NameNode
'
HADOOP_OPTS
=
"
$HADOOP_OPTS $HADOOP_NAMENODE_OPTS
"
?
>源碼查看
按照前文 hadoop2.5.2學習及實踐筆記(二)—— 編譯源代碼及導入源碼至 eclipse 步驟,源碼已經導入到 eclipse 中,快捷鍵 ctrl +shift+R 搜索并打開 NameNode .java 查看源碼
?
NameNode 類中有一個靜態代碼塊,表示在加載器加載 NameNode 類過程中的準備階段,就會執行代碼塊中的代碼。 HdfsConfiguration 的 init () 方法的方法體是空的,這里的作用是觸發對 HdfsConfiguration 的主動調用,從而保證在執行 NameNode 類相關調用時,如果 HdfsConfiguration 類沒有被加載和初始化,先觸發 HdfsConfiguration 的初始化過程。
//
org.apache.hadoop.hdfs.server.namenode.NameNode.java
static
{
//
HdfsConfiguration類init()方法:public static void init() {}
HdfsConfiguration.init();
}
?
查看其 main 方法,可以看出 namenode 相關操作的主要入口方法是 createNameNode ( String argv[], Configuration conf ) 方法。
//
org.apache.hadoop.hdfs.server.namenode.NameNode.java
public
static
void
main(String argv[])
throws
Exception {
if
(DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out,
true
)) {
System.exit(
0
);
}
try
{
//
打印namenode啟動或關閉日志信息
StringUtils.startupShutdownMessage(NameNode.
class
, argv, LOG);
//
namenode相關主要操作
NameNode namenode =
createNameNode
(argv,
null
);
if
(namenode !=
null
) {
//
向客戶端和datanode提供RPC服務,直到RPC服務器結束運行
namenode.join();
}
}
catch
(Throwable e) {
LOG.fatal(
"Exception in namenode join"
, e);
terminate(
1
, e);
}
}
?
createNameNode 方法中通過一個 switch 語句對不同的命令執行不同的操作。比如搭建環境時格式化文件系統時的操作,可以查看FORMAT分支。
//
org.apache.hadoop.hdfs.server.namenode.NameNode.java
public
static
NameNode
createNameNode
(String argv[], Configuration conf)
throws
IOException {
LOG.info(
"createNameNode " +
Arrays.asList(argv));
if
(conf ==
null
)
conf
=
new
HdfsConfiguration();
//
參數為空時默認: -regular
StartupOption startOpt =
parseArguments(argv);
if
(startOpt ==
null
) {
printUsage(System.err);
return
null
;
}
setStartupOption(conf, startOpt);
switch
(startOpt) {
case
FORMAT: {
//
格式化文件系統,偽分布式環境搭建時調用過namenode -format命令
boolean
aborted =
format(conf, startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted
? 1 : 0
);
return
null
;
//
avoid javac warning
}
case
GENCLUSTERID: {
System.err.println(
"Generating new cluster id:"
);
System.out.println(NNStorage.newClusterID());
terminate(
0
);
return
null
;
}
case
FINALIZE: {
System.err.println(
"Use of the argument '" + StartupOption.FINALIZE +
"' is no longer supported. To finalize an upgrade, start the NN " +
" and then run `hdfs dfsadmin -finalizeUpgrade'"
);
terminate(
1
);
return
null
;
//
avoid javac warning
}
case
ROLLBACK: {
boolean
aborted = doRollback(conf,
true
);
terminate(aborted
? 1 : 0
);
return
null
;
//
avoid warning
}
case
BOOTSTRAPSTANDBY: {
String toolArgs[]
= Arrays.copyOfRange(argv, 1
, argv.length);
int
rc =
BootstrapStandby.run(toolArgs, conf);
terminate(rc);
return
null
;
//
avoid warning
}
case
INITIALIZESHAREDEDITS: {
boolean
aborted =
initializeSharedEdits(conf,
startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted
? 1 : 0
);
return
null
;
//
avoid warning
}
case
BACKUP:
case
CHECKPOINT: {
//
backupnode和checkpointnode啟動
NamenodeRole role =
startOpt.toNodeRole();
DefaultMetricsSystem.initialize(role.toString().replace(
" ", ""
));
//
backupnode繼承NameNode類,代碼最終執行的還是NameNode的構造方法
return
new
BackupNode(conf, role);
}
case
RECOVER: {
NameNode.doRecovery(startOpt, conf);
return
null
;
}
case
METADATAVERSION: {
printMetadataVersion(conf);
terminate(
0
);
return
null
;
//
avoid javac warning
}
default
: {
DefaultMetricsSystem.initialize(
"NameNode"
);
//
啟動時startOpt=“-regular”,代碼執行default分支,通過構造函數返回一個namenode實例
return
new
NameNode(conf);
}
}
}
?
namenode 的構造方法
//
org.apache.hadoop.hdfs.server.namenode.NameNode.java
public
NameNode(Configuration conf)
throws
IOException {
this
(conf, NamenodeRole.NAMENODE);
}
protected
NameNode(Configuration conf, NamenodeRole role)
throws
IOException {
this
.conf =
conf;
this
.role =
role;
//
獲取fs.defaultFS,設置namenode地址
setClientNamenodeAddress(conf);
String nsId
=
getNameServiceId(conf);
String namenodeId
=
HAUtil.getNameNodeId(conf, nsId);
//
是否啟用HA
this
.haEnabled =
HAUtil.isHAEnabled(conf, nsId);
//
HA狀態:啟用/備用
state =
createHAState(getStartupOption(conf));
//
讀取dfs.ha.allow.stale.reads,設置namenode在備用狀態時是否允許讀操作,默認為false
this
.allowStaleStandbyReads =
HAUtil.shouldAllowStandbyReads(conf);
this
.haContext =
createHAContext();
try
{
//
聯邦環境下,使用該方法配置一系列使用一個邏輯上的nsId組合在一起的namenode
initializeGenericKeys(conf, nsId, namenodeId);
//
namenode初始化
initialize(conf);
try
{
haContext.writeLock();
state.prepareToEnterState(haContext);
//
namenode進入相應狀態:active state/backup state/standby state
state.enterState(haContext);
}
finally
{
haContext.writeUnlock();
}
}
catch
(IOException e) {
this
.stop();
throw
e;
}
catch
(HadoopIllegalArgumentException e) {
this
.stop();
throw
e;
}
}
?
namenode初始化方法代碼
//
org.apache.hadoop.hdfs.server.namenode.NameNode.java
protected
void
initialize
(Configuration conf)
throws
IOException {
if
(conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) ==
null
) {
String intervals
=
conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
if
(intervals !=
null
) {
conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
intervals);
}
}
//
設置權限,根據hadoop.security.authentication獲取認證方式及規則
UserGroupInformation.setConfiguration(conf);
//
登錄:如果認證方式為simple則退出該方法
//
否則調用UserGroupInformation.loginUserFromKeytab進行登陸,登陸使用dfs.namenode.kerberos.principal作為用戶名
loginAsNameNodeUser(conf);
//
初始化度量系統,用于度量namenode服務狀態
NameNode.initMetrics(conf,
this
.getRole());
StartupProgressMetrics.register(startupProgress);
if
(NamenodeRole.NAMENODE ==
role) {
//
啟動http服務器
startHttpServer(conf);
}
//根據命令對命名空間進行操作,如:前文所述啟動時
加載本地命名空間鏡像和應用編輯日志,在內存中建立命名空間的映像
loadNamesystem(conf);
//
創建RPC服務器
rpcServer =
createRpcServer(conf);
if
(clientNamenodeAddress ==
null
) {
//
This is expected for MiniDFSCluster. Set it now using
//
the RPC server's bind address.
clientNamenodeAddress =
NetUtils.getHostPortString(rpcServer.getRpcAddress());
LOG.info(
"Clients are to use " + clientNamenodeAddress + " to access"
+ " this namenode/service."
);
}
if
(NamenodeRole.NAMENODE ==
role) {
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
}
pauseMonitor
=
new
JvmPauseMonitor(conf);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
//
啟動活動狀態和備用狀態的公共服務:RPC服務和namenode的插件程序啟動
startCommonServices(conf);
}
?
loadNamesystem(Configuration conf) 方法調用 FSNamesystem 類的 loadFromDisk(Configuration conf) 。前文提到的, namenode 啟動時從本地文件系統加載鏡像并重做編輯日志,都在此方法中實現。
//
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.java
static
FSNamesystem
loadFromDisk
(Configuration conf)
throws
IOException {
//
必須的編輯日志目錄檢查
checkConfiguration(conf);
//
設在NNStorage,并初始化編輯日志目錄。NNStorage主要功能是管理namenode使用的存儲目錄
FSImage fsImage =
new
FSImage(conf,
FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
//
根據指定的鏡像創建FSNamesystem對象
FSNamesystem namesystem =
new
FSNamesystem(conf, fsImage,
false
);
StartupOption startOpt
=
NameNode.getStartupOption(conf);
if
(startOpt ==
StartupOption.RECOVER) {
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
}
long
loadStart =
now();
try
{
//
加載鏡像、重做編輯日志,并打開一個新編輯文件都在此方法中
namesystem.loadFSImage(startOpt);
}
catch
(IOException ioe) {
LOG.warn(
"Encountered exception loading fsimage"
, ioe);
fsImage.close();
throw
ioe;
}
long
timeTakenToLoadFSImage = now() -
loadStart;
LOG.info(
"Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs"
);
NameNodeMetrics nnMetrics
=
NameNode.getNameNodeMetrics();
if
(nnMetrics !=
null
) {
nnMetrics.setFsImageLoadTime((
int
) timeTakenToLoadFSImage);
}
return
namesystem;
}
private
void
loadFSImage
(StartupOption startOpt)
throws
IOException {
final
FSImage fsImage =
getFSImage();
//
format before starting up if requested
if
(startOpt ==
StartupOption.FORMAT) {
fsImage.format(
this
, fsImage.getStorage().determineClusterId());
//
reuse current id
startOpt
=
StartupOption.REGULAR;
}
boolean
success =
false
;
writeLock();
try
{
//
We shouldn't be calling saveNamespace if we've come up in standby state.
MetaRecoveryContext recovery =
startOpt.createRecoveryContext();
final boolean staleImage = fsImage.recoverTransitionRead(startOpt, this, recovery);
if
(RollingUpgradeStartupOption.ROLLBACK.matches(startOpt)) {
rollingUpgradeInfo
=
null
;
}
final
boolean
needToSave = staleImage && !haEnabled && !
isRollingUpgrade();
LOG.info(
"Need to save fs image? " +
needToSave
+ " (staleImage=" + staleImage + ", haEnabled=" +
haEnabled
+ ", isRollingUpgrade=" + isRollingUpgrade() + ")"
);
if
(needToSave) {
fsImage.saveNamespace(
this
);
}
else
{
//
No need to save, so mark the phase done.
StartupProgress prog =
NameNode.getStartupProgress();
prog.beginPhase(Phase.SAVING_CHECKPOINT);
prog.endPhase(Phase.SAVING_CHECKPOINT);
}
//
This will start a new log segment and write to the seen_txid file, so
//
we shouldn't do it when coming up in standby state
if
(!haEnabled || (haEnabled && startOpt ==
StartupOption.UPGRADE)) {
fsImage.openEditLogForWrite();
}
success
=
true
;
}
finally
{
if
(!
success) {
fsImage.close();
}
writeUnlock();
}
imageLoadComplete();
}
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

