Hadoop NameNode启动之FSDirectory

前言

作为Hadoop NameNode类启动分析篇的起始篇章。
我们先来了解一下FSDirectory做了哪些功能点。

一切都因下面这段代码开始（一切都是命运石之门的选择, 滑稽）

进入主题

我们先来看看NameNode的构造方法吧。
可以发现会创建一个FSNamesystem对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Create a NameNode at the default location.
 *
 * Reads "fs.default.name" (default "local") from the configuration to
 * derive the RPC port, then delegates to the main constructor.
 */
public NameNode(Configuration conf) throws IOException {
this(getDir(conf), // directory where the NameNode keeps its on-disk state
DataNode.createSocketAddr // parse host:port out of "fs.default.name"
(conf.get("fs.default.name", "local")).getPort(), conf);
}
/**
 * Create a NameNode at the specified location and start it.
 *
 * Builds the FSNamesystem (which loads fsimage/edits from {@code dir}),
 * then starts the RPC server that clients and DataNodes talk to.
 */
public NameNode(File dir, int port, Configuration conf) throws IOException {
this.namesystem = new FSNamesystem(dir, conf);
this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10); // number of RPC handler threads serving requests
this.server = RPC.getServer(this, port, handlerCount, false, conf);
this.server.start();
}

之后我们进入FSNamesystem观察

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Initialize the namesystem: load the on-disk namespace via FSDirectory
 * and start the background monitor daemons.
 */
public FSNamesystem(File dir, Configuration conf) throws IOException {
this.dir = new FSDirectory(dir); // loads fsimage/edits and re-checkpoints them
this.hbthread = new Daemon(new HeartbeatMonitor()); // watches DataNode heartbeats
this.lmthread = new Daemon(new LeaseMonitor()); // watches client leases
hbthread.start(); // start heartbeat monitor thread
lmthread.start(); // start lease monitor thread
this.systemStart = System.currentTimeMillis();
this.conf = conf;
this.desiredReplication = conf.getInt("dfs.replication", 3); // default replication factor is 3
this.maxReplication = desiredReplication;
this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
this.minReplication = 1; // minimum acceptable replication
this.heartBeatRecheck= 1000; // recheck period in ms — NOTE(review): this is how often the monitor re-checks, not how often DataNodes send heartbeats
}

现在我们只关注FSDirectory类做了哪些功能即可。
为了观察方便, 我把需要研究的代码抽离出来了,并且写成一个测试类方便进行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Load the namespace image from {@code dir}/image, checkpoint it, and
 * open a fresh edit log for subsequent namespace mutations.
 */
