Hadoop源码分析5: RPC基本线程

摘要:
1.数据记录FileStatusupblicclassFileStatussimplementsWritable{privateStringfilename;privatelonger;static{//registerIPCFileStatusWritableFactories.setFactory(FileStatus.class,newWritableFactory(){publicWri
1. 数据记录FileStatus

public class FileStatus implements Writable {
      private String filename;
       private long time;   
    static {  // register IPCFileStatus
       WritableFactories.setFactory
           (FileStatus.class,
            new WritableFactory() {
                publicWritable newInstance() { return new FileStatus(); } } );
    }
    
    public FileStatus() {   
    }
    
    publicFileStatus(String filename) {
      this.filename=filename;
      this.time=(newDate()).getTime();
    }
 

   @Override
    publicvoid readFields(DataInput in) throws IOException {
      this.filename= Text.readString(in);
      this.time =in.readLong();
    }

   @Override
    publicvoid write(DataOutput out) throws IOException {
      Text.writeString(out, filename);
       out.writeLong(time);
    }  
    //getter , setter
}

2.服务接口和实现

public interface Query extends VersionedProtocol{
   FileStatus getFileStatus(Stringfilename);
 }

public class QueryImpl implements Query {
   protected QueryImpl() {
    }

   @Override
    publicFileStatus getFileStatus(String filename) {
      FileStatus status=newFileStatus(filename);
      System.out.println("MethodgetFileStatus Called, return: "+status);
      return status;
    }

   @Override
    publiclong getProtocolVersion(String protocol, long clientVersion)throws IOException {
      System.out.println("protocol:"+protocol);
      System.out.println("clientVersion:"+clientVersion);
       returnMyServer.IPC_VER;
    }
}
3.服务器端
public class MyServer {
    publicstatic final int IPC_PORT = 32121;
    publicstatic final long IPC_VER = 5473L;
    publicstatic void main(String[] args) throws IOException{ 

      QueryImpl queryService=new QueryImpl();       
      Server server =RPC.getServer(queryService, 
                         "0.0.0.0",IPC_PORT, 
                         2,true,
                         newConfiguration());
      server.start();
      System.out.println("Serverready, press any key to stop");
      System.in.read();
      server.stop();
      System.out.println("Serverstopped");
    }
}

4.客户端
public class MyClient {
    publicstatic void main(String[] args) throws Exception { 
       InetSocketAddressaddr=new InetSocketAddress("localhost", MyServer.IPC_PORT);
      Query query=(Query) RPC.getProxy(Query.class, MyServer.IPC_VER,addr,new Configuration());
      FileStatusstatus=query.getFileStatus("/tmp/testIPC");
      System.out.println(status);
      RPC.stopProxy(query); 
    }
}

5.服务器端执行以下getServer时候:
Server server= RPC.getServer(queryService, 
                        "0.0.0.0",IPC_PORT, 
                        2, true,
                        newConfiguration());
即以下方法,
使用2
Handlers
public static Server getServer(final Object instance,final String bindAddress, final int port, 
final intnumHandlers, 
 final boolean verbose,Configuration conf) 

(1).构造newConfiguration()时候会加载 core-default.xml,core-site.xml。

(2).实际返回的是 org.apache.hadoop.ipc.RPC.Server 对象(继承自org.apache.hadoop.ipc.Server

 (3).生成
1org.apache.hadoop.ipc.Server.Listener 线程,注册SelectionKey.OP_ACCEPT 事件
1org.apache.hadoop.ipc.Server.Listener.Reader线程(个数在IPC_SERVER_RPC_READ_THREADS_KEY中配置),并且立即启动线程,监控 Readable的事件。
1org.apache.hadoop.ipc.Server.Responder线程,监控Writable事件和Call
 
6.服务器端执行start 的时候

public synchronized void start() {
   responder.start();
   listener.start();
    handlers = newHandler[handlerCount];
    
    for (int i = 0; i <handlerCount; i++) {
     handlers[i] = new Handler(i);
     handlers[i].start();
    }
  }

立即启动org.apache.hadoop.ipc.Server.Responder线程
立即启动org.apache.hadoop.ipc.Server.Listener 线程
立即构造启动2org.apache.hadoop.ipc.Server.Handler 线程,用于监控 Call。

此时共有5个线程,其内容主要如下:

1个 org.apache.hadoop.ipc.Server.Listener

    privateServerSocketChannel acceptChannel = null; //the accept channel
    private Selectorselector = null; //the selector that we usefor the server
    private Reader[] readers= null;
    private intcurrentReader = 0;
    privateInetSocketAddress address; //the address webind at
    private Random rand =new Random();
    private longlastCleanupRunTime = 0; //the last time whena cleanup connec-
                                  //-tion (for idle connections) ran
    private longcleanupInterval = 10000; //the minimuminterval between 
                                   //two cleanup runs
    private intbacklogLength = conf.getInt("ipc.server.listen.queue.size",128);
    private ExecutorServicereadPool; 

    public void run(){
    //ThreadLocal
   SERVER.set(Server.this);
      while(running) {
       SelectionKey key = null;
       try {
         selector.select();
         Iterator iter =selector.selectedKeys().iterator();
         while (iter.hasNext()){
           key =iter.next();
          iter.remove();
           try{
            if (key.isValid()){ 
             //如有了连接事件
              if(key.isAcceptable())
               doAccept(key);
            }
           } catch(IOException e) {
           }
           key =null;
         }
       } catch (OutOfMemoryError e) {
         ....................
       } catch (Exception e) {
         ....................
       }
       cleanupConnections(false);
     }
     LOG.info("Stopping " + this.getName());

     synchronized (this) {
       try {
         acceptChannel.close();
         selector.close();
       } catch (IOException e) { }
       selector= null;
       acceptChannel= null;       
       // clean up allconnections
       while (!connectionList.isEmpty()) {
        closeConnection(connectionList.remove(0));
       }
     }
    }

void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
     Connection c = null;
     ServerSocketChannel server = (ServerSocketChannel)key.channel();
     SocketChannel channel;
      while((channel = server.accept()) != null) {
       channel.configureBlocking(false);
      channel.socket().setTcpNoDelay(tcpNoDelay);
       //使用简单的robin轮询算法选择Reader
       Reader reader = getReader();
       try {
        //唤醒readSelector
        reader.startAdd();
         //registerChannel实际就是注册写事件
         SelectionKey readKey =reader.registerChannel(channel);
         //构建Connection对象
         c = newConnection(readKey, channel,System.currentTimeMillis());
         readKey.attach(c);
         synchronized (connectionList){
           //connectionList 是 List,用于维护多个客户端连接
          connectionList.add(numConnections, c);
          numConnections++;
         }
         if(LOG.isDebugEnabled())
          LOG.debug("Server connection from " + c.toString() +
              "; # active connections: " +numConnections +
              "; # queued calls: " +callQueue.size());         
       } finally {
        reader.finishAdd(); 
       }

     }
    }

   使用简单的robin轮询算法选择Reader
    // The method that willreturn the next reader to work with
    // Simplisticimplementation of round robin for now
    ReadergetReader() {
     currentReader = (currentReader + 1) % readers.length;
     return readers[currentReader];
    }

     
     public void startAdd() {
       adding = true;
       readSelector.wakeup();
     }

    //registerChannel实际就是注册写事件
    public synchronizedSelectionKey registerChannel(SocketChannel channel)
                                                 throws IOException {
         returnchannel.register(readSelector, SelectionKey.OP_READ);
     }

  //唤醒其他进程   
   public synchronized voidfinishAdd() {
       adding = false;
       this.notify();       
     }


1个 org.apache.hadoop.ipc.Server.Listener.Reader 

     private volatile boolean adding = false;
     private Selector readSelector = null;

     Reader(Selector readSelector) {
       this.readSelector = readSelector;
     }
     public void run() {
       LOG.info("Starting SocketReader");
       synchronized (this) {
         while (running) {
          SelectionKey key = null;
           try{
            readSelector.select();
            while (adding) {
              this.wait(1000);
            }            

            Iterator iter =readSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
              key = iter.next();
              iter.remove();
              if (key.isValid()) {
                if(key.isReadable()) {
                 doRead(key);
                }
              }
              key = null;
            }
           } catch(InterruptedException e) {
            if (running) {                  // unexpected -- log it
              LOG.info(getName() + "caught: " +
                     StringUtils.stringifyException(e));
            }
           } catch(IOException ex) {
            LOG.error("Error in Reader", ex);
           }
         }
       }
     }

    voiddoRead(SelectionKey key) throws InterruptedException {
      intcount = 0;
     Connection c = (Connection)key.attachment();
      if (c== null) {
       return;  
     }
     c.setLastContact(System.currentTimeMillis());
     
      try{
       count = c.readAndProcess();
      }catch (InterruptedException ieo) {
       LOG.info(getName() + ": readAndProcess caughtInterruptedException", ieo);
       throw ieo;
      }catch (Exception e) {
       LOG.info(getName() + ": readAndProcess threwexception " + e + ". Count of bytes read: " + count, e);
       count = -1; //so that the (count < 0) blockis executed
     }
      if(count < 0) {
       if (LOG.isDebugEnabled())
         LOG.debug(getName() + ":disconnecting client " + 
                 c + ". Number of active connections: "+
                 numConnections);
       closeConnection(c);
       c = null;
     }
      else{
      c.setLastContact(System.currentTimeMillis());
     }
    }  

    public intreadAndProcess() throws IOException, InterruptedException{
      while(true) {
          
       int count = -1;
       if (dataLengthBuffer.remaining() > 0) {
         count =channelRead(channel, dataLengthBuffer);      
         if (count < 0 ||dataLengthBuffer.remaining() > 0) 
           returncount;
       }
     
       if (!rpcHeaderRead) {
         //Every connection isexpected to send the header.
         if (rpcHeaderBuffer == null){
          rpcHeaderBuffer = ByteBuffer.allocate(2);
         }
         count =channelRead(channel, rpcHeaderBuffer);
         if (count < 0 ||rpcHeaderBuffer.remaining() > 0) {
           returncount;
         }
         int version =rpcHeaderBuffer.get(0);
         byte[] method = new byte[]{rpcHeaderBuffer.get(1)};
         authMethod =AuthMethod.read(new DataInputStream(
            new ByteArrayInputStream(method)));
         dataLengthBuffer.flip();        
         if(!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION){
           //Warningis ok since this is not supposed to happen.
          LOG.warn("Incorrect header or version mismatch from "+ 
                  hostAddress + ":" + remotePort+
                  " got version " + version+ 
                  " expected version " +CURRENT_VERSION);
           return-1;
         }
        dataLengthBuffer.clear();
         if (authMethod == null){
           throw newIOException("Unable to read authentication method");
         }
         if (isSecurityEnabled&& authMethod == AuthMethod.SIMPLE) {
          AccessControlException ae = new AccessControlException(
              "Authentication isrequired");
          setupResponse(authFailedResponse, authFailedCall,Status.FATAL,
              null,ae.getClass().getName(), ae.getMessage());
          responder.doRespond(authFailedCall);
           throwae;
         }
         if (!isSecurityEnabled&& authMethod != AuthMethod.SIMPLE) {
          doSaslReply(SaslStatus.SUCCESS, new IntWritable(
             SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
           authMethod= AuthMethod.SIMPLE;
           // clienthas already sent the initial Sasl message and we
           // shouldignore it. Both client and server should fall back
           // tosimple auth from now on.
          skipInitialSaslHandshake = true;
         }
         if (authMethod !=AuthMethod.SIMPLE) {
           useSasl =true;
         }
         
         rpcHeaderBuffer = null;
         rpcHeaderRead = true;
         continue;
       }
       
       if (data == null) {
        dataLengthBuffer.flip();
         dataLength =dataLengthBuffer.getInt();
      
         if (dataLength ==Client.PING_CALL_ID) {
          if(!useWrap) { //covers the !useSasl too
            dataLengthBuffer.clear();
            return 0;  //ping message
           }
         }
         if (dataLength < 0){
          LOG.warn("Unexpected data length " + dataLength + "!! from "+ 
              getHostAddress());
         }
         data =ByteBuffer.allocate(dataLength);
       }
       
       count = channelRead(channel, data);
       
       if (data.remaining() == 0) {
        dataLengthBuffer.clear();
         data.flip();
         if (skipInitialSaslHandshake){
           data =null;
          skipInitialSaslHandshake = false;
          continue;
         }
         boolean isHeaderRead =headerRead;
         if (useSasl) {
          saslReadAndProcess(data.array());
         } else {
          processOneRpc(data.array());
         }
         data = null;
         if (!isHeaderRead) {
          continue;
         }
       } 
       return count;
     }
    }

 private intchannelRead(ReadableByteChannelchannel, 
                      ByteBuffer buffer) throws IOException {
    
    int count =(buffer.remaining() <= NIO_BUFFER_LIMIT) ?
              channel.read(buffer) :channelIO(channel, null, buffer);
    if (count > 0){
     rpcMetrics.incrReceivedBytes(count);
    }
    return count;
  }

 private void processOneRpc(byte[]buf) throws IOException,
       InterruptedException {
      if(headerRead) {
       processData(buf);
      }else {
       processHeader(buf);
       headerRead = true;
       if (!authorizeConnection()) {
         throw newAccessControlException("Connection from " + this
            + " for protocol " + header.getProtocol()
            + " is unauthorized for user " + user);
       }
     }
    }

    private voidprocessData(byte[] buf) throws IOException, InterruptedException {
     DataInputStream dis =
       new DataInputStream(newByteArrayInputStream(buf));
      intid = dis.readInt();                 // try to read an id
       
      if(LOG.isDebugEnabled())
       LOG.debug(" got #" + id);

     Writable param = ReflectionUtils.newInstance(paramClass,conf);//read param
     param.readFields(dis);       
       
     Call call = new Call(id, param, this);
     callQueue.put(call);            // queue the call; maybeblocked here
     incRpcCount();  // Increment the rpc count
    }

org.apache.hadoop.ipc.Server.Listener.Reader线程接受客户端输入,将其包装为Call对象,放置在callQueue( 一个BlockingQueue )中。

2个org.apache.hadoop.ipc.Server.Handler 线程

@Override
    public void run(){
     LOG.info(getName() + ": starting");
     SERVER.set(Server.this);
     ByteArrayOutputStream buf = 
       newByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
      while(running) {
       try {
         final Call call =callQueue.take(); // pop the queue; maybe blocked here

         if(LOG.isDebugEnabled())
          LOG.debug(getName() + ": has #" + call.id + " from " +
                   call.connection);
         
         String errorClass =null;
         String error = null;
         Writable value = null;

         CurCall.set(call);
         try {
           // Makethe call as the user via Subject.doAs, thus associating
           // thecall with the Subject
           if(call.connection.user == null) {
            value = call(call.connection.protocol,call.param, 
                       call.timestamp);
           } else{
            value = 
             call.connection.user.doAs
                (newPrivilegedExceptionAction() {
                  @Override
                  public Writable run() throwsException {
                    // make thecall
                    returncall(call.connection.protocol, 
                              call.param,call.timestamp);

                  }
                }
                );
           }
         } catch (Throwable e) {
          LOG.info(getName()+", call "+call+": error: " + e, e);
           errorClass= e.getClass().getName();
           error =StringUtils.stringifyException(e);
         }
         CurCall.set(null);
         synchronized(call.connection.responseQueue) {
           //setupResponse() needs to be sync'ed togetherwith 
           //responder.doResponse() since setupResponse may use
           // SASL toencrypt response data and SASL enforces
           // its ownmessage ordering.
          setupResponse(buf, call, 
                     (error ==null) ? Status.SUCCESS : Status.ERROR, 
                     value,errorClass, error);
         // Discard the large buf andreset it back to 
         // smaller size to freeupheap
         if (buf.size() >maxRespSize) {
          LOG.warn("Large response size " + buf.size() + " for call "+ 
              call.toString());
            buf = newByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
           }
          responder.doRespond(call);
         }
       } catch (InterruptedException e) {
         if (running) {                     // unexpected -- log it
          LOG.info(getName() + " caught: " +
                 StringUtils.stringifyException(e));
         }
       } catch (Exception e) {
         LOG.info(getName() + "caught: " +
                StringUtils.stringifyException(e));
       }
     }
     LOG.info(getName() + ": exiting");
    }

call的一个实现例子,直接调用反射

public Writable call(Class  protocol,Writable param, long receivedTime) 
    throws IOException{
      try{
       Invocation call = (Invocation)param;
       if (verbose) log("Call: " + call);

       Method method =
        protocol.getMethod(call.getMethodName(),
                             call.getParameterClasses());
       method.setAccessible(true);

       long startTime =System.currentTimeMillis();
       Object value = method.invoke(instance,call.getParameters());
       int processingTime = (int)(System.currentTimeMillis() - startTime);
       int qTime = (int)(startTime-receivedTime);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Served: " +call.getMethodName() +
                 " queueTime= " + qTime +
                 " procesingTime= " + processingTime);
       }
       rpcMetrics.addRpcQueueTime(qTime);
      rpcMetrics.addRpcProcessingTime(processingTime);
      rpcMetrics.addRpcProcessingTime(call.getMethodName(),processingTime);
       if (verbose) log("Return: "+value);

       return newObjectWritable(method.getReturnType(), value);

      }catch (InvocationTargetException e) {
       Throwable target = e.getTargetException();
       if (target instanceof IOException) {
         throw(IOException)target;
       } else {
         IOException ioe = newIOException(target.toString());
        ioe.setStackTrace(target.getStackTrace());
         throw ioe;
       }
      }catch (Throwable e) {
       if (!(e instanceof IOException)) {
         LOG.error("Unexpectedthrowable object ", e);
       }
       IOException ioe = newIOException(e.toString());
       ioe.setStackTrace(e.getStackTrace());
       throw ioe;
     }
    }
  }

