Bootstrap是netty中用于创建Server、Client端代码的构造工具类,里面包括了启动Server、Client需要的配置信息等。下面用一段代码来初步了解下。
// 创建ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
// 创建用户acceptor和dispatcher io event的io线程池
EventLoopGroupbossGroup=newNioEventLoopGroup();
EventLoopGroupworkerGroup=newNioEventLoopGroup();
try{
// 设置server的channel类型为NioServerSocketChannel
bootstrap.channel(NioServerSocketChannel.class)
// 设置对应的io线程池
.group(bossGroup, workerGroup)
// 设置新建连接的处理器
.childHandler(newChannelInitializer<SocketChannel>() {
protectedvoidinitChannel(SocketChannelch)throwsException{
// 对于新建的连接,在它的ChannelPipeline上增加一个EchoHandler
ch.pipeline().addLast(newEchoHandler());
}
});
// 绑定到8090
ChannelFuturebind=bootstrap.bind(8090);
// netty中大多数操作都是异步的,所以上面的bind方法会立刻返回并返回一个Future, 需要sync阻塞等待bind成功
bind.sync();
// 一直阻塞当前线程知道server channel关闭
bind.channel().closeFuture().sync();
}finally{
// 关闭线程池
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
}
主要的代码类为
io.netty.bootstrap.Bootstrap
io.netty.bootstrap.ServerBootstrap
io.netty.bootstrap.AbstractBootstrap
io.netty.bootstrap.AbstractBootstrapConfig
io.netty.bootstrap.ServerBootstrapConfig
io.netty.bootstrap.BootstrapConfig
AbstractBootstrap通过提供chain方法链提供方面的Channel配置方式。
这里要了解一种很多代码中用到的看上去不太好懂的 Hierarchical Builder
模式。
public abstract class AbstractBootstrap<Bextends AbstractBootstrap<B,C>,Cextends Channel>implementsCloneable{
...
/**
* The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
* {@link Channel}
*/
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return self();
}
@SuppressWarnings("unchecked")
private B self() {
return (B) this;
}
返回了B类型的对象,这样又可以通过其他方法返回B的串联起来,这些方法最后都调用了B self()返回this, 这个this的类型是子类型,这样子类就可以直接使用而不必强行转换父类型到子类型了。
这时ServerBootstrap继承AbstractBootstrap并将B设置为自己时,就可以方便的在AbstractBootstrap基础上增加配置方法了。
public class ServerBootstrapextends AbstractBootstrap<ServerBootstrap,ServerChannel>{
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
...
}
这里的ServerBootstrap在AbstractBootstrap的基础上又添加了group(parentGroup, childGroup)的方法。所以这种写法很适合多级Builder。
当子类继承
AbstratBootstrapConfig向外暴露AbstractBootstrap的配置。
public abstract class AbstractBootstrapConfig<Bextends AbstractBootstrap<B,C>,Cextends Channel>{
protected final B bootstrap;
protected AbstractBootstrapConfig(B bootstrap) {
this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");
}
/**
* Returns the configured local address or {@code null} if non is configured yet.
*/
public final SocketAddress localAddress() {
return bootstrap.localAddress();
}
AbstractBootstrapConfig提供基础的AbstractBootstrap拥有的配置,具体子类又可以提供其具体AbstractBootstrap的配置,
例如ServerBootstrapConfig中B为ServerBoostrap,就可以返回更多的ServerBootstrap中的方法例如childGroup等。
ublic final class ServerBootstrapConfigextends AbstractBootstrapConfig<ServerBootstrap,ServerChannel>{
ServerBootstrapConfig(ServerBootstrap bootstrap) {
super(bootstrap);
}
/**
* Returns the configured {@link EventLoopGroup} which will be used for the child channels or {@code null}
* if non is configured yet.
*/
@SuppressWarnings("deprecation")
public EventLoopGroup childGroup() {
return bootstrap.childGroup();
}
...
通常在给ServerBootstrap配置好各种参数后的最后一步就是bind。
Server的bind将创建一个本地的ServerSocket绑定到对应端口上,并且配置接受到新的socket后的处理等。
下面看一下具体的bind过程。
AbstractBootstrap中
/**
* Create a new {@linkChannel} and bind it.
*/
public ChannelFuturebind(SocketAddress localAddress){
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
private ChannelFuturedoBind(final SocketAddress localAddress){
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
通过 ServerBootstrap.channel(NioServerSocketChannel.class)
这样配置的ServerBootstrap
使用的是 ReflectiveChannelFactory
, 通过对应的类的构造器反射创建Channel对象。
ServerBootstrap的init方法中将前面配置的Bootstrap的attr、option等设置到Channel上
然后给当前ServerChannel增加一个ChannelInitializer, initChannel会在Channel注册完成后调用,
这里会给Channel添加Bootstrap中配置的Handler,然后给继续在pipeline上添加一个ServerBootstrapAcceptor。
ServerBootstrapAcceptor就是ServerSocket接受到新建的socket连接的处理。
注意到这里给pipeline添加ServerBootstrapAcceptor放到的channel的eventloop中去执行,这样做的原因是
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor。
ServerBootstrapAcceptor就是ServerSocket接受到新建的socket连接的处理。(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
ServerBootstrapAcceptor
首先给接受到的Channel(child)添加配置的childHandler、设置ChannelOption、ChannelAttribute等。
然后注册到childGroup上,对应于上面示例代码中的workerGroup。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
注册到EventLoopGruop上
ChannelFuture regFuture = config().group().register(channel);
NioEventLoopGroup
继承于 MultithreadEventLoopGroup
,next()方法会round-robin的方式选出一个NioEventLoop,
然后设置Channel的eventLoop为这个NioEventLoop然后发出channelRegister事件等。@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
MultithreadEventLoopGroup
@Override
public ChannelFutureregister(Channel channel){
return next().register(channel);
}
public EventExecutornext(){
return chooser.next();
}
chooser的作用是从EventExecutor中选出一个作为next的返回结果
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
NioEventLoop
继承于 SingleThreadEventLoop
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
AbstractChannel
这里register中的处理是
设置当前Channel的eventLoop,fireChannelRegister事件,如果当前Channel是active状态并且是第一次注册则fireChannelActive事件
public final void register(EventLoop eventLoop,final ChannelPromise promise){
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run(){
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
initAndRegister完成后,会进行doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run(){
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
doBind0调用AbstractChannel的bind方法,然后调用pipeline.bind。
后面会讲到ChannelPipeline,ChannelPipeline中有一个特殊的ChannelHandler是HeadContext,作为pipelien的head节点。
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
unsafe是一个接口,不同的io channel有不同的实现,
NioServerSocketChannel
可以看到java7以上使用jdk的ServerSocketChannel.bind方法,小于的版本得到对应的socket后进行bind
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}