public FSDirectory(File dir) throws IOException {
// "image" subdirectory under the configured name directory
File fullimage = new File(dir, "image");
// absence means this NameNode has never been formatted
if (! fullimage.exists()) {
throw new IOException("NameNode not formatted: " + dir);
}
// pending edit-log file
File edits = new File(dir, "edits");
// load the image (and edits) into memory, then write a merged checkpoint back out
if (loadFSImage(fullimage, edits)) {
saveFSImage(fullimage, edits);
}
synchronized (this) {
this.ready = true;
this.notifyAll(); // wake any threads waiting for the directory to become ready
this.editlog = new DataOutputStream(new FileOutputStream(edits)); // opens (and truncates) a new edits file
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
public class FSDirectorTest {
/**
 * In-memory node of the namespace tree: one per file or directory.
 * Children are keyed by name in a sorted map; files carry their block list.
 */
class INode {
public String name;
public INode parent;
public TreeMap children = new TreeMap();
public Block blocks[];
/**
 * @param name   last path component of this node
 * @param parent parent node, or null for the root
 * @param blocks blocks of the file, or null for a directory
 */
INode(String name, INode parent, Block blocks[]) {
this.name = name;
this.parent = parent;
this.blocks = blocks;
}
/**
 * Insert a file into the tree: registers it under its parent's
 * children map (later walked by saveImage).
 * @param target absolute path of the file to add
 * @param blks   block list read from the fsimage file
 * @return the new node, or null if the path already exists or the parent is missing
 */
INode addNode(String target, Block blks[]) {
if (getNode(target) != null) {
return null;
} else {
String parentName = DFSFile.getDFSParent(target);
if (parentName == null) {
return null;
}
INode parentNode = getNode(parentName);
if (parentNode == null) {
return null;
} else {
// targetName is the last component of the DFS path from the fsimage record;
// blks are the block ids stored on the DataNodes;
// parentNode is the containing directory node
String targetName = new File(target).getName();
INode newItem = new INode(targetName, parentNode, blks);
// consumed later by saveImage when walking the tree
parentNode.children.put(targetName, newItem);
return newItem;
}
}
}
// Recursively count this node plus all descendants (root included,
// hence the "- 1" at the saveFSImage call site).
int numItemsInTree() {
int total = 0;
for (Iterator it = children.values().iterator(); it.hasNext();) {
INode child = (INode) it.next();
total += child.numItemsInTree();
}
return total + 1;
}
/**
 * This is the external interface: look up a node by absolute path.
 * Splits the path on '/' and descends component by component.
 */
INode getNode(String target) {
if (!target.startsWith("/") || target.length() == 0) {
return null;
} else if (parent == null && "/".equals(target)) {
return this;
} else {
Vector components = new Vector();
int start = 0;
int slashid = 0;
while (start < target.length() && (slashid = target.indexOf('/', start)) >= 0) {
components.add(target.substring(start, slashid));
start = slashid + 1;
}
if (start < target.length()) {
components.add(target.substring(start));
}
return getNode(components, 0);
}
}
/**
 * Internal recursive lookup: components[index] must match this node's
 * name; recurse into the child matching the next component.
 */
INode getNode(Vector components, int index) {
if (!name.equals((String) components.elementAt(index))) {
return null;
}
if (index == components.size() - 1) {
return this;
}
// Check with children
INode child = (INode) children.get(components.elementAt(index + 1));
if (child == null) {
return null;
} else {
return child.getNode(components, index + 1);
}
}
/**
 * Recursively serialize the namespace into the fsimage.new stream:
 * effectively writes back out the metadata that loadFSImage built in memory.
 * Format per node: UTF8 full path, block count, then each block.
 * The root itself (parent == null) is skipped — only its children are written.
 */
void saveImage(String parentPrefix, DataOutputStream out) throws IOException {
String fullName = "";
if (parent != null) {
fullName = parentPrefix + "/" + name;
new UTF8(fullName).write(out);
if (blocks == null) {
out.writeInt(0);
} else {
out.writeInt(blocks.length);
for (int i = 0; i < blocks.length; i++) {
blocks[i].write(out);
}
}
}
for (Iterator it = children.values().iterator(); it.hasNext();) {
INode child = (INode) it.next();
child.saveImage(fullName, out);
}
}
}
static String FS_IMAGE = "fsimage";
static String NEW_FS_IMAGE = "fsimage.new";
static String OLD_FS_IMAGE = "fsimage.old";
// root of the namespace tree
INode rootDir = new INode("", null, null);
TreeSet activeBlocks = new TreeSet();
// Add a file (and its blocks) to the in-memory tree without logging an edit.
public boolean unprotectedAddFile(UTF8 name, Block blocks[]) {
synchronized (rootDir) {
if (blocks != null) {
// Add file->block mapping
for (int i = 0; i < blocks.length; i++) {
activeBlocks.add(blocks[i]);
}
}
return (rootDir.addNode(name.toString(), blocks) != null);
}
}
// Load the current fsimage into memory, first repairing any state left
// behind by a saveFSImage that crashed mid-way.
public boolean loadFSImage(File fsdir, File edits) throws IOException {
File curFile = new File(fsdir, FS_IMAGE);
File newFile = new File(fsdir, NEW_FS_IMAGE);
File oldFile = new File(fsdir, OLD_FS_IMAGE);
// Crash recovery: saveFSImage may have died between its rename/delete
// steps; each combination below identifies where it stopped and rolls
// the directory forward to a consistent state.
if (oldFile.exists() && curFile.exists()) {
oldFile.delete();
if (edits.exists()) {
edits.delete();
}
} else if (oldFile.exists() && newFile.exists()) {
newFile.renameTo(curFile);
oldFile.delete();
} else if (curFile.exists() && newFile.exists()) {
newFile.delete();
}
if (curFile.exists()) {
DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(curFile)));
try {
// image format: file count, then per file: UTF8 path, block count, blocks
int numFiles = in.readInt();
for (int i = 0; i < numFiles; i++) {
UTF8 name = new UTF8();
name.readFields(in);
int numBlocks = in.readInt();
if (numBlocks == 0) {
unprotectedAddFile(name, null);
} else {
Block blocks[] = new Block[numBlocks];
for (int j = 0; j < numBlocks; j++) {
blocks[j] = new Block();
blocks[j].readFields(in);
}
unprotectedAddFile(name, blocks);
}
}
} finally {
in.close();
}
}
// NOTE(review): always returns true here; the original code (below)
// only returned true when there were edits to replay.
return true;
// if (edits.exists() && loadFSEdits(edits) > 0) {
// return true;
// } else {
// return false;
// }
}
// Checkpoint the in-memory tree to disk via an atomic-ish rename sequence.
public void saveFSImage(File fullimage, File edits) throws IOException {
File curFile = new File(fullimage, FS_IMAGE);
File newFile = new File(fullimage, NEW_FS_IMAGE);
File oldFile = new File(fullimage, OLD_FS_IMAGE);
//
// Write out data
//
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(newFile)));
try {
out.writeInt(rootDir.numItemsInTree() - 1); // root itself is not written
rootDir.saveImage("", out);
} finally {
out.close();
}
//
// Atomic move sequence. (Not transactional: an exception part-way does
// NOT roll back — loadFSImage's recovery logic above compensates instead.)
// Steps 1-4: rename current fsimage to .old, promote the freshly written
// file to fsimage, then delete edits and the .old file.
// 1. Move cur to old
curFile.renameTo(oldFile);
// int i = 1 / 0; // was used to test the (non-)atomicity
// 2. Move new to cur
newFile.renameTo(curFile);
// 3. Remove pending-edits file (it's been integrated with newFile)
edits.delete();
// 4. Delete old
oldFile.delete();
}
// Standalone driver: load the image from a local test directory and,
// if that succeeds, write a fresh checkpoint back out.
public static void main(String[] args) throws IOException {
File dir = new File("tmp/hadoopx/dfs/name");
File fullimage = new File(dir, "image");
File edits = new File(dir, "edits");
FSDirectorTest fsDirectorTest = new FSDirectorTest();
boolean loadFSImage = fsDirectorTest.loadFSImage(fullimage, edits);
System.out.println(loadFSImage);
if (loadFSImage) {
fsDirectorTest.saveFSImage(fullimage, edits);
}
}
}

流程图?

也算不上是一个流程图,只不过把代码中的一些内容以图片方式呈现。

addNode

写入到.new文件后, 先把当前的fsimage重命名为.old, 再把.new重命名为fsimage, 最后删除edits和.old文件, 这样就算保存完毕了。

人生苦短,我要打赏!