private void setupResponse(ByteArrayOutputStreamresponse, 
                         Call call,Status status, 
                         Writablerv, String errorClass, String error) 
  throws IOException {
    response.reset();
    DataOutputStream out =new DataOutputStream(response);
    out.writeInt(call.id);             // write call id
   out.writeInt(status.state);          // writestatus

    if (status ==Status.SUCCESS) {
     rv.write(out);
    } else {
     WritableUtils.writeString(out, errorClass);
     WritableUtils.writeString(out, error);
    }
    if(call.connection.useWrap) {
     wrapWithSasl(response, call);
    }
   call.setResponse(ByteBuffer.wrap(response.toByteArray()));
  }

  void doRespond(Call call) throwsIOException {
     synchronized (call.connection.responseQueue) {
      call.connection.responseQueue.addLast(call);
       if (call.connection.responseQueue.size() == 1){
        processResponse(call.connection.responseQueue, true);
       }
     }
    }

    private booleanprocessResponse(LinkedList responseQueue,
                               booleaninHandler) throws IOException {
     boolean error = true;
     boolean done = false;      // there is more data for this channel.
      intnumElements = 0;
      Callcall = null;
      try{
       synchronized (responseQueue) {
         //
         // If there are no items forthis channel, then we are done
         //
         numElements =responseQueue.size();
         if (numElements == 0) {
           error =false;
           returntrue;            // no more data for this channel.
         }
         //
         // Extract the firstcall
         //
         call =responseQueue.removeFirst();
         SocketChannel channel =call.connection.channel;
         if (LOG.isDebugEnabled()){
          LOG.debug(getName() + ": responding to #" + call.id + " from "+
                   call.connection);
         }
         //
         // Send as much data as wecan in the non-blocking fashion
         //
         int numBytes =channelWrite(channel, call.response);
         if (numBytes < 0) {
           returntrue;
         }
         if(!call.response.hasRemaining()) {
          call.connection.decRpcCount();
           if(numElements == 1) {    // lastcall fully processes.
            done = true;           // no more data for thischannel.
           } else{
            done = false;          // more calls pending to besent.
           }
           if(LOG.isDebugEnabled()) {
            LOG.debug(getName() + ": responding to #" +call.id + " from " +
                    call.connection + " Wrote " + numBytes + " bytes.");
           }
         } else {
           //
           // If wewere unable to write the entire response out,then 
           // insertin Selector queue. 
           //
          call.connection.responseQueue.addFirst(call);
          
           if(inHandler) {
            // set the serve time when the response has tobe sent later
            call.timestamp =System.currentTimeMillis();
            
            incPending();
            try {
              // Wakeup the thread blockedon select, only then can the call 
              // to channel.register()complete.
             writeSelector.wakeup();
             channel.register(writeSelector, SelectionKey.OP_WRITE,call);
            } catch (ClosedChannelException e) {
              //Its ok. channel might beclosed else where.
              done = true;
            } finally {
              decPending();
            }
           }
           if(LOG.isDebugEnabled()) {
            LOG.debug(getName() + ": responding to #" +call.id + " from " +
                    call.connection + " Wrote partial " + numBytes+ 
                     "bytes.");
           }
         }
         error = false;           // everything went off well
       }
      }finally {
       if (error && call != null) {
         LOG.warn(getName()+", call "+ call + ": output error");
         done = true;            // error. no more data for this channel.
        closeConnection(call.connection);
       }
     }
     return done;
    }

 private intchannelWrite(WritableByteChannelchannel, 
                       ByteBuffer buffer) throwsIOException {
    
    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT)?
              channel.write(buffer) : channelIO(null, channel,buffer);
    if (count > 0){
     rpcMetrics.incrSentBytes(count);
    }
    return count;
  }
