privateServerSocketChannel acceptChannel = null; //the accept channel
private Selectorselector = null; //the selector that we usefor the server
private Reader[] readers= null;
private intcurrentReader = 0;
privateInetSocketAddress address; //the address webind at
private Random rand =new Random();
private longlastCleanupRunTime = 0; //the last time whena cleanup connec-
//-tion (for idle connections) ran
private longcleanupInterval = 10000; //the minimuminterval between
//two cleanup runs
private intbacklogLength = conf.getInt("ipc.server.listen.queue.size",128);
private ExecutorServicereadPool;
/**
 * Main loop of the listener thread.
 *
 * <p>While {@code running} is set: blocks in {@code selector.select()},
 * passes every valid, acceptable key to {@code doAccept}, then calls
 * {@code cleanupConnections(false)} once per select round. On shutdown it
 * closes the accept channel and the selector, clears both fields and closes
 * every connection still held in {@code connectionList}.
 */
public void run(){
// Store this server in the SERVER thread-local (the original comment marks it as a ThreadLocal).
SERVER.set(Server.this);
while(running) {
SelectionKey key = null;
try {
selector.select();
// NOTE(review): raw Iterator -- iter.next() returns Object, so the assignment
// to the SelectionKey below does not compile as written; this was presumably
// Iterator<SelectionKey> with the type argument lost when the code was copied.
Iterator iter =selector.selectedKeys().iterator();
while (iter.hasNext()){
key =iter.next();
iter.remove();
try{
if (key.isValid()){
// Only accept (new-connection) events are acted on here.
if(key.isAcceptable())
doAccept(key);
}
} catch(IOException e) {
// NOTE(review): IOExceptions from doAccept are silently ignored.
}
key =null;
}
} catch (OutOfMemoryError e) {
// NOTE(review): handler body elided in this excerpt.
....................
} catch (Exception e) {
// NOTE(review): handler body elided in this excerpt.
....................
}
cleanupConnections(false);
}
LOG.info("Stopping " + this.getName());
synchronized (this) {
try {
acceptChannel.close();
selector.close();
} catch (IOException e) { }
selector= null;
acceptChannel= null;
// clean up all connections
while (!connectionList.isEmpty()) {
closeConnection(connectionList.remove(0));
}
}
}
/**
 * Accepts every pending connection on the listening channel and hands each
 * one to a reader thread chosen round-robin by {@code getReader()}.
 *
 * <p>For each accepted socket: switch it to non-blocking mode, apply
 * {@code tcpNoDelay}, register it for OP_READ with the chosen reader's
 * selector, attach a new {@code Connection} to the resulting key and append
 * that connection to {@code connectionList}.
 *
 * @param key selection key of the listening {@link ServerSocketChannel}
 * @throws IOException if accepting, configuring or registering a socket
 *     fails; a socket that was accepted but not successfully handed to a
 *     reader is closed before the exception propagates
 */
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
  ServerSocketChannel server = (ServerSocketChannel) key.channel();
  SocketChannel channel;
  // accept() returns null when no connection is pending, which ends the loop.
  while ((channel = server.accept()) != null) {
    boolean handedOff = false;
    try {
      channel.configureBlocking(false);
      channel.socket().setTcpNoDelay(tcpNoDelay);
      // Pick a reader with simple round-robin.
      Reader reader = getReader();
      try {
        // Wake the reader's selector and make the reader park, so that the
        // synchronized registerChannel() below can acquire its monitor.
        reader.startAdd();
        // Registers interest in READ events (OP_READ), not write events.
        SelectionKey readKey = reader.registerChannel(channel);
        Connection c = new Connection(readKey, channel, System.currentTimeMillis());
        readKey.attach(c);
        int active;
        synchronized (connectionList) {
          // connectionList tracks all client connections.
          connectionList.add(numConnections, c);
          numConnections++;
          active = numConnections;
        }
        handedOff = true;
        if (LOG.isDebugEnabled()) {
          LOG.debug("Server connection from " + c.toString() +
              "; # active connections: " + active +
              "; # queued calls: " + callQueue.size());
        }
      } finally {
        reader.finishAdd();
      }
    } finally {
      if (!handedOff) {
        // No Connection owns this socket; close it instead of leaking it.
        try {
          channel.close();
        } catch (IOException ignored) {
          // Keep the original failure; a close error adds nothing useful.
        }
      }
    }
  }
}
使用简单的 round-robin(轮询)算法选择 Reader:
// The method that willreturn the next reader to work with
// Simplisticimplementation of round robin for now
ReadergetReader() {
currentReader = (currentReader + 1) % readers.length;
return readers[currentReader];
}
/**
 * Announces that a channel is about to be registered with this reader.
 *
 * <p>Raises the {@code adding} flag and then wakes the reader's selector.
 * The reader thread returns from select(), sees the flag and waits
 * (releasing its monitor) until finishAdd() clears it.
 */
