前言
上篇文章介绍了 NioEndpoint,其中讲到了在 NioEndpoint#startInternal 方法里创建并启动了 Acceptor 和 Poller,线程。本篇文章先看 Acceptor,下篇文章再看 Poller。
1. Acceptor
Acceptor 的构造方法声明为:
private final AbstractEndpoint<?,U> endpoint;
public Acceptor(AbstractEndpoint<?,U> endpoint) {
this.endpoint = endpoint;
}
其中 endpoint 参数是在 NioEndpoint#startAcceptorThreads 方法里 new Acceptor 时传入的 NioEndpoint 对象。
Acceptor 实现了 Runnable 方法,因此它的 run 方法是 Acceptor 的关键。
@Override public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (endpoint.isRunning()) { // Loop if endpoint is paused while (endpoint.isPaused() && endpoint.isRunning()) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!endpoint.isRunning()) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait endpoint.countUpOrAwaitConnection(); // Endpoint might have been paused while waiting for latch // If that is the case, don't accept new connections if (endpoint.isPaused()) { continue; } U socket = null; try { // Accept the next incoming connection from the server // socket socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { // We didn't get a socket endpoint.countDownConnection(); if (endpoint.isRunning()) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (endpoint.isRunning() && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); String msg = sm.getString("endpoint.accept.fail"); // APR specific. // Could push this down but not sure it is worth the trouble. if (t instanceof Error) { Error e = (Error) t; if (e.getError() == 233) { // Not an error on HP-UX so log as a warning // so it can be filtered out on that platform // See bug 50273 log.warn(msg, t); } else { log.error(msg, t); } } else { log.error(msg, t); } } } state = AcceptorState.ENDED; }
run 方法的代码被包裹在一个 while 循环里,while 循环的判断条件是 endpoint.isRunning(),也就是 NioEndpoint 的父类 AbstractEndpoint 里的 running 字段。
/**
* Running state of the endpoint.
*/
protected volatile boolean running = false;
这个 running 字段在 NioEndpoint#startInternal 方法里被置为 true。在 NioEndpoint#stopInternal 方法里 running 置为 false,
在最外层的 while 循环里,就是 run 方法的核心了。
// Loop if endpoint is paused while (endpoint.isPaused() && endpoint.isRunning()) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } }
首先判断 endpoint.isPaused() 是不是为 true,如果是就让线程 sleep 50毫秒,并把 Acceptor 的状态设置为 AcceptorState.PAUSED。
这个 endpoint.isPaused() 跟 isRunning 方法类似,也就是判断
AbstractEndpoint 里的一个 paused 属性,起声明如下
/** * Will be set to true whenever the endpoint is paused. */ protected volatile boolean paused = false;
这个 pause 的是在 AbstractEndpoint#pause 里置为 true 的。
然后把 Acceptor 的状态改为 AcceptorState.RUNNING。
接着进入 try 语句块。首先调用 endpoint.countUpOrAwaitConnection()
//if we have reached max connections, wait endpoint.countUpOrAwaitConnection();
protected void countUpOrAwaitConnection() throws InterruptedException { if (maxConnections==-1) return; LimitLatch latch = connectionLimitLatch; if (latch!=null) latch.countUpOrAwait(); }
可以看出,countUpOrAwaitConnection 这个方法是判断是否已超过 maxConnections,如果是就调用 latch.countUpOrAwait() 等待。
然后调用 endpoint.serverSocketAccept() 方法,返回一个泛型对象,这个泛型对象的具体类型在 NioEndpoint 对象中就确立了。
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
public abstract class AbstractJsseEndpoint<S,U> extends AbstractEndpoint<S,U>
从 NioEndpoint 和 AbstractJsseEndpoint 的声明中可以看出,泛型 U 的具体类型是 SocketChannel。即 java.nio.channels.SocketChannel。
也就是说 endpoint.serverSocketAccept() 获取的是一个 SocketChannel 对象。
@Override protected SocketChannel serverSocketAccept() throws Exception { return serverSock.accept(); }
serverSocketAccept 就是简单调用 serverSock.accept() 方法获取一个 SocketChannel 对象。在 nio 编程里,可以认为一个 SocketChannel 对象代表一个服务端与客户端的连接。
这个 serverSock 就是在 NioEndpoint#initServerSocket() 里调用 ServerSocketChannel.open() 初始化的。
拿到这个 SocketChannel 对象之后就配置这个对象
// Configure the socket if (endpoint.isRunning() && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); }
上面代码的逻辑很简单,就是调用 endpoint.setSocketOptions(socket) 方法,如果不成功就调用 endpoint.closeSocket(socket) 方法。destroySocket(socket) 方法内部也是调用 closeSocket 方法。
protected void destroySocket(U socket) { closeSocket(socket); }
@Override
protected void closeSocket(SocketChannel socket) {
countDownConnection();
try {
socket.socket().close();
} catch (IOException ioe) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.err.close"), ioe);
}
}
try {
socket.close();
} catch (IOException ioe) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.err.close"), ioe);
}
}
}
closeSocket 在 NioEndpoint 里,而 destroySocket 在 AbstractEndpoint 里。closeSocket 方法逻辑很简单就是调用 SocketChannel.socket().close() 和 SocketChannel.close() 方法。
关键地方在于 endpoint.setSocketOptions(socket) 方法。
1.1 NioEndpoint#setSocketOptions
/**
* Process the specified connection.
* @param socket The socket channel
* @return <code>true</code> if the socket was correctly configured
* and processing may continue, <code>false</code> if the socket needs to be
* close immediately
*/
@Override
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
setSocketOptions 方法里,首先用 socketProperties 给这个 SocketChannel 对象的 Socket 设置了一些属性。
然后,从 nioChannels 这个 SynchronizedStack<NioChannel> 缓存池里获取一个 NioChannel 对象,如果获取不到就创建一个,创建的 NioChannel 对象的时候也创建了一个 SocketBufferHandler 对象。
public SocketBufferHandler(int readBufferSize, int writeBufferSize, boolean direct) { this.direct = direct; if (direct) { readBuffer = ByteBuffer.allocateDirect(readBufferSize); writeBuffer = ByteBuffer.allocateDirect(writeBufferSize); } else { readBuffer = ByteBuffer.allocate(readBufferSize); writeBuffer = ByteBuffer.allocate(writeBufferSize); } }
SocketBufferHandler 对象里包含了两个 ByteBuffer 对象,一个读一个写。
protected SocketChannel sc = null; protected final SocketBufferHandler bufHandler; public NioChannel(SocketChannel channel, SocketBufferHandler bufHandler) { this.sc = channel; this.bufHandler = bufHandler; }
NioChannel 封装了对 SocketChannel 对象的读写操作。
最后 setSocketOptions 里调用了 getPoller0().register(channel)。
private Poller[] pollers = null;
private AtomicInteger pollerRotater = new AtomicInteger(0);
public Poller getPoller0() {
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
getPoller0() 方法就是从 pollers 数组里选一个 Poller 对象,选取的算法是轮询选取。
选出 Poller 对象后,调用其 register(channel) 方法。
1.2 NioEndpoint#Poller#register
/**
* Registers a newly created socket with the poller.
*
* @param socket The newly created socket
*/
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);
}
register 方法传入的参数是 NioChannel 而不是 SocketChannel 了,SocketChannel 已经与 NioChannel 关联了。
register 第一行就调用 NioChannel#setPoller 方法,把当前 Poller 对象复制给 NioChannel 的属性,将 NioChannel 对象与 Poller 对象关联起来。
接着 创建了一个 NioSocketWrapper 对象并设置了相关属性,其中最重要的是 ka.interestOps(SelectionKey.OP_READ) 这一行设置了 NioSocketWrapper 所感兴趣的操作。
然后把 NioChannel 对象与 NioSocketWrapper 对象关联起来。
public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) { super(channel, endpoint); pool = endpoint.getSelectorPool(); socketBufferHandler = channel.getBufHandler(); }
NioSocketWrapper 的声明为
public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {
SocketWrapperBase 的构造方法为
public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) { this.socket = socket; this.endpoint = endpoint; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.blockingStatusReadLock = lock.readLock(); this.blockingStatusWriteLock = lock.writeLock(); }
public static class NioSocketWrapper extends SocketWrapperBase<NioChannel>
public abstract class SocketWrapperBase<E>
SocketWrapperBase 声明里有一个泛型 E,而 NioSocketWrapper 的声明里,泛型 E 的具体类型则是 NioChannel。
register 方法的最后从 eventCache 缓存池里获取一个 PollerEvent 对象,如果获取不到就创建一个 PollerEvent 对象。
private NioChannel socket; private int interestOps; private NioSocketWrapper socketWrapper; public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) { reset(ch, w, intOps); } public void reset(NioChannel ch, NioSocketWrapper w, int intOps) { socket = ch; interestOps = intOps; socketWrapper = w; }
创建 PollerEvent 对象时传入的参数分别是前面的 NioChannel 、NioSocketWrapper 对象,以及一个 int 类型的常量 OP_REGISTER,分别赋值给 PollerEvent 的属性,另外 PollerEvent 也实现了 Runnable 接口,这几个属性在 PollerEvent#run 方法里都有对应的作用。
拿到 PollerEvent 对象后,调用 addEvent(r) 方法把这个对象加入的队列中等待后续 Poller 线程的处理。
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>(); private void addEvent(PollerEvent event) { events.offer(event); if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); }
events 缓存的 PollerEvent 对象,会在 Poller#run 方法里被处理。
小结
本文分析了 Acceptor 的 run 方法,也就是 Acceptor 线程做的事情。可以看出 Acceptor 线程在一个循环里一直接受客户端连接,生成 SocketChannel 对象,并把这个 SocketChannel 对象封装成 NioChannel 和 NioSocketWrapper 对象,并把这两个对象放在一个 PollerEvent 对象里,并把这个 PollerEvent 对象加入的缓存池里等待 Poller 线程的处理。
原文
https://segmentfault.com/a/1190000022181431
本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » Tomcat源码解析系列(十三)Acceptor