org.apache.hadoop.ipc.Server.Handler 线程从callQueue拿出Call,使用反射调用方法运行,将结果放入responseQueue或者直接写给客户端

1org.apache.hadoop.ipc.Server.Responder 

 private Selector writeSelector;
    private int pending;       // connections waiting to register
    
    final static intPURGE_INTERVAL = 900000; // 15mins

    Responder() throwsIOException {
     this.setName("IPC Server Responder");
     this.setDaemon(true);
     writeSelector = Selector.open(); // create a selector
     pending = 0;
    }

    @Override
    public void run(){
     LOG.info(getName() + ": starting");
     SERVER.set(Server.this);
      longlastPurgeTime = 0;   // last check for oldcalls.

      while(running) {
       try {
         waitPending();    // If a channel is beingregistered, wait.
        writeSelector.select(PURGE_INTERVAL);
         Iterator iter =writeSelector.selectedKeys().iterator();
         while (iter.hasNext()){
          SelectionKey key = iter.next();
          iter.remove();
           try{
            if (key.isValid() &&key.isWritable()) {
               doAsyncWrite(key);
            }
           } catch(IOException e) {
            LOG.info(getName() + ": doAsyncWrite threwexception " + e);
           }
         }
         long now =System.currentTimeMillis();
         if (now < lastPurgeTime +PURGE_INTERVAL) {
          continue;
         }
         lastPurgeTime = now;
         //
         // If there were some callsthat have not been sent out for a
         // long time, discardthem.
         //
         LOG.debug("Checking for oldcall responses.");
         ArrayList calls;
         
         // get the list of channelsfrom list of keys.
         synchronized(writeSelector.keys()) {
           calls =new ArrayList(writeSelector.keys().size());
           iter =writeSelector.keys().iterator();
           while(iter.hasNext()) {
            SelectionKey key = iter.next();
            Call call = (Call)key.attachment();
            if (call != null && key.channel() ==call.connection.channel) { 
              calls.add(call);
            }
           }
         }
         
         for(Call call : calls){
           try{
            doPurge(call, now);
           } catch(IOException e) {
            LOG.warn("Error in purging old calls " +e);
           }
         }
       } catch (OutOfMemoryError e) {
         //
         // we can run out of memoryif we have too many threads
         // log the event and sleepfor a minute and give
         // some thread(s) a chance tofinish
         //
         LOG.warn("Out of Memory inserver select", e);
         try { Thread.sleep(60000); }catch (Exception ie) {}
       } catch (Exception e) {
         LOG.warn("Exception inResponder " + 
                StringUtils.stringifyException(e));
       }
     }
     LOG.info("Stopping " + this.getName());
    }



 private voiddoAsyncWrite(SelectionKey key) throws IOException {
      Callcall = (Call)key.attachment();
      if(call == null) {
       return;
     }
      if(key.channel() != call.connection.channel) {
       throw new IOException("doAsyncWrite: badchannel");
     }

     synchronized(call.connection.responseQueue) {
       if(processResponse(call.connection.responseQueue, false)){
         try {
          key.interestOps(0);
         } catch(CancelledKeyException e) {
          
          LOG.warn("Exception while changing ops : " + e);
         }
       }
     }
    }

