privatebooleanprocessResponse(LinkedList<Call> responseQueue, boolean inHandler)throws IOException { boolean error = true; boolean done = false; // 该通道上没有数据要发送 int numElements = 0; Call call = null; try { synchronized (responseQueue) { // If there are no items for this channel, then we are done numElements = responseQueue.size(); if (numElements == 0) { error = false; returntrue; // no more data for this channel. } call = responseQueue.removeFirst(); ... //异步写尽可能多的数据 int numBytes = channelWrite(channel, call.response); if (numBytes < 0) { returntrue; } if (!call.response.hasRemaining()) { //应答数据已经写完 call.connection.decRpcCount(); if (numElements == 1) { // last call fully processes. done = true; // no more data for this channel. } else { done = false; // more calls pending to be sent. } ... } else { // 为什么会有数据没写完的情况: 在Reactor模式中,业务逻辑被分散的I/O事件所打破, 所以Handler需要适当的机制在所需信息 // 还不全的时候保存上下文,并能在下一次IO事件来的时候继续上次的中断处理 // 应答数据没有写完,插入队列头,等待再次发送 call.connection.responseQueue.addFirst(call); if (inHandler) { // 不在Response线程中,在Handler线程中,前面说过当通道空闲时,Handler线程也会调用这个方法往通道中写数据,同样如果 // 数据没写完,就需要交给Responder处理,这是就需要把次没写完的数据标记为Responder感兴趣的事件,等待Responder的 // Selector选择出来并处理 // set the serve time when the response has to be sent later call.timestamp = System.currentTimeMillis(); // 成员变量peding++,该变量表示现在有多少个线程在进行通道注册 incPending(); try { // 唤醒在select()方法上等待的Responder线程 writeSelector.wakeup(); //这样才能调用这个注册方法进行注册 channel.register(writeSelector, SelectionKey.OP_WRITE, call); } catch (ClosedChannelException e) { //Its ok. channel might be closed else where. done = true; } finally { decPending(); } } ... } error = false;// everything went off well } } finally { ... } return done; }
Hadoop RPC Server 是一个典型的Reactor模式的实现。 Reactor模式主要有两个特点