Hadoop NameNode Startup: The Heartbeat Mechanism

Preface

How is the heartbeat mechanism in HDFS designed? And if we were building it ourselves, how would we decide whether a service is still alive?
Suppose the program were not distributed at all, but simply multi-threaded: ten threads, one master and nine slaves.
The master needs to know whether those nine slave threads are still alive and making progress.
Keep that question in mind; some good further-reading links are listed at the end of this post.
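
Before diving into the HDFS sources, here is a minimal sketch of the single-process version of the idea (purely illustrative, not Hadoop code; names such as ThreadHeartbeatDemo and lastBeat are made up): each slave thread periodically stamps a shared map with the current time, and the master treats any thread whose stamp is too old as dead.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch (illustrative names, not Hadoop code): each slave thread stamps a shared map
// with the current time; the master treats any thread whose stamp is too old as dead.
public class ThreadHeartbeatDemo {
    static final long HEARTBEAT_INTERVAL = 500;   // slaves report every 0.5s
    static final long EXPIRE_INTERVAL = 2000;     // master declares a slave dead after 2s of silence
    static final Map<String, Long> lastBeat = new ConcurrentHashMap<String, Long>();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 9; i++) {
            final String name = "slave-" + i;
            final boolean dies = (i == 0); // slave-0 stops heartbeating early to show detection
            Thread t = new Thread(new Runnable() {
                public void run() {
                    int beats = 0;
                    while (!dies || beats < 2) {
                        lastBeat.put(name, System.currentTimeMillis()); // the "heartbeat"
                        beats++;
                        try {
                            Thread.sleep(HEARTBEAT_INTERVAL);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            }, name);
            t.setDaemon(true);
            t.start();
        }
        // the master: periodically scan the map for stale timestamps
        for (int round = 0; round < 3; round++) {
            Thread.sleep(EXPIRE_INTERVAL);
            long now = System.currentTimeMillis();
            for (Map.Entry<String, Long> e : lastBeat.entrySet()) {
                if (now - e.getValue() > EXPIRE_INTERVAL) {
                    System.out.println("Lost heartbeat for " + e.getKey());
                }
            }
        }
    }
}

HDFS does essentially the same thing, except the "threads" are DataNodes, the "map" lives inside the NameNode, and the timestamp travels over an RPC called sendHeartbeat.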

源码分析

So who sends heartbeats to whom? In HDFS the DataNode sends heartbeats to the NameNode, and the NameNode uses them to decide whether a DataNode has failed. How does the DataNode send them?

DataNode.java

public void run() {
    LOG.info("Starting DataNode in: " + data.data);
    while (shouldRun) { // flips from true to false when the DataNode is shut down
        try {
            offerService(); // see below
        } catch (Exception ex) {
            LOG.info("Exception: " + ex);
            if (shouldRun) {
                LOG.info("Lost connection to namenode. Retrying...");
                try {
                    Thread.sleep(5000); // wait 5 seconds before retrying the connection
                } catch (InterruptedException ie) {
                }
            }
        }
    }
    LOG.info("Finishing DataNode in: " + data.data);
}

public void offerService() throws Exception {
    long wakeups = 0;
    long lastHeartbeat = 0, lastBlockReport = 0;
    long sendStart = System.currentTimeMillis();
    int heartbeatsSent = 0;
    LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
    while (shouldRun) {
        long now = System.currentTimeMillis();
        synchronized (receivedBlockList) {
            // HEARTBEAT_INTERVAL = 3000 (3s)
            // send a heartbeat once more than 3 seconds have passed since the last one
            if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
                // report the current capacity and remaining space to the NameNode
                namenode.sendHeartbeat(localName, data.getCapacity(), data.getRemaining());
                // remember when the last heartbeat was sent
                lastHeartbeat = now;
            }
        }
        ...
    }
}

Now let's look at how the NameNode handles the heartbeat.

public void sendHeartbeat(String sender, long capacity, long remaining) {
    namesystem.gotHeartbeat(new UTF8(sender), capacity, remaining);
}

// At first glance the inner locks look redundant: the method already holds the object lock,
// so heartbeats, datanodeMap and friends should not need their own locks.
// That was my first thought, but even though it is the same object, you cannot guarantee that
// these shared collections are not modified by other threads; once another object holds a
// reference to them, the usual thread-safety problems reappear [see Example 1].
public synchronized void gotHeartbeat(UTF8 name, long capacity, long remaining) {
    synchronized (heartbeats) { // TreeSet holding the DatanodeInfo of nodes that are heartbeating
        synchronized (datanodeMap) {
            LOG.info("currThread Run => " + Thread.currentThread().getName());
            LOG.info("cur sync ? " + this);
            long capacityDiff = 0;
            long remainingDiff = 0;
            // look up the existing info for this node
            DatanodeInfo nodeinfo = (DatanodeInfo) datanodeMap.get(name);
            // this is a brand-new node
            if (nodeinfo == null) {
                LOG.info("Got brand-new heartbeat from " + name);
                nodeinfo = new DatanodeInfo(name, capacity, remaining);
                // register it in the datanode map
                datanodeMap.put(name, nodeinfo);
                capacityDiff = capacity;
                remainingDiff = remaining;
            } else {
                // This part is interesting:
                // the node reports its latest capacity and remaining space, which are compared
                // with the values recorded last time. If, say, the server was extended with a
                // larger disk, the difference is captured here.
                capacityDiff = capacity - nodeinfo.getCapacity();
                remainingDiff = remaining - nodeinfo.getRemaining();
                // The node must be removed from heartbeats whether or not the values changed.
                heartbeats.remove(nodeinfo);
                // update the node's last-heartbeat time and capacity
                nodeinfo.updateHeartbeat(capacity, remaining);
            }
            // re-insert the updated node into the heartbeat set
            heartbeats.add(nodeinfo);
            // and update the cluster-wide totals
            totalCapacity += capacityDiff;
            totalRemaining += remainingDiff;
        }
    }
}

public void updateHeartbeat(long capacity, long remaining) {
    this.capacityBytes = capacity;
    this.remainingBytes = remaining;
    this.lastUpdate = System.currentTimeMillis(); // current time
}
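
One detail in gotHeartbeat is worth pausing on: the node is removed from heartbeats before updateHeartbeat is called and re-added afterwards. heartbeats is a TreeSet whose comparator sorts by lastUpdate (the comparator is shown in Example 2 below), so mutating the sort key while the element is still inside the set would corrupt the tree's ordering. A small standalone sketch of the failure mode (illustrative names only, not Hadoop code):

import java.util.Comparator;
import java.util.TreeSet;

// Minimal sketch (illustrative names, not Hadoop code): if a TreeSet's comparator reads a
// mutable field, the element must be removed before that field changes and re-added afterwards.
public class TreeSetUpdateDemo {
    static class Node {
        final String name;
        long lastUpdate;
        Node(String name, long lastUpdate) { this.name = name; this.lastUpdate = lastUpdate; }
    }

    static TreeSet<Node> newSet() {
        return new TreeSet<Node>(new Comparator<Node>() {
            public int compare(Node a, Node b) {
                if (a.lastUpdate != b.lastUpdate) {
                    return a.lastUpdate < b.lastUpdate ? -1 : 1;
                }
                return a.name.compareTo(b.name);
            }
        });
    }

    public static void main(String[] args) {
        TreeSet<Node> heartbeats = newSet();
        Node n1 = new Node("dn1", 1000L);
        Node n2 = new Node("dn2", 2000L);
        heartbeats.add(n1);
        heartbeats.add(n2);

        // WRONG: mutating the sort key in place leaves the tree ordered by stale values,
        // so the element can get lost (typically prints false here).
        n2.lastUpdate = 500L;
        System.out.println("contains after in-place update: " + heartbeats.contains(n2));

        // RIGHT: remove, update, re-insert - the same dance gotHeartbeat() performs.
        heartbeats = newSet();
        n1.lastUpdate = 1000L;
        n2.lastUpdate = 2000L;
        heartbeats.add(n1);
        heartbeats.add(n2);
        heartbeats.remove(n2);
        n2.lastUpdate = 500L;
        heartbeats.add(n2);
        System.out.println("contains after remove/update/add: " + heartbeats.contains(n2)); // true
    }
}

Removing first, updating, then re-inserting keeps the set consistent, which is exactly the pattern gotHeartbeat follows.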
The NameNode also runs a background monitor thread that notices when heartbeats stop arriving:

// heartbeat monitor thread
class HeartbeatMonitor implements Runnable {
    /**
     */
    public void run() {
        while (fsRunning) {
            heartbeatCheck();
            try {
                Thread.sleep(heartBeatRecheck); // re-check every heartBeatRecheck ms
            } catch (InterruptedException ie) {
            }
        }
    }
}

/** heartbeat check */
synchronized void heartbeatCheck() {
    synchronized (heartbeats) {
        DatanodeInfo nodeInfo = null;
        // if a node's last update is older than now minus EXPIRE_INTERVAL (10 minutes),
        // the node is considered lost; the generous interval allows for network delays
        while ((heartbeats.size() > 0) &&
               ((nodeInfo = (DatanodeInfo) heartbeats.first()) != null) &&
               (nodeInfo.lastUpdate() < System.currentTimeMillis() - EXPIRE_INTERVAL)) {
            LOG.info("Lost heartbeat for " + nodeInfo.getName());
            // drop the node from the heartbeat set (it is no longer tracked)
            heartbeats.remove(nodeInfo);
            // drop the node from the datanode map
            synchronized (datanodeMap) {
                datanodeMap.remove(nodeInfo.getName());
            }
            // subtract the node's capacity from the cluster totals
            totalCapacity -= nodeInfo.getCapacity();
            totalRemaining -= nodeInfo.getRemaining();
            // fetch the blocks that were stored on the dead node
            Block deadblocks[] = nodeInfo.getBlocks();
            if (deadblocks != null) {
                for (int i = 0; i < deadblocks.length; i++) {
                    // drop the block-to-node mapping and schedule re-replication if needed
                    removeStoredBlock(deadblocks[i], nodeInfo);
                }
            }
            if (heartbeats.size() > 0) {
                nodeInfo = (DatanodeInfo) heartbeats.first();
            }
        }
    }
}

synchronized void removeStoredBlock(Block block, DatanodeInfo node) {
    // look up the set of nodes holding this block
    TreeSet containingNodes = (TreeSet) blocksMap.get(block);
    // if no such mapping exists, throw
    if (containingNodes == null || !containingNodes.contains(node)) {
        throw new IllegalArgumentException("No machine mapping found for block " + block + ", which should be at node " + node);
    }
    // remove the dead node from the block's node set
    containingNodes.remove(node);
    // if the block is still valid and its replica count has fallen below the configured
    // replication factor, schedule it for re-replication
    if (dir.isValidBlock(block) && (containingNodes.size() < this.desiredReplication)) {
        synchronized (neededReplications) {
            neededReplications.add(block);
        }
    }
    // the block was just removed from this node, so it is no longer "in excess" there
    TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(node.getName());
    if (excessBlocks != null) {
        excessBlocks.remove(block);
        if (excessBlocks.size() == 0) {
            excessReplicateMap.remove(node.getName());
        }
    }
}
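
Also note why heartbeatCheck only ever looks at heartbeats.first(): because the set is ordered by lastUpdate, the node that has been silent the longest is always at the head, so the monitor can pop expired nodes off the front and stop at the first one that is still fresh, instead of scanning every node. A tiny sketch of that idiom (illustrative, not Hadoop code):

import java.util.TreeSet;

// Minimal sketch (illustrative, not Hadoop code): when timestamps are kept in a sorted set,
// expiring the stale ones only requires looking at the head of the set.
public class ExpireFromFrontDemo {
    public static void main(String[] args) {
        long now = 10000L;
        long expireInterval = 3000L;
        // pretend these are the lastUpdate timestamps of five nodes
        TreeSet<Long> lastUpdates = new TreeSet<Long>();
        for (long t : new long[] { 1000L, 2000L, 6500L, 8000L, 9900L }) {
            lastUpdates.add(t);
        }
        // pop expired entries off the front; stop at the first fresh one
        while (!lastUpdates.isEmpty() && lastUpdates.first() < now - expireInterval) {
            System.out.println("expired: " + lastUpdates.pollFirst());
        }
        System.out.println("still alive: " + lastUpdates); // prints [8000, 9900]
    }
}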

Examples

Example 1

public static class Demo {
    private byte[] b = new byte[1];
    // res should not really be public: a public field lets other objects grab a reference
    // and modify the list directly. It is left public here only for the demo.
    public List<String> res = new ArrayList<String>();

    // if we relied only on the object lock, another thread could be modifying res
    // at the same time and we would have a problem
    public synchronized void write1() throws InterruptedException {
        synchronized (b) {
            System.out.println("w1 res = " + res);
            res.add("1");
        }
    }

    public void write2() {
        synchronized (b) {
            System.out.println("w2 res = " + res);
            res.add("2");
        }
    }
}

public static void main(String[] args) throws InterruptedException {
    final Demo demo = new Demo();
    Thread t1 = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                demo.write1();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "t1");
    Thread t2 = new Thread(new Runnable() {
        @Override
        public void run() {
            demo.write2();
        }
    }, "t2");
    t1.start();
    t2.start();
}
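
To make the point of the comment inside Demo concrete: because res is a public field, any other thread can obtain the reference and mutate the list without taking either lock, and neither the synchronized method nor the synchronized (b) block can protect against that access path. A hypothetical third thread, which could be dropped into the main method above next to t1 and t2, illustrates the hole:

// Hypothetical addition to the main method above (illustrative only): t3 mutates the public
// res list directly without taking either lock, so no synchronization inside Demo helps.
Thread t3 = new Thread(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            // no lock held: ArrayList is not thread-safe, so this races with write1()/write2()
            demo.res.add("3");
        }
    }
}, "t3");
t3.start();

The fix is the usual one: keep res private and expose it only through methods that take the same lock, or use a thread-safe collection.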

Example 2: the heartbeat-monitoring code extracted into a standalone test

public class HeartbeatMonitorTest {
    public static final Logger LOG = LogFormatter.getLogger("cn.base.test.HeartbeatMonitorTest");
    private int heartBeatRecheck = 3000;
    private boolean fsRunning = true;
    private TreeMap datanodeMap = new TreeMap();
    private TreeMap blocksMap = new TreeMap();
    private TreeMap excessReplicateMap = new TreeMap();
    private FSDirectory dir;
    private int desiredReplication;
    private TreeSet neededReplications = new TreeSet();
    private long totalCapacity = 0, totalRemaining = 0;
    private static long EXPIRE_INTERVAL = 1 * 60 * 1000;
    private Daemon hbthread = null;
    private long capacityDiff = 0;
    private long remainingDiff = 0;

    TreeSet heartbeats = new TreeSet(new Comparator() {
        public int compare(Object o1, Object o2) {
            DatanodeInfo d1 = (DatanodeInfo) o1;
            DatanodeInfo d2 = (DatanodeInfo) o2;
            long lu1 = d1.lastUpdate();
            long lu2 = d2.lastUpdate();
            if (lu1 < lu2) {
                return -1;
            } else if (lu1 > lu2) {
                return 1;
            } else {
                return d1.getName().compareTo(d2.getName());
            }
        }
    });

    class DatanodeInfo {
        private long capacityBytes, remainingBytes, lastUpdate;
        private volatile TreeSet blocks;
        private UTF8 name;

        public DatanodeInfo(UTF8 name, long capacity, long remaining) {
            this.name = name;
            this.blocks = new TreeSet();
            updateHeartbeat(capacity, remaining);
        }

        public void updateHeartbeat(long capacity, long remaining) {
            this.capacityBytes = capacity;
            this.remainingBytes = remaining;
            this.lastUpdate = System.currentTimeMillis();
        }

        public Block[] getBlocks() {
            return (Block[]) blocks.toArray(new Block[blocks.size()]);
        }

        public Iterator getBlockIterator() {
            return blocks.iterator();
        }

        public long getCapacity() {
            return capacityBytes;
        }

        public long getRemaining() {
            return remainingBytes;
        }

        public long lastUpdate() {
            return lastUpdate;
        }

        public UTF8 getName() {
            return name;
        }
    }

    class HeartbeatMonitorTask implements Runnable {
        @Override
        public void run() {
            while (true) {
                heartbeatCheck();
                try {
                    Thread.sleep(heartBeatRecheck);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    synchronized void heartbeatCheck() {
        synchronized (heartbeats) {
            DatanodeInfo nodeInfo = null;
            while ((heartbeats.size() > 0) && ((nodeInfo = (DatanodeInfo) heartbeats.first()) != null)
                    && (nodeInfo.lastUpdate() < System.currentTimeMillis() - EXPIRE_INTERVAL)) {
                LOG.info("Lost heartbeat for " + nodeInfo.getName());
                heartbeats.remove(nodeInfo);
                synchronized (datanodeMap) {
                    datanodeMap.remove(nodeInfo.getName());
                }
                totalCapacity -= nodeInfo.getCapacity();
                totalRemaining -= nodeInfo.getRemaining();
                Block deadblocks[] = nodeInfo.getBlocks();
                if (deadblocks != null) {
                    for (int i = 0; i < deadblocks.length; i++) {
                        removeStoredBlock(deadblocks[i], nodeInfo);
                    }
                }
                if (heartbeats.size() > 0) {
                    nodeInfo = (DatanodeInfo) heartbeats.first();
                }
            }
        }
    }

    synchronized void removeStoredBlock(Block block, DatanodeInfo node) {
        TreeSet containingNodes = (TreeSet) blocksMap.get(block);
        if (containingNodes == null || !containingNodes.contains(node)) {
            throw new IllegalArgumentException(
                    "No machine mapping found for block " + block + ", which should be at node " + node);
        }
        containingNodes.remove(node);
        //
        // It's possible that the block was removed because of a datanode
        // failure. If the block is still valid, check if replication is
        // necessary. In that case, put block on a possibly-will-
        // be-replicated list.
        //
        if (dir.isValidBlock(block) && (containingNodes.size() < this.desiredReplication)) {
            synchronized (neededReplications) {
                neededReplications.add(block);
            }
        }
        //
        // We've removed a block from a node, so it's definitely no longer
        // in "excess" there.
        //
        TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(node.getName());
        if (excessBlocks != null) {
            excessBlocks.remove(block);
            if (excessBlocks.size() == 0) {
                excessReplicateMap.remove(node.getName());
            }
        }
    }

    public HeartbeatMonitorTest() {
        this.hbthread = new Daemon(new HeartbeatMonitorTask(), "A");
    }

    public void start() throws InterruptedException {
        String name = "JokerdeMacBook-Pro.local:50010";
        long capacity = 250140434432L;
        long remaining = 98054681805L;
        UTF8 n = new UTF8(name);
        DatanodeInfo nodeinfo = new DatanodeInfo(n, capacity, remaining);
        this.datanodeMap.put(n, nodeinfo);
        this.capacityDiff = capacity;
        this.remainingDiff = remaining;
        this.heartbeats.add(nodeinfo);
        this.hbthread.start();
        this.hbthread.join();
    }

    public static void main(String[] args) throws InterruptedException {
        HeartbeatMonitorTest task = new HeartbeatMonitorTest();
        task.start();
    }
}

Conclusion

That completes the whole flow: the DataNode sends heartbeats to the NameNode, and the NameNode checks whether each node has timed out.
If a node has timed out, the NameNode removes it and schedules the follow-up work (re-replication of its blocks); otherwise the cycle simply continues.

Finally, a few links on heartbeat and monitoring design:
http://liaojieliang.com/heartbeat-protocal-design/
http://blog.csdn.net/baidu20008/article/details/45022461
http://www.raychase.net/3758