processResponse 的实现在org.apache.hadoop.ipc.Server.Handler中也有调用

// Remove calls that have beenpending in the responseQueue 
    // for a longtime.
    //
    private voiddoPurge(Call call, long now) throws IOException {
     LinkedList responseQueue = call.connection.responseQueue;
     synchronized (responseQueue) {
       Iterator iter =responseQueue.listIterator(0);
       while (iter.hasNext()) {
         call = iter.next();
         if (now > call.timestamp +PURGE_INTERVAL) {
          closeConnection(call.connection);
          break;
         }
       }
     }
    }


免责声明:文章转载自《Hadoop源码分析5: RPC基本线程》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇史上最全 Java 中各种锁的介绍js获取图片信息(一)-----获取图片的原始尺寸下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

element dropdown源码

dropdown.vue <script>import Clickoutside from 'element-ui/src/utils/clickoutside'; import Emitter from 'element-ui/src/mixins/emitter'; import Migrating from 'element-ui...

Java IO 关闭流的方式

Java IO 关闭流的方式 分类 练习:将分割文件中的流关闭方式改为finally形式 练习:文件合并中的流关闭方式改为try()形式 传送门:这里更详细 分类 在try中关闭 弊端是如果文件不存在或者读取的时候有问题而抛出异常,那么就不会执行流的关闭语句,存在资源占用隐患 在finally中关闭 这是标准的关闭流的方式 1、首先把引用声...

