Channel 概念与 java.nio.channel 概念一致, 用以连接IO设备 (socket, 文件等) 的纽带. 例如将网络的读、写, 客户端发起连接, 主动关闭连接, 链路关闭, 获取通信双方的网络地址等.
Channel 的 IO 类型主要有两种: 非阻塞IO (NIO) 以及阻塞IO(OIO).
数据传输类型有两种: 按事件消息传递 (Message) 以及按字节传递 (Byte).
适用方类型也有两种: 服务器(ServerSocket) 以及客户端(Socket). 还有一些根据传输协议而制定的的Channel, 如: UDT、SCTP等.
Netty 按照类型逐层设计相应的类. 最底层的为抽象类 AbstractChannel , 再以此根据IO类型、数据传输类型、适用方类型实现. 类图可以一目了然, 如下图所示:
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
从注释里面可以看到是在 Channel 绑定到 Eventloop 上面的时候调用的.
不管是 Server 还是 Client, 绑定到 Eventloop 的时候, 最终都是调用 Abstract.initAndRegister() 这个方法上(Server是在 AbstractBootstrap.doBind() 的时候调用的, Client 是在 Bootstrap.doConnect() 的时候调用的).
initAndRegister() 方法定义如下:
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 把channel绑定到Eventloop对象上面去
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
继续跟踪下去会定位到 AbstractChannel.AbstractUnsafe.register0() 方法上.
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 做实际的绑定动作。把Channel感兴趣的事件注册到Eventloop.selector上面.具体实现在Abstract.doRegister()方法内
doRegister();
neverRegistered = false;
registered = true;
// 通过pipeline的传播机制,触发handlerAdded事件
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 通过pipeline的传播机制,触发channelRegistered事件
pipeline.fireChannelRegistered();
// 还没有绑定,所以这里的 isActive() 返回false.
if (isActive()) {
if (firstRegistration) {
// 如果当前链路已经激活,则调用channelActive()方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
从上面的代码也可以看出, 在调用完 pipeline.fireChannelRegistered() 之后, 紧接着会调用 isActive() 判断当前链路是否激活, 如果激活了则会调用 pipeline.fireChannelActive() 方法.
这个时候, 对于 Client 和 Server 都还没有激活, 所以, 这个时候不管是 Server 还是 Client 都不会调用 pipeline.fireChanenlActive() 方法.
从启动器的 bind() 接口开始, 往下调用 doBind() 方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化及注册
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 调用 doBind0
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
....
}
}
doBind 方法又会调用 doBind0() 方法, 在 doBind0() 方法中会通过 EventLoop 去执行 channel 的 bind() 任务.
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// 调用channel.bind接口
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
doBind0() 方法往下会调用到 pipeline.bind(localAddress, promise) ; 方法, 通过 pipeline 的传播机制, 最终会调用到 AbstractChannel.AbstractUnsafe.bind() 方法, 这个方法主要做两件事情:
doBind() pipeline.fireChannelActive()
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
....
// wasActive 在绑定成功前为 false
boolean wasActive = isActive();
try {
// 调用doBind()调用JDK底层API进行端口绑定
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// 完成绑定之后,isActive() 返回true
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 触发channelActive事件
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
也就是说当有新客户端连接的时候, 会变成活动状态.
fireChannelnactive() 方法在两个地方会被调用: Channel.close() 和 Channel.disconnect() .
在调用前会先确认状态是从 Active ---> Inactive .
fireChannelUnregistered() 方法是在 Channel 从 Eventloop 中解除注册的时候被调用的. Channel.close() 的时候被触发执行.
handlerAdded() : 添加到 ChannelPipeline 时调用.
handlerRemoved() : 从 ChannelPipeline 中移除时调用.
exceptionCaught() : 处理过程中在 ChannelPipeline 中有错误产生时调用.
处理 I/O 事件或截获 I/O 操作, 并将其转发到 ChannelPipeline 中的下一个处理程序. ChannelHandler 本身不提供许多方法, 但通常必须实现其子类型之一:
ChannelInboundHandler ChannelOutboundHandler
channelRegistered() : 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用.
channelUnregistered() : 当 Channel 从他的 EventLoop 注销并且无法处理任何 I/O 时被调用.
channelActive() : 当 Channel 处于活动状态时被调用.
channelInactive() : 当 Channel 离开活动状态并且不再连接远程节点时被调用.
channelRead() : 当从 Channel 读取数据时被调用.
channelReadComplete() : 当 Channel 上的一个读操作完成时被调用. 当所有可读字节都从 Channel 中读取之后, 将会调用该回调方法.
出站操作和数据将由 ChannelOutboundHandler 处理. 它的方法将被 Channel ChannelPipeline 以及 ChannelHandlerContext 调用.
ChannelOutboundHandler 的一个强大的功能是可以按需推迟操作或事件, 这使得可以通过一些复杂的方法来处理请求. 例如, 如果到远程节点的写入被暂停, 那么你可以推迟刷新操作并在稍后继续.
connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) : 当请求将 Channel 连接到远程节点时被调用.
disconnect(ChannelHandlerContext ctx, ChannelPromise promise) : 当请求将 Channel 从远程节点断开时被调用.
deregister(ChannelHandlerContext ctx, ChannelPromise promise) : 当请求将 Channel 从它的 EventLoop 注销时被调用.
read(ChannelHandlerContext ctx) : 当请求从 Channel 读取更多的数据时被调用.
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) : 当请求通过 Channel 将数据写到远程节点时被调用.
flush(ChannelHandlerContext ctx) : 当请求从 Channel 将入队数据冲刷到远程节点时被调用.
ChannelFuture 表示 Channel 中异步I/O操作的结果, 在 netty 中所有的 I/O 操作都是异步的, I/O 的调用会直接返回, 可以通过 ChannelFuture 来获取 I/O 操作的结果或者状态信息.
当 I/O 操作开始时, 将创建一个新对象. 新的对象是未完成的-它既没有成功, 也没有失败, 也没有被取消, 因为 I/O 操作还没有完成.
如果 I/O 操作已成功完成(失败或取消), 则对象将标记为已完成, 其中包含更具体的信息, 例如故障原因.
请注意, 即使失败和取消属于已完成状态.
ChannelPromise 是 ChannelFuture 的一个子接口, 其定义了一些可写的方法, 如 setSuccess() 和 setFailure() , 从而使 ChannelFuture 不可变.
当做了一个 I/O 操作并有任何后续任务的时候, 推荐优先使用 addListener(GenericFutureListener) 的方式来获得通知, 而非 await()
addListener(GenericFutureListener) 是非阻塞的. 它会把特定的 ChannelFutureListener 添加到 ChannelFuture 中, 然后 I/O 线程会在 I/O 操作相关的 future 完成的时候通知监听器.
ChannelFutureListener 会利于最佳的性能和资源的利用, 因为它一点阻塞都没有. 而且不会造成死锁.
ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 这两个适配器类分别提供了
ChannelInboundHandler 和 ChannelOutboundHandler 的基本实现, 它们继承了共同的父接口
ChannelHandler 的方法, 扩展了抽象类 ChannelHandlerAdapter .
ChannelHandlerAdapter 还提供了实用方法 isSharable() .
如果其对应的实现被标注为 Sharable , 那么这个方法将返回 true , 表示它可以被添加到多个 ChannelPipeline 中.
ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 中所提供的方法体调用了其相关联的 ChannelHandlerContext 上的等效方法, 从而将事件转发到了 ChannelPipeline 中的 ChannelHandler 中.
ChannelPipeline 将多个 ChannelHandler 链接在一起来让事件在其中传播处理. 一个 ChannelPipeline 中可能不仅有入站处理器, 还有出站处理器, 入站处理器只会处理入站的事件, 而出站处理器只会处理出站的数据.
每一个新创建的 Channel 都将会分配一个新的 ChannelPipeline , 不能附加另一个 ChannelPipeline , 也不能分离当前的.
通过调用 ChannelHandlerContext 实现, 它将被转发给同一个超类型的下一个 ChannelHandler .
从事件途径 ChannelPilpeline 的角度来看, ChannelPipeline 的头部和尾端取决于该事件是入站的还是出站的.
而 Netty 总是将 ChannelPilpeline 的入站口 (左侧) 作为头部, 将出站口 (右侧) 作为尾端.
当通过调用 ChannelPilpeline.add*() 方法将入站处理器和出站处理器混合添加到 ChannelPilpeline 之后, 每一个 ChannelHandler 从头部到尾端的顺序就是我们添加的顺序.
在 ChannelPilpeline 传播事件时, 它会测试 ChannelPilpeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配. 如果不匹配, ChannelPilpeline 将跳过该 ChannelHandler 并前进到下一个, 直到它找到和该事件期望的方向相匹配的为止.
这里指修改 ChannelPipeline 中的 ChannelHandler 的编排.
通过调用 ChannelPipeline 上的相关方法, ChannelHandler 可以添加, 删除或者替换其他的 ChannelHandler , 从而实时地修改 ChannelPipeline 的布局.
addFirst // 将 ChannelHandler 插入第一个位置 addBefore // 在某个 ChannelHandler 之前添加一个 addAfter // 在某个 ChannelHandler 之后添加一个 addLast // 将 ChannelHandler 插入最后一个位置 remove // 移除某个 ChannelHandler replace // 将某个 ChannelHandler 替换成指定 ChannelHandler
ChannelHandlerContext 代表了 ChanelHandler 和 ChannelPipeline 之间的关联, 每当有 ChanelHandler 添加到 ChannelPipeline 中, 都会创建 ChannelHandlerContext .
ChannelHandlerContext 的主要功能是管理它所关联的 ChannelPipeline 和同一个 ChannelPipeline 中的其他 ChanelHandler 之间的交互.
ChannelHandlerContext 有很多的方法, 其中一些方法也存在于 Channel 和 ChannelPipeline 上, 但是有一点重要的不同.
如果调用 Channel 和 ChannelPipeline 上的这些方法将沿着 ChannelPipeline 进行传播(从头或尾开始).
而调用位于 ChannelHandlerContext 上的相同方法, 则将从当前所关联的 ChannelHandler 开始, 并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler .
这样做可以减少 ChannelHandler 的调用开销.
上图为 Channel ChannelPipeline ChannelHandler 以及 ChannelHandlerContext 之间的关系.