转载

Netty 源码分析之 六 流水线处理器: Handler

本文是永顺大牛写的系列教程 《源码之下无秘密 ── 做最好的 Netty 源码分析教程》 的续写章节。本章主要介绍Netty中用来处理数据流的handler以及底层原理。

写在最前

永顺前辈已写完的章节有如下:

  • Netty 源码分析之 番外篇 Java NIO 的前生今世
  • Netty 源码分析之 零 磨刀不误砍柴工 源码分析环境搭建
  • Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端)
  • Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (服务器端)
  • Netty 源码分析之 二 贯穿 Netty 的大动脉 ── ChannelPipeline (一)
  • Netty 源码分析之 二 贯穿 Netty 的大动脉 ── ChannelPipeline (二)
  • Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(一)
  • Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(二)

笔者尝试续写的章节:

  • Netty 源码分析之 四 Promise 与 Future: 双子星的秘密
  • Netty 源码分析之 五 奔腾的血液: ByteBuf
  • Netty 源码分析之 六 流水线处理器: Handler

本文使用的netty版本为4.1.33.Final

回忆handler

链式结构

我们先来回忆下在 《Netty 源码分析之 二 贯穿 Netty 的大动脉 ── ChannelPipeline (一)》 提到过来的handler链式结构:

Netty 源码分析之 六 流水线处理器: Handler

  • handler分为实现ChannelInboundHandler接口的入站处理器和实现ChannelOutboundHandler接口的出站处理器;
  • handler由DefaultChannelHandlerContext进行包装,并组成一个双向链表;
  • 所有的入站操作从HeadContext出发,沿着链表经由每一个入站处理器处理后向TailContext方向传递;
  • 所有的出站操作从TailContext出发,沿着链表经由每一个出站处理器处理后向HeadContext方向传递;
  • 无论是入站操作抑或是出站操作的传递,都可以在handler中按照业务需求被中断或者改变传递方向。

本文的封面图使用了一张流水线卡通图,正因为这个链式结构非常类似于制造业里的流水线,handler就像是流水里的处理节点,而入站出站数据就如同流水线上被加工的产品。

初始化

一个常见的客户端初始化过程是这个样子的:

Bootstrap bootstrap = new Bootstrap();
        ChannelFuture future = bootstrap.group(new NioEventLoopGroup(10))
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 管道中添加基于换行符分割字符串的解析器
                        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                        // 管道中添加字符串编码解码器
                        ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                        ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                        // 管道中添加服务端处理逻辑
                        ch.pipeline().addLast(new MyClientEchoHandler());
                    }
                }).connect("127.0.0.1", 9898).sync();

    future.channel().closeFuture().sync();

在ChannelInitializer的实现方法中,调用ch.pipeline().addLast方法,不断地将handler追加到双向链表中(TailContext之前),从而形成上图所示的双向链表结构。

入站handler

入站handler都实现了ChannelInboundHandler接口:

public interface ChannelInboundHandler extends ChannelHandler {

    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    void channelActive(ChannelHandlerContext ctx) throws Exception;

    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

该接口内定义了常见的入站操作包括channelActive、channelRead、channelInactive等,还支持用户自定义入站操作userEventTriggered。

ChannelInboundHandlerAdapter是ChannelInboundHandler接口的一个默认实现,内部所有方法都是将入站操作往后传递,不作任何业务处理,如channelRead方法:

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

ChannelInboundHandlerAdapter并没有包含任何业务逻辑,用户的handler子类可以继承它,然后覆盖并实现其中的部分方法。下文要提到的 SimpleChannelInboundHandler 以及 ByteToMessageDecoder 正是其中两个案例。

出站handler

出站handler都实现了ChannelOutboundHandler接口,并提供常见的出站操作(bind、connect、close、write、flush等等):

public interface ChannelOutboundHandler extends ChannelHandler {

    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void read(ChannelHandlerContext ctx) throws Exception;

    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    void flush(ChannelHandlerContext ctx) throws Exception;
}

类似地,ChannelOutboundHandlerAdapter是该接口的一个默认实现,内部所有方法都是将出站操作往前传递,不作任何业务处理,如write方法:

@Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

同样地,子类可以继承ChannelOutboundHandlerAdapter,并覆盖实现其中的任何方法。MessageToByteEncoder和MessageToMessageEncoder这两个编码器是常见的实现子类。

解码器decoder

解码器是典型的入站处理器。解码器处理的入站数据结构一般是ByteBuf。

ByteToMessageDecoder

例如ByteToMessageDecoder,可以从ByteBuf这种字节流中读取数据,然后转换为其他形式的消息对象(也可以是ByteBuf)。

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            // 仅处理ByteBuf对象
            // 新建out列表,用于保存解码得到的对象列表
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 调用解码实现方法
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                // 调用fireChannelRead传递解码得到的对象列表out
                fireChannelRead(ctx, out, size);
                // 回收对象
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                // ...省略部分代码...
                decodeRemovalReentryProtection(ctx, in, out);
                // ...省略部分代码...
            }
        } catch (DecoderException e) {
             throw e;
        } catch (Exception cause) {
             throw new DecoderException(cause);
        }
     }

     final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            decode(ctx, in, out);
        } finally {
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
                handlerRemoved(ctx);
            }
        }
    }