原!linux机器 配置自动scp脚本

 方式一: 1.安装相关依赖包 yum install -y tcl tclx tcl-develyum -y install expect 2.脚本 scp.sh #!/usr/bin/expect #获取输入参数set f1 [lindex $argv 0]set f2 [lindex $argv 1]set dir [lindex $argv 2]s...

Node.js源码初探~我很好奇

前言: 最近在看Node.js,看了一段时间后便想着看看Node.js源码,自己本地调试调试;现在便说说这个过程中的坑,以及一些需要注意的地方;       Node.js需要一定C++基础,建议看完C++Primer再看,否则V8的好多表达方式,指针,引用,模板之类的会看不懂;       代码已上传GitHub地址:   https://github....

SQLiteHelperSQLite帮助类

最近做项目用到了SQLite数据库,就自己写了个SQLite帮助类,类似于SQLHelper。 不过是按照我常用方式写的,主要与SQLHelper不同的是 1、这个帮助类并没有内置ConnectionString,是需要在调用方法的时候指定的,这样的好处的是:在一般的三层架构时都会在Helper里指定一个数据库连接,但是如果我又想用这个帮助类但是我想查询其...

架构师技能体系

一、构成架构师的技能体系二、阅读源码,分析源码知识点总汇这张图详细介绍了源码中所用到的经典设计思想及常用设计模式,先打好内功基础,了解大牛是如何写代码的,从而吸收大牛的代码功力。 结合Spring5和MyBatis源码,带你理解作者框架思维,帮助大家寻找分析源码的切入点,在思想上来一次巨大的升华。 这个任务感觉是“成为一个高级Java开发工程师”,即对常用...