Hadoop源代码分析

Hadoop源代码分析之Hadoop RPC

RPC是一种通过网络从远程计算机上请求服务,而不需要了解底层网络通信技术的协议。是分布式中最常见和常用的网络通信协议。
Hadoop RPC具有以下几个特点:

  1. 透明性: RPC最基本的特性, 用户在一台计算机中调用另外一台计算机的子程序时,用户自身不应该感觉到其间涉及跨机器间的通信,而是感觉在执行一个本地调用。

  2. 高性能: Hadoop的各个系统(如HDFS, MapReduce)均采用了Master/Slaver的结构。因此需要Hadoop的RPC Server能够高性能的处理各个Client的请求。

  3. 可控性: Hadoop需要精确的控制进程间通信,比如连接,超时,缓存等通信细节,而Java RMI过于重量级且用户可控之处太少

Hadoop的相关代码都在org.apache.hadoop.ipc中,其中最主要的类有三个:

Server: Hadoop RPC Server的实现,这是一个抽象类,只有一个抽象方法

1
public abstract Writable call(Class<?> protocol,Writable param, long receiveTime) throws IOException;

具体的实现在 RPC.Server中, 其中主要包括5各类:

  1. Call: 用于储存客户端的请求。

  2. Listener: 监听类,用于监听客户端发来的请求,把数据封装成Call对象, 添加到callQueue。

  3. Handler:请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

  4. Responder:响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。

  5. Connection:连接类,真正的客户端请求读取逻辑在这个类中。其中Listener,Handler,Responder都继承了Thread,在服务器启动时同时启动这三个线程,下面看这个三个线程的run方法

Listener的关键代码:其实和一般的NIO服务器ServerChannal写法差不多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void run() {
...
while (running) {
SelectionKey key = null;
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
// 创建Connection对象,并把Connection做为key的attachment以便下面的doRead方法使用
doAccept(key);
else if (key.isReadable())
// 主要调用Connection的readAndProcess方法,读取客户端发送过来的数据进行处理,分为三个步骤
// 1.读取IPC连接魔数和协议版本号并完成版本检查
// 2.进行连接头检查,主要调用了两个函数processHeader():保证服务器实现了IPC接口和获取用户信息
// authorize():保证用户有相关的权限访问远程接口
// 3.调用processData()方法处理数据,主要是新建一个Call对象,读取数据填充这个Call的成员变量,并加入到callQueue队列中
doRead(key);
}
} catch (IOException e) {
}
key = null;
}
}
...
}

Handler才是真正执行客户端发过来的远程调用,其关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public void run() {
...
while (running) {
try {
// pop the queue; maybe blocked here,从callQueue队列中取出call对象进行处理
final Call call = callQueue.take();
...
//Subject.doAs()是java的鉴权与授权服务(JAAS)中的方法
value = Subject.doAs(call.connection.user,
new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
return call(call.connection.protocol,call.param, call.timestamp);
/**调用Server的那个抽象方法(当然是调用它子类的实现啦 (RPC.Server.call()))。其实现的关键代码如下:**/
public Writable call(Class<?> protocol, Writable param, long receivedTime)throws IOException {
try {
//这个就是我们常用的java反射调用方法啦
Invocation call = (Invocation)param;
Method method = protocol.getMethod(call.getMethodName(),call.getParameterClasses());
method.setAccessible(true);
Object value = method.invoke(instance, call.getParameters());
...
return new ObjectWritable(method.getReturnType(), value);
} catch (InvocationTargetException e) {
...
}
}
}
);
}
...
//把返回结果储存在这个Call中的response
setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR,
value, errorClass, error);
// 把数据回写给客户端,其中会调用一个Response中一个非常重要的方法processResponse()
// 这个方法只在通道空闲时响应(要处理的Call队列长度为1),忙时不会响应,而是交个Responder
// 进行集中处理和响应
responder.doRespond(call);
}
...
}

Responder关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void run() {
...
while (running) {
try {
waitPending(); // If a channel is being registered, wait.
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
doAsyncWrite(key);//把数据写到缓存区从而发送给客户端,同样调用了processResponse()方法
}
} catch (IOException e) {
...
}
}
}
}

其中响应请求回写数据最关键的代码 processResponse()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) throws IOException {
boolean error = true;
boolean done = false; // 该通道上没有数据要发送
int numElements = 0;
Call call = null;
try {
synchronized (responseQueue) {
// If there are no items for this channel, then we are done
numElements = responseQueue.size();
if (numElements == 0) {
error = false;
return true; // no more data for this channel.
}
call = responseQueue.removeFirst();
...
//异步写尽可能多的数据
int numBytes = channelWrite(channel, call.response);
if (numBytes < 0) {
return true;
}
if (!call.response.hasRemaining()) {
//应答数据已经写完
call.connection.decRpcCount();
if (numElements == 1) { // last call fully processes.
done = true; // no more data for this channel.
} else {
done = false; // more calls pending to be sent.
}
...
} else {
// 为什么会有数据没写完的情况: 在Reactor模式中,业务逻辑被分散的I/O事件所打破, 所以Handler需要适当的机制在所需信息
// 还不全的时候保存上下文,并能在下一次IO事件来的时候继续上次的中断处理
// 应答数据没有写完,插入队列头,等待再次发送
call.connection.responseQueue.addFirst(call);
if (inHandler) {
// 不在Response线程中,在Handler线程中,前面说过当通道空闲时,Handler线程也会调用这个方法往通道中写数据,同样如果
// 数据没写完,就需要交给Responder处理,这是就需要把次没写完的数据标记为Responder感兴趣的事件,等待Responder的
// Selector选择出来并处理
// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
// 成员变量peding++,该变量表示现在有多少个线程在进行通道注册
incPending();
try {
// 唤醒在select()方法上等待的Responder线程
writeSelector.wakeup();
//这样才能调用这个注册方法进行注册
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
//Its ok. channel might be closed else where.
done = true;
} finally {
decPending();
}
}
...
}
error = false;// everything went off well
}
} finally {
...
}
return done;
}

Hadoop RPC Server 是一个典型的Reactor模式的实现。
Reactor模式主要有两个特点

  1. 通过派发/分离I/O操作事件提高系统的并发性能
  2. 提供粗粒度的并发控制,使用单线程实现,避免了负责的同步处理

//TODO 写Hadoop RPC是如何实现Reactor模式的

Hadoop RPC参数调优

  • Reader线程数目: (ipc.server.read.threadpool.size), 默认是1

  • 每个Handler线程对应的Call数目: (ipc.server.handler.queue.size)指定, 默认100

  • Handler线程数目: JobTracker 和 NameNode 是HDFS中的两个RPC Server,其对应的Handler数目分别有参数(mapred.job.tracker.handler.count)和(dfs.namenode.service.handler.count)指定,默认值都为10,这个参数的配置会极大的影响性能。因为Handler线程处理业务逻辑,而其中有可能牵扯计算密集或I/O密集,线程少,耗时的业务逻辑会让大部分的线程阻塞,而响应快的请求得不到及时的处理,这时Reader收集的请求队列会长期处于满的状态,导致通讯恶化,线程过多,又会导致频繁的切换线程的开销

  • 客户端重试次数: (ipc.client.connect.max.retries)指定,默认为10