可见callDecode内部持续循环消费字节流,然后底层调用了子类实现的抽象方法decode进行解码。其中,常见的实现类有LineBasedFrameDecoder。

LineBasedFrameDecoder

LineBasedFrameDecoder实现了根据一个ByteBuf以换行符分割为多个ByteBuf的功能,核心实现如下:

@Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 解码得到的对象都放out列表中
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        final int eol = findEndOfLine(buffer);
        if (!discarding) {
            if (eol >= 0) {
                // 如果找到换行符
                final ByteBuf frame;
                // 计算当前帧的长度以及分隔符长度
                final int length = eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '/r'? 2 : 1;
    
                if (length > maxLength) {
                    // 如果该帧长度大于最大长度,则抛异常
                    buffer.readerIndex(eol + delimLength);
                    fail(ctx, length);
                    return null;
                }

                if (stripDelimiter) {
                    // frame去掉分隔符
                    frame = buffer.readRetainedSlice(length);
                    buffer.skipBytes(delimLength);
                } else {
                    // frame包含分隔符
                    frame = buffer.readRetainedSlice(length + delimLength);
                }

                return frame;
            } else {
                final int length = buffer.readableBytes();
                if (length > maxLength) {
                    // 如果没有换行符,而且该帧长度大于最大长度
                    // 则标记discarding为true,且丢弃所有可读数据
                    discardedBytes = length;
                    buffer.readerIndex(buffer.writerIndex());
                    discarding = true;
                    offset = 0;
                    if (failFast) {
                        fail(ctx, "over " + discardedBytes);
                    }
                }
                return null;
            }
        } else {
            if (eol >= 0) {
                // 如果有换行符,丢弃换行符前的所有可读数据
                final int length = discardedBytes + eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '/r'? 2 : 1;
                buffer.readerIndex(eol + delimLength);
                discardedBytes = 0;
                discarding = false;
                if (!failFast) {
                    fail(ctx, length);
                }
            } else {
                // 如果没有换行符,丢弃所有可读数据
                discardedBytes += buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());
                // We skip everything in the buffer, we need to set the offset to 0 again.
                offset = 0;
            }
            return null;
        }
    }

     private int findEndOfLine(final ByteBuf buffer) {
        int totalLength = buffer.readableBytes();
        // 找到换行符/n所在的下标
        int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF);
        if (i >= 0) {
            offset = 0;
            // 某些系统以/r/n作为换行符,这里修改下标为/r的下标
            if (i > 0 && buffer.getByte(i - 1) == '/r') {
                i--;
            }
        } else {
            offset = totalLength;
        }
        return i;
    }

MessageToMessageDecoder

上一小节的ByteToMessageDecoder实现了从ByteBuf到消息对象的解码转换,而MessageToMessageDecoder可以实现消息之间的解码转换。核心实现如下:

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            // 检查msg是否满足指定的模板类型I
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    // 调用decode抽象方法
                    decode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
            } else {
                out.add(msg);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            int size = out.size();
            for (int i = 0; i < size; i ++) {
                ctx.fireChannelRead(out.getUnsafe(i));
            }
            out.recycle();
        }
    }

    /**
     * Decode from one message to an other. This method will be called for each written message that can be handled
     * by this decoder.
     *
     * @param ctx           the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} belongs to
     * @param msg           the message to decode to an other one
     * @param out           the {@link List} to which decoded messages should be added
     * @throws Exception    is thrown if an error occurs
     */
    protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

MessageToMessageDecoder里的业务非常少,核心的解码转换逻辑还需要子类去实现。常用的实现类有StringDecoder。

StringDecoder

StringDecoder非常简单,输入模板类型ByteBuf,然后转换为String。核心解码转换方法如下:

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
    out.add(msg.toString(charset));
}

编码器encoder

编码器是典型的出站处理器。同时,也分为MessageToMessageEncoder和MessageToByteEncoder两种。其功能和实现刚好是解码器的逆过程,所以这里不再详细分析,不然本文就沦为水文一篇了。

此外,类似地,StringEncoder也是StringDecoder的逆过程,实现也非常简单此处不作赘言。

SimpleChannelInboundHandler

用户自定义handler的时候最常用到的父类是SimpleChannelInboundHandler。相比ChannelInboundHandlerAdapter,它为用户做了消息对象的数据类型强制转换,方便数据处理,并且确保消息对象被释放掉。核心实现如下:

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                // 类型强制转换
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

    protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;

用户需要实现channelRead0方法,自定义业务逻辑。

总结

最后以一张类图温习本文:

Netty 源码分析之 六 流水线处理器: Handler

原文  https://segmentfault.com/a/1190000023262670
正文到此结束
Loading...