public void startAdd() {
  // The flag must be set before the wakeup so the reader sees it when select() returns.
  this.adding = true;
  this.readSelector.wakeup();
}
/**
 * Registers {@code channel} with this reader's selector for READ events
 * ({@link SelectionKey#OP_READ}).
 *
 * <p>Synchronized on the reader: the reader loop holds this monitor except
 * while it waits for {@code adding} to clear. The caller therefore brackets
 * this call with startAdd()/finishAdd().
 *
 * @param channel the accepted client channel
 * @return the selection key created by the registration
 * @throws IOException if registration fails (e.g. the channel is closed)
 */
public synchronized SelectionKey registerChannel(SocketChannel channel)
    throws IOException {
  return channel.register(readSelector, SelectionKey.OP_READ);
}
//唤醒其他进程
public synchronized voidfinishAdd() {
adding = false;
this.notify();
}
// Selector on which this reader waits for OP_READ events of its connections.
private Selector readSelector = null;
// True between startAdd() and finishAdd(); the reader loop waits while it is set.
// volatile so the reader thread observes writes made by the listener thread.
private volatile boolean adding = false;
/**
 * Creates a reader bound to the given selector.
 *
 * @param readSelector selector this reader waits on for read events
 */
Reader(Selector readSelector) {
  this.readSelector = readSelector;
}
/**
 * Reader thread loop.
 *
 * <p>Holds this reader's monitor for its whole lifetime and releases it only
 * inside {@code wait()}. That is what allows the synchronized
 * registerChannel()/finishAdd() to run only while the listener has paused
 * this reader via startAdd(). Every selected key that is valid and readable
 * is passed to doRead().
 */
