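ChannelPipeline is the component of a Channel that organizes its ChannelHandlers. As the figure above shows, picture the remote peer at the top, next to head, and our program at the bottom, next to tail. An inbound event, typically a message that was read or a user-defined event, travels from top to bottom through each ChannelInboundHandler. An outbound event, typically a write, travels in the opposite direction through each ChannelOutboundHandler.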
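To trigger an event from your own code, you can use a ChannelHandlerContext. The methods on ChannelHandlerContext differ from the same methods on Channel in one way: an event fired through a ChannelHandlerContext is passed to the next ChannelHandler, while an event fired through the Channel starts at the end of the pipeline (head for inbound events, tail for outbound ones). ChannelPipeline resembles a Filter chain in Servlet, or other implementations of the Interceptor pattern.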
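The difference in starting point can be illustrated with a small model. This is only a sketch in plain Java, not Netty code: the class PropagationSketch, its methods, and the handler names are hypothetical, and it assumes every handler takes part in both directions and always forwards the event.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of where an event starts; none of this is Netty's real API. */
class PropagationSketch {
    // Handlers in pipeline order, head side first. Names are illustrative only.
    private final List<String> handlers = new ArrayList<>();

    PropagationSketch add(String name) {
        handlers.add(name);
        return this;
    }

    /** Inbound event fired on the channel: starts at the head side. */
    List<String> fireReadFromChannel() {
        return inboundFrom(0);
    }

    /** Inbound event fired from the context of `name`: starts at the next handler. */
    List<String> fireReadFromContext(String name) {
        return inboundFrom(handlers.indexOf(name) + 1);
    }

    /** Outbound operation on the channel: starts at the tail side. */
    List<String> writeFromChannel() {
        return outboundFrom(handlers.size() - 1);
    }

    /** Outbound operation from the context of `name`: starts at the previous handler. */
    List<String> writeFromContext(String name) {
        return outboundFrom(handlers.indexOf(name) - 1);
    }

    private List<String> inboundFrom(int start) {
        List<String> visited = new ArrayList<>();
        for (int i = start; i < handlers.size(); i++) {
            visited.add(handlers.get(i));
        }
        return visited;
    }

    private List<String> outboundFrom(int start) {
        List<String> visited = new ArrayList<>();
        for (int i = start; i >= 0; i--) {
            visited.add(handlers.get(i));
        }
        return visited;
    }

    public static void main(String[] args) {
        PropagationSketch p = new PropagationSketch().add("decoder").add("auth").add("business");
        System.out.println(p.fireReadFromChannel());       // [decoder, auth, business]
        System.out.println(p.fireReadFromContext("auth")); // [business]
        System.out.println(p.writeFromChannel());          // [business, auth, decoder]
        System.out.println(p.writeFromContext("auth"));    // [decoder]
    }
}
```

In this model, firing from the context of "auth" skips every handler that the event has already passed, which is why a handler normally forwards through its own context rather than through the channel.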
Inbound event propagation methods:
Outbound event propagation methods include:
ChannelHandlers on a ChannelPipeline usually fall into the following categories:
The most commonly used ChannelPipeline method is addLast, which appends ChannelHandlers to the end of the pipeline:
ChannelPipeline addLast(ChannelHandler... handlers);
In addition, the pipeline is thread-safe, and ChannelHandlers can be added and removed dynamically.
The pipeline also provides the fireChannelXxx methods:
@Override
ChannelPipeline fireChannelRegistered();
@Override
ChannelPipeline fireChannelUnregistered();
@Override
ChannelPipeline fireChannelActive();
@Override
ChannelPipeline fireChannelInactive();
@Override
ChannelPipeline fireExceptionCaught(Throwable cause);
@Override
ChannelPipeline fireUserEventTriggered(Object event);
@Override
ChannelPipeline fireChannelRead(Object msg);
@Override
ChannelPipeline fireChannelReadComplete();
@Override
ChannelPipeline fireChannelWritabilityChanged();
@Override
ChannelPipeline flush();
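Naturally, a doubly linked list comes to mind as the way to implement the pipeline.
DefaultChannelPipeline contains two special ChannelHandlers, head and tail. They are implemented by HeadContext and TailContext and act as the head and tail of the list.
Both nodes are set when the ChannelPipeline is created.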
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    // Link the two sentinel nodes so that the list starts out as head <-> tail.
    head.next = tail;
    tail.prev = head;
}
HeadContext is mainly responsible for handing outbound operations such as bind and write over to AbstractChannel.Unsafe.
final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {
    private final Unsafe unsafe;
    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }
    @Override
    public ChannelHandler handler() {
        return this;
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }
    // Outbound operations end here and are delegated to unsafe.
    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }
    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.disconnect(promise);
    }
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.close(promise);
    }
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.deregister(promise);
    }
    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }
    // Inbound events start here and are forwarded to the next handler.
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
        // Remove all handlers sequentially if channel is closed and unregistered.
        if (!channel.isOpen()) {
            destroy();
        }
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
        readIfIsAutoRead();
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
        readIfIsAutoRead();
    }
    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
}
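TailContext mainly does the final cleanup: it decrements the reference count of messages that reach the end of the pipeline unhandled, logs exceptions that no earlier handler caught, and so on.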
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }
    @Override
    public ChannelHandler handler() {
        return this;
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        onUnhandledInboundChannelActive();
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        onUnhandledInboundChannelInactive();
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        onUnhandledChannelWritabilityChanged();
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        onUnhandledInboundUserEventTriggered(evt);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        onUnhandledInboundException(cause);
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        onUnhandledInboundChannelReadComplete();
    }
}
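The reference-count cleanup done at the tail can be pictured with a small plain-Java model. This is a sketch under stated assumptions, not Netty's implementation: TailReleaseSketch, CountedMessage, and the convention that a handler returns false when it consumes a message are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Simplified model of tail-side cleanup; none of these types are Netty classes. */
class TailReleaseSketch {
    /** Hypothetical reference-counted message that starts with a count of 1. */
    static final class CountedMessage {
        private int refCnt = 1;

        int refCnt() {
            return refCnt;
        }

        void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("already released");
            }
            refCnt--;
        }
    }

    /**
     * Passes msg through the handlers in order. A handler returns true to forward
     * the message and false to consume it (it then owns the release).
     */
    static void fireChannelRead(List<Predicate<CountedMessage>> handlers, CountedMessage msg) {
        for (Predicate<CountedMessage> handler : handlers) {
            if (!handler.test(msg)) {
                return; // consumed before the tail; the tail never sees it
            }
        }
        msg.release(); // reached the tail unhandled: release it so it is not leaked
    }

    /** Runs one message through two handlers and returns its final reference count. */
    static int refCntAfterRead(boolean secondHandlerConsumes) {
        Predicate<CountedMessage> forwardOnly = m -> true;
        Predicate<CountedMessage> second = m -> !secondHandlerConsumes;
        CountedMessage msg = new CountedMessage();
        fireChannelRead(Arrays.asList(forwardOnly, second), msg);
        return msg.refCnt();
    }

    public static void main(String[] args) {
        System.out.println(refCntAfterRead(false)); // 0: the tail released it
        System.out.println(refCntAfterRead(true));  // 1: consumed but never released
    }
}
```

The second case shows the limit of this safety net in the model: a handler that consumes a message without forwarding it has to release the message itself, because the tail only sees what reaches it.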
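addLast puts a ChannelHandler at the end of the pipeline; internally, the handler is inserted just before tail. It accepts two optional parameters, executor and name.
executor is the thread pool that runs the ChannelHandler. If it is not set, or null is passed, the eventLoop of the channel is used.
If a handler does time-consuming work, a dedicated thread pool is usually supplied so that the handler does not block the eventLoop and delay I/O events.
name gives the handler a name within this ChannelPipeline, which is convenient for later operations such as replace. If no name is passed, one is generated automatically.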
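The executor fallback described above can be sketched with the JDK's own executors. This is an illustration only: ExecutorChoiceSketch and its methods are hypothetical, and a plain ExecutorService stands in for both the eventLoop and the optional handler executor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Simplified model of choosing the thread that runs a handler; not Netty code. */
class ExecutorChoiceSketch {
    /** Uses the handler's own executor when one was given, else the event loop. */
    static ExecutorService chooseExecutor(ExecutorService group, ExecutorService eventLoop) {
        return group != null ? group : eventLoop;
    }

    /** Runs a stand-in handler and returns the name of the thread it ran on. */
    static String runHandler(ExecutorService group, ExecutorService eventLoop) {
        Callable<String> handler = () -> Thread.currentThread().getName();
        try {
            return chooseExecutor(group, eventLoop).submit(handler).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Single daemon thread with a fixed name, so the result is easy to read. */
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = named("event-loop");
        ExecutorService bizPool = named("biz-pool");
        System.out.println(runHandler(null, eventLoop));    // event-loop
        System.out.println(runHandler(bizPool, eventLoop)); // biz-pool
        eventLoop.shutdown();
        bizPool.shutdown();
    }
}
```

With a separate pool, slow work in the handler occupies the "biz-pool" thread and leaves the "event-loop" thread free for I/O.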
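addLast first locks this to guarantee thread safety, because updating the list pointers takes several steps.
There is one more detail. When addLast runs, the Channel may not have finished registering yet. Calling handlerAdded at that point would produce the wrong order, with added happening before register.
So the code checks whether the channel is already registered. If it is not, the callback is put in a queue and runs after registration completes.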
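The deferral can be modelled in a few lines of plain Java. This is a simplified sketch, not the Netty source: DeferredAddSketch and its event strings are hypothetical, and "registered" here simply marks the moment registration completes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Simplified model of postponing handlerAdded until registration; not Netty code. */
class DeferredAddSketch {
    private final List<String> events = new ArrayList<>();
    private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();
    private boolean registered;

    /** Adds a handler; its handlerAdded callback runs now or after registration. */
    synchronized void addLast(String handlerName) {
        Runnable callback = () -> events.add("handlerAdded:" + handlerName);
        if (!registered) {
            pendingCallbacks.add(callback); // too early: remember it for later
            return;
        }
        callback.run();
    }

    /** Marks the channel as registered, then runs the queued callbacks in order. */
    synchronized void register() {
        registered = true;
        events.add("registered");
        Runnable task;
        while ((task = pendingCallbacks.poll()) != null) {
            task.run();
        }
    }

    synchronized List<String> events() {
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        DeferredAddSketch pipeline = new DeferredAddSketch();
        pipeline.addLast("a");                 // queued, nothing recorded yet
        pipeline.register();                   // registered, then handlerAdded:a
        pipeline.addLast("b");                 // runs immediately
        System.out.println(pipeline.events()); // [registered, handlerAdded:a, handlerAdded:b]
    }
}
```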
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
Creating the ChannelHandlerContext:
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    private final ChannelHandler handler;
    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }
    @Override
    public ChannelHandler handler() {
        return handler;
    }
    ...
}
The linked-list operation, which places the new context at the end of the list:
private void addLast0(AbstractChannelHandlerContext newCtx) {
    // Splice newCtx in between the current last node and tail.
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}
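To see why the head and tail sentinels keep this insertion free of null checks, here is a minimal standalone model of the same four pointer updates. It is a sketch, not the Netty class: LinkedPipelineSketch and Node are hypothetical and store only a name per node.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified doubly linked list with head and tail sentinels; not Netty code. */
class LinkedPipelineSketch {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    LinkedPipelineSketch() {
        // An empty pipeline is just head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts a node right before tail, guarded by the pipeline's lock. */
    synchronized LinkedPipelineSketch addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev; // never null, because head is always present
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return this;
    }

    /** Names in head-to-tail order, following next pointers. */
    synchronized List<String> names() {
        List<String> result = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            result.add(n.name);
        }
        return result;
    }

    /** Names in tail-to-head order, following prev pointers. */
    synchronized List<String> namesReversed() {
        List<String> result = new ArrayList<>();
        for (Node n = tail; n != null; n = n.prev) {
            result.add(n.name);
        }
        return result;
    }

    public static void main(String[] args) {
        LinkedPipelineSketch p = new LinkedPipelineSketch().addLast("a").addLast("b");
        System.out.println(p.names());         // [head, a, b, tail]
        System.out.println(p.namesReversed()); // [tail, b, a, head]
    }
}
```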
Invoking the ChannelHandler's handlerAdded callback and marking the ChannelHandlerContext as added:
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerAdded(ctx);
        ctx.setAddComplete();
    } catch (Throwable t) {
        ...
    }
}
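ChannelPipeline is the component that holds the ChannelHandlers of a Channel. Internally it is a doubly linked list whose nodes, as we have seen, are ChannelHandlerContexts, and each ChannelHandlerContext is in turn constructed from the ChannelPipeline and a ChannelHandler.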