转载

Netty源码分析-7-MessageToByteEncoder

前面我们分析了ByteToMessageDecoder解码器,对应于解码需要有相关的编码器,即从Java对象转换成二进制数据的组件,这就是MessageToByteEncoder。

Netty源码分析-7-MessageToByteEncoder

基本使用

下面的StringToByteEncoder中,会将String对象转换成ByteBuf,传递给后面的ChannelOutboundHandler。使用MessageToByteEncoder,只需要实现encode这个方法即可,即通过ChannleHandlerContext和接收到的对应类型的对象,写入到一个ByteBuf中

class StringToByteEncoderextends MessageToByteEncoder<String>{

        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {
            byteBuf.writeBytes(s.getBytes());
        }
}
值得注意的是MessageToByteEncoder 的I可以是ByteBuf,即可以从一种格式的二进制转换为另一种格式的二进制。下面的LengthFieldPrepender是LengthFieldBasedFrameDecoder解码器对应的编码器,负责将已经编码好的ByteBuf按照消息长度在前面追加length字段。虽然继承于MessageToMessageEncoder 但其实也是MessageToByteEncoder

,因为它的接收和输出都是ByteBuf类型。

public class LengthFieldPrependerextends MessageToMessageEncoder<ByteBuf>{

    private final ByteOrder byteOrder;
    private final int lengthFieldLength;
    private final boolean lengthIncludesLengthFieldLength;
    private final int lengthAdjustment;

    public LengthFieldPrepender(
ByteOrder byteOrder,int lengthFieldLength,
int lengthAdjustment, boolean lengthIncludesLengthFieldLength){
        // 只支持1,2,3,4,8字节长的length字段编码
        if (lengthFieldLength != 1 && lengthFieldLength != 2 &&
            lengthFieldLength != 3 && lengthFieldLength != 4 &&
            lengthFieldLength != 8) {
            throw new IllegalArgumentException(
                    "lengthFieldLength must be either 1, 2, 3, 4, or 8: " +
                    lengthFieldLength);
        }
        ObjectUtil.checkNotNull(byteOrder, "byteOrder");
        this.byteOrder = byteOrder;
        this.lengthFieldLength = lengthFieldLength;
        this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength;
        this.lengthAdjustment = lengthAdjustment;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)throws Exception {
        int length = msg.readableBytes() + lengthAdjustment;
        if (lengthIncludesLengthFieldLength) {
            length += lengthFieldLength;
        }

        if (length < 0) {
            throw new IllegalArgumentException(
                    "Adjusted frame length (" + length + ") is less than zero");
        }

        switch (lengthFieldLength) {
        case 1:
            if (length >= 256) {
                throw new IllegalArgumentException(
                        "length does not fit into a byte: " + length);
            }
            out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length));
            break;
        ... 中间省略
        case 8:
            out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length));
            break;
        default:
            throw new Error("should not reach here");
        }
        out.add(msg.retain());
    }
}

实现分析

MessageToByteEncoder同样是过滤出某一种类型的对象,所以也使用了TypeParameterMatcher,其中还有一个参数可以控制是否优先使用DirectMemory堆外直接内存,默认是true使用,如果当前classpath中有 sun.misc.Unsafe 类则使用其获取DirectMemory否则降级为heapMemory。

public abstract class MessageToByteEncoder<I>extends ChannelOutboundHandlerAdapter{
    private final TypeParameterMatcher matcher;
    private final boolean preferDirect;

    // 创建一个满足当前参数类型 I的Encoder,preferDirect表示是否优先使用DirectMemory
    protected MessageToByteEncoder(boolean preferDirect) {
        matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
        this.preferDirect = preferDirect;
    }

作为Encoder,需要处理的是write事件。

write逻辑为

  1. 判断是否为符合当前类型的对象
  2. 根据当前对象和是否使用DirectMemory来申请一个ByteBuf
  3. 调用需要子类override的encoder方法,将需要编码的msg对象encoder到ByteBuf中,然后release msg对象。
  4. 调用ChannelContextWrite.write方法,交给下一个ChannelHandlerContext处理这个ByteBuf
  5. 如果不符合1, 则直接交给下一个ChannelHandlerContext
    @Override
       public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)throws Exception {
           ByteBuf buf = null;
           try {
               // 判断对象类型
               if (acceptOutboundMessage(msg)) {
                   @SuppressWarnings("unchecked")
                   I cast = (I) msg;
                   // 申请ByteBuf
                   buf = allocateBuffer(ctx, cast, preferDirect);
                   try {
                       // 编码写入ByteBuf中
                       encode(ctx, cast, buf);
                   } finally {
                       // release msg对象
                       ReferenceCountUtil.release(cast);
                   }
                   // 如果ByteBuf不空则ctx write
                   if (buf.isReadable()) {
                       ctx.write(buf, promise);
                   } else {
                       // 否则release byteBuf然后写入一个空BUFFER
                       buf.release();
                       ctx.write(Unpooled.EMPTY_BUFFER, promise);
                   }
                   // 帮助GC回收
                   buf = null;
               } else {
                   // 其他类型的对象不处理交给后面的处理
                   ctx.write(msg, promise);
               }
           } catch (EncoderException e) {
               // encode方法的异常抛给上层处理
               throw e;
           } catch (Throwable e) {
               // 其他包装成EncoderException的异常抛给上层处理
               throw new EncoderException(e);
           } finally {
               // 防止encode方法异常导致申请的ByteBuf没有释放
               if (buf != null) {
                   buf.release();
               }
           }
       }
    
       protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                                  boolean preferDirect) throws Exception {
           if (preferDirect) {
               // 这里会判断是否有Unsafe类来决定使用DirectMemory还是HeapMemory
               return ctx.alloc().ioBuffer();
           } else {
               return ctx.alloc().heapBuffer();
           }
       }
       // 留给子类去实现的具体编码方法
       protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out)throws Exception;
    
原文  https://liuzhengyang.github.io/2018/08/03/netty-7-messagetobyteencoder/
正文到此结束
Loading...