public void run() {
  LOG.info("Starting SocketReader");
  synchronized (this) {
    while (running) {
      try {
        readSelector.select();
        // A channel is being registered: wait (releasing the monitor) until finishAdd().
        while (adding) {
          this.wait(1000);
        }
        Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
        while (iter.hasNext()) {
          SelectionKey key = iter.next();
          iter.remove();
          if (key.isValid() && key.isReadable()) {
            doRead(key);
          }
        }
      } catch (InterruptedException e) {
        if (running) { // unexpected -- log it
          LOG.info(getName() + " caught: " +
              StringUtils.stringifyException(e));
        }
      } catch (IOException ex) {
        LOG.error("Error in Reader", ex);
      }
    }
  }
}
voiddoRead(SelectionKey key) throws InterruptedException {
intcount = 0;
Connection c = (Connection)key.attachment();
if (c== null) {
return;
}
c.setLastContact(System.currentTimeMillis());
try{
count = c.readAndProcess();
}catch (InterruptedException ieo) {
LOG.info(getName() + ": readAndProcess caughtInterruptedException", ieo);
throw ieo;
}catch (Exception e) {
LOG.info(getName() + ": readAndProcess threwexception " + e + ". Count of bytes read: " + count, e);
count = -1; //so that the (count < 0) blockis executed
}
if(count < 0) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ":disconnecting client " +
c + ". Number of active connections: "+
numConnections);
closeConnection(c);
c = null;
}
else{
c.setLastContact(System.currentTimeMillis());
}
}
public intreadAndProcess() throws IOException, InterruptedException{
while(true) {
int count = -1;
if (dataLengthBuffer.remaining() > 0) {
count =channelRead(channel, dataLengthBuffer);
if (count < 0 ||dataLengthBuffer.remaining() > 0)
returncount;
}
if (!rpcHeaderRead) {
//Every connection isexpected to send the header.
if (rpcHeaderBuffer == null){
rpcHeaderBuffer = ByteBuffer.allocate(2);
}
count =channelRead(channel, rpcHeaderBuffer);
if (count < 0 ||rpcHeaderBuffer.remaining() > 0) {
returncount;
}
int version =rpcHeaderBuffer.get(0);
byte[] method = new byte[]{rpcHeaderBuffer.get(1)};
authMethod =AuthMethod.read(new DataInputStream(
new ByteArrayInputStream(method)));
dataLengthBuffer.flip();
if(!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION){
//Warningis ok since this is not supposed to happen.
LOG.warn("Incorrect header or version mismatch from "+
hostAddress + ":" + remotePort+
" got version " + version+
" expected version " +CURRENT_VERSION);
return-1;
}
dataLengthBuffer.clear();
if (authMethod == null){
throw newIOException("Unable to read authentication method");
}
if (isSecurityEnabled&& authMethod == AuthMethod.SIMPLE) {
AccessControlException ae = new AccessControlException(
"Authentication isrequired");
setupResponse(authFailedResponse, authFailedCall,Status.FATAL,
null,ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
throwae;
}
if (!isSecurityEnabled&& authMethod != AuthMethod.SIMPLE) {
doSaslReply(SaslStatus.SUCCESS, new IntWritable(
SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
authMethod= AuthMethod.SIMPLE;
// clienthas already sent the initial Sasl message and we
// shouldignore it. Both client and server should fall back
// tosimple auth from now on.
skipInitialSaslHandshake = true;
}
if (authMethod !=AuthMethod.SIMPLE) {
useSasl =true;
}
rpcHeaderBuffer = null;
rpcHeaderRead = true;
continue;
}
if (data == null) {
dataLengthBuffer.flip();
dataLength =dataLengthBuffer.getInt();
if (dataLength ==Client.PING_CALL_ID) {
if(!useWrap) { //covers the !useSasl too
dataLengthBuffer.clear();
return 0; //ping message
}
}
if (dataLength < 0){
LOG.warn("Unexpected data length " + dataLength + "!! from "+
getHostAddress());
}
data =ByteBuffer.allocate(dataLength);
}
count = channelRead(channel, data);
if (data.remaining() == 0) {
dataLengthBuffer.clear();
data.flip();
if (skipInitialSaslHandshake){
data =null;
skipInitialSaslHandshake = false;
continue;
}
boolean isHeaderRead =headerRead;
if (useSasl) {
saslReadAndProcess(data.array());
} else {
processOneRpc(data.array());
}
data = null;
if (!isHeaderRead) {
continue;
}
}
return count;
}
}
private intchannelRead(ReadableByteChannelchannel,
ByteBuffer buffer) throws IOException {
int count =(buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.read(buffer) :channelIO(channel, null, buffer);
if (count > 0){
rpcMetrics.incrReceivedBytes(count);
}
return count;
}
/**
 * Processes one complete message from the client.
 *
 * <p>The first message on a connection is the connection header: it is
 * parsed by processHeader() and the connection is then authorized. Every
 * later message is a call and goes to processData().
 *
 * @param buf the message payload
 * @throws IOException on parse failure; an AccessControlException if the
 *     connection is not authorized
 * @throws InterruptedException if queueing the call is interrupted
 */
private void processOneRpc(byte[] buf) throws IOException,
    InterruptedException {
  if (headerRead) {
    processData(buf);
  } else {
    processHeader(buf);
    headerRead = true;
    if (!authorizeConnection()) {
      throw new AccessControlException("Connection from " + this
          + " for protocol " + header.getProtocol()
          + " is unauthorized for user " + user);
    }
  }
}
private voidprocessData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(newByteArrayInputStream(buf));
intid = dis.readInt(); // try to read an id
if(LOG.isDebugEnabled())
LOG.debug(" got #" + id);
Writable param = ReflectionUtils.newInstance(paramClass,conf);//read param
param.readFields(dis);
Call call = new Call(id, param, this);
callQueue.put(call); // queue the call; maybeblocked here
incRpcCount(); // Increment the rpc count
}
即 org.apache.hadoop.ipc.Server.Listener.Reader 线程读取客户端输入,将其封装为 Call 对象,放入 callQueue(一个 BlockingQueue)中。
@Override
public void run(){
LOG.info(getName() + ": starting");
SERVER.set(Server.this);
ByteArrayOutputStream buf =
newByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
while(running) {
try {
final Call call =callQueue.take(); // pop the queue; maybe blocked here
if(LOG.isDebugEnabled())
LOG.debug(getName() + ": has #" + call.id + " from " +
call.connection);
String errorClass =null;
String error = null;
Writable value = null;
CurCall.set(call);
try {
// Makethe call as the user via Subject.doAs, thus associating
// thecall with the Subject
if(call.connection.user == null) {
value = call(call.connection.protocol,call.param,
call.timestamp);
} else{
value =
call.connection.user.doAs
(newPrivilegedExceptionAction() {
@Override
public Writable run() throwsException {
// make thecall
returncall(call.connection.protocol,
call.param,call.timestamp);
}
}
);
}
} catch (Throwable e) {
LOG.info(getName()+", call "+call+": error: " + e, e);
errorClass= e.getClass().getName();
error =StringUtils.stringifyException(e);
}
CurCall.set(null);
synchronized(call.connection.responseQueue) {
//setupResponse() needs to be sync'ed togetherwith
//responder.doResponse() since setupResponse may use
// SASL toencrypt response data and SASL enforces
// its ownmessage ordering.
setupResponse(buf, call,
(error ==null) ? Status.SUCCESS : Status.ERROR,
value,errorClass, error);
// Discard the large buf andreset it back to
// smaller size to freeupheap
if (buf.size() >maxRespSize) {
LOG.warn("Large response size " + buf.size() + " for call "+
call.toString());
buf = newByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
}
responder.doRespond(call);
}
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.info(getName() + " caught: " +
StringUtils.stringifyException(e));
}
} catch (Exception e) {
LOG.info(getName() + "caught: " +
StringUtils.stringifyException(e));
}
}
LOG.info(getName() + ": exiting");
}
call 方法的一个实现示例:直接通过反射调用目标方法。
/**
 * Invokes the method described by an {@code Invocation} on {@code instance}
 * via reflection and wraps the result in an {@code ObjectWritable}.
 *
 * <p>Records queue time (start minus {@code receivedTime}) and processing
 * time in {@code rpcMetrics}.
 *
 * @param protocol interface that declares the target method
 * @param param the call parameter; cast to {@code Invocation}
 * @param receivedTime time the call was received, compared against
 *     System.currentTimeMillis()
 * @return the method's return value, typed with the method's return type
 * @throws IOException an IOException thrown by the target method is
 *     rethrown unchanged; any other failure is converted to an IOException
 *     carrying its toString() and stack trace
 */
public Writable call(Class protocol, Writable param, long receivedTime)
    throws IOException {
  try {
    Invocation call = (Invocation) param;
    if (verbose) {
      log("Call: " + call);
    }

    Method method = protocol.getMethod(call.getMethodName(),
        call.getParameterClasses());
    method.setAccessible(true);

    long startTime = System.currentTimeMillis();
    Object value = method.invoke(instance, call.getParameters());
    int processingTime = (int) (System.currentTimeMillis() - startTime);
    int qTime = (int) (startTime - receivedTime);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Served: " + call.getMethodName() +
          " queueTime= " + qTime +
          " procesingTime= " + processingTime);
    }
    rpcMetrics.addRpcQueueTime(qTime);
    rpcMetrics.addRpcProcessingTime(processingTime);
    rpcMetrics.addRpcProcessingTime(call.getMethodName(), processingTime);
    if (verbose) {
      log("Return: " + value);
    }

    return new ObjectWritable(method.getReturnType(), value);
  } catch (InvocationTargetException e) {
    Throwable target = e.getTargetException();
    if (target instanceof IOException) {
      throw (IOException) target;
    }
    IOException ioe = new IOException(target.toString());
    ioe.setStackTrace(target.getStackTrace());
    throw ioe;
  } catch (Throwable e) {
    if (!(e instanceof IOException)) {
      LOG.error("Unexpected throwable object ", e);
    }
    IOException ioe = new IOException(e.toString());
    ioe.setStackTrace(e.getStackTrace());
    throw ioe;
  }
}
}
private void setupResponse(ByteArrayOutputStreamresponse,
Call call,Status status,
Writablerv, String errorClass, String error)
throws IOException {
response.reset();
DataOutputStream out =new DataOutputStream(response);
out.writeInt(call.id); // write call id
out.writeInt(status.state); // writestatus
if (status ==Status.SUCCESS) {
rv.write(out);
} else {
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
if(call.connection.useWrap) {
wrapWithSasl(response, call);
}
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
void doRespond(Call call) throwsIOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1){
processResponse(call.connection.responseQueue, true);
}
}
}
private booleanprocessResponse(LinkedList responseQueue,
booleaninHandler) throws IOException {
boolean error = true;
boolean done = false; // there is more data for this channel.
intnumElements = 0;
Callcall = null;
try{
synchronized (responseQueue) {
//
// If there are no items forthis channel, then we are done
//
numElements =responseQueue.size();
if (numElements == 0) {
error =false;
returntrue; // no more data for this channel.
}
//
// Extract the firstcall
//
call =responseQueue.removeFirst();
SocketChannel channel =call.connection.channel;
if (LOG.isDebugEnabled()){
LOG.debug(getName() + ": responding to #" + call.id + " from "+
call.connection);
}
//
// Send as much data as wecan in the non-blocking fashion
//
int numBytes =channelWrite(channel, call.response);
if (numBytes < 0) {
returntrue;
}
if(!call.response.hasRemaining()) {
call.connection.decRpcCount();
if(numElements == 1) { // lastcall fully processes.
done = true; // no more data for thischannel.
} else{
done = false; // more calls pending to besent.
}
if(LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" +call.id + " from " +
call.connection + " Wrote " + numBytes + " bytes.");
}
} else {
//
// If wewere unable to write the entire response out,then
// insertin Selector queue.
//
call.connection.responseQueue.addFirst(call);
if(inHandler) {
// set the serve time when the response has tobe sent later
call.timestamp =System.currentTimeMillis();
incPending();
try {
// Wakeup the thread blockedon select, only then can the call
// to channel.register()complete.
writeSelector.wakeup();
channel.register(writeSelector, SelectionKey.OP_WRITE,call);
} catch (ClosedChannelException e) {
//Its ok. channel might beclosed else where.
done = true;
} finally {
decPending();
}
}
if(LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" +call.id + " from " +
call.connection + " Wrote partial " + numBytes+
"bytes.");
}
}
error = false; // everything went off well
}
}finally {
if (error && call != null) {
LOG.warn(getName()+", call "+ call + ": output error");
done = true; // error. no more data for this channel.
closeConnection(call.connection);
}
}
return done;
}
private intchannelWrite(WritableByteChannelchannel,
ByteBuffer buffer) throwsIOException {
int count = (buffer.remaining() <= NIO_BUFFER_LIMIT)?
channel.write(buffer) : channelIO(null, channel,buffer);
if (count > 0){
rpcMetrics.incrSentBytes(count);
}
return count;
}
org.apache.hadoop.ipc.Server.Handler 线程从 callQueue 取出 Call,通过反射调用目标方法,再把结果追加到该连接的 responseQueue;若队列中只有这一个响应,Handler 线程会直接尝试写回客户端,一次未能写完的部分交由 Responder 线程异步发送。
private Selector writeSelector;
private int pending; // connections waiting to register
final static intPURGE_INTERVAL = 900000; // 15mins
/**
 * Creates the daemon responder thread and opens its write selector.
 *
 * @throws IOException if the selector cannot be opened
 */
Responder() throws IOException {
  this.setName("IPC Server Responder");
  this.setDaemon(true);
  writeSelector = Selector.open(); // create a selector
  pending = 0;
}
/**
 * Responder thread loop.
 *
 * <p>Waits on the write selector (at most {@code PURGE_INTERVAL} ms per
 * round) and finishes partially written responses via doAsyncWrite(). At
 * most once per {@code PURGE_INTERVAL} it collects the calls attached to
 * registered keys and passes each to doPurge(), which closes connections
 * whose responses have been pending too long.
 */
@Override
public void run() {
  LOG.info(getName() + ": starting");
  SERVER.set(Server.this);
  long lastPurgeTime = 0; // last check for old calls.

  while (running) {
    try {
      waitPending(); // If a channel is being registered, wait.
      writeSelector.select(PURGE_INTERVAL);
      Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
      while (iter.hasNext()) {
        SelectionKey key = iter.next();
        iter.remove();
        try {
          if (key.isValid() && key.isWritable()) {
            doAsyncWrite(key);
          }
        } catch (IOException e) {
          LOG.info(getName() + ": doAsyncWrite threw exception " + e);
        }
      }
      long now = System.currentTimeMillis();
      if (now < lastPurgeTime + PURGE_INTERVAL) {
        continue;
      }
      lastPurgeTime = now;

      // If there were some calls that have not been sent out for a
      // long time, discard them.
      LOG.debug("Checking for old call responses.");
      ArrayList<Call> calls;

      // get the list of channels from list of keys.
      synchronized (writeSelector.keys()) {
        calls = new ArrayList<Call>(writeSelector.keys().size());
        iter = writeSelector.keys().iterator();
        while (iter.hasNext()) {
          SelectionKey key = iter.next();
          Call call = (Call) key.attachment();
          if (call != null && key.channel() == call.connection.channel) {
            calls.add(call);
          }
        }
      }

      for (Call call : calls) {
        try {
          doPurge(call, now);
        } catch (IOException e) {
          LOG.warn("Error in purging old calls " + e);
        }
      }
    } catch (OutOfMemoryError e) {
      // we can run out of memory if we have too many threads
      // log the event and sleep for a minute and give
      // some thread(s) a chance to finish
      LOG.warn("Out of Memory in server select", e);
      try {
        Thread.sleep(60000);
      } catch (InterruptedException ignored) {
        // Best-effort back-off; the loop re-checks `running` next.
      }
    } catch (Exception e) {
      LOG.warn("Exception in Responder " +
          StringUtils.stringifyException(e));
    }
  }
  LOG.info("Stopping " + this.getName());
}
private voiddoAsyncWrite(SelectionKey key) throws IOException {
Callcall = (Call)key.attachment();
if(call == null) {
return;
}
if(key.channel() != call.connection.channel) {
throw new IOException("doAsyncWrite: badchannel");
}
synchronized(call.connection.responseQueue) {
if(processResponse(call.connection.responseQueue, false)){
try {
key.interestOps(0);
} catch(CancelledKeyException e) {
LOG.warn("Exception while changing ops : " + e);
}
}
}
}
// Remove calls that have beenpending in the responseQueue
// for a longtime.
//
private voiddoPurge(Call call, long now) throws IOException {
LinkedList responseQueue = call.connection.responseQueue;
synchronized (responseQueue) {
Iterator iter =responseQueue.listIterator(0);
while (iter.hasNext()) {
call = iter.next();
if (now > call.timestamp +PURGE_INTERVAL) {
closeConnection(call.connection);
break;
}
}
}
}