前面我们分析了ByteToMessageDecoder解码器,对应于解码需要有相关的编码器,即从Java对象转换成二进制数据的组件,这就是MessageToByteEncoder。
下面的StringToByteEncoder中,会将String对象转换成ByteBuf,传递给后面的ChannelOutboundHandler。使用MessageToByteEncoder,只需要实现encode这个方法即可,即通过ChannleHandlerContext和接收到的对应类型的对象,写入到一个ByteBuf中
class StringToByteEncoderextends MessageToByteEncoder<String>{
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {
byteBuf.writeBytes(s.getBytes());
}
}
,因为它的接收和输出都是ByteBuf类型。
public class LengthFieldPrependerextends MessageToMessageEncoder<ByteBuf>{
private final ByteOrder byteOrder;
private final int lengthFieldLength;
private final boolean lengthIncludesLengthFieldLength;
private final int lengthAdjustment;
public LengthFieldPrepender(
ByteOrder byteOrder,int lengthFieldLength,
int lengthAdjustment, boolean lengthIncludesLengthFieldLength){
// 只支持1,2,3,4,8字节长的length字段编码
if (lengthFieldLength != 1 && lengthFieldLength != 2 &&
lengthFieldLength != 3 && lengthFieldLength != 4 &&
lengthFieldLength != 8) {
throw new IllegalArgumentException(
"lengthFieldLength must be either 1, 2, 3, 4, or 8: " +
lengthFieldLength);
}
ObjectUtil.checkNotNull(byteOrder, "byteOrder");
this.byteOrder = byteOrder;
this.lengthFieldLength = lengthFieldLength;
this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength;
this.lengthAdjustment = lengthAdjustment;
}
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)throws Exception {
int length = msg.readableBytes() + lengthAdjustment;
if (lengthIncludesLengthFieldLength) {
length += lengthFieldLength;
}
if (length < 0) {
throw new IllegalArgumentException(
"Adjusted frame length (" + length + ") is less than zero");
}
switch (lengthFieldLength) {
case 1:
if (length >= 256) {
throw new IllegalArgumentException(
"length does not fit into a byte: " + length);
}
out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length));
break;
... 中间省略
case 8:
out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length));
break;
default:
throw new Error("should not reach here");
}
out.add(msg.retain());
}
}
MessageToByteEncoder同样是过滤出某一种类型的对象,所以也使用了TypeParameterMatcher,其中还有一个参数可以控制是否优先使用DirectMemory堆外直接内存,默认是true使用,如果当前classpath中有 sun.misc.Unsafe 类则使用其获取DirectMemory否则降级为heapMemory。
public abstract class MessageToByteEncoder<I>extends ChannelOutboundHandlerAdapter{
private final TypeParameterMatcher matcher;
private final boolean preferDirect;
// 创建一个满足当前参数类型 I的Encoder,preferDirect表示是否优先使用DirectMemory
protected MessageToByteEncoder(boolean preferDirect) {
matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
this.preferDirect = preferDirect;
}
作为Encoder,需要处理的是write事件。
write逻辑为
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)throws Exception {
ByteBuf buf = null;
try {
// 判断对象类型
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
// 申请ByteBuf
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 编码写入ByteBuf中
encode(ctx, cast, buf);
} finally {
// release msg对象
ReferenceCountUtil.release(cast);
}
// 如果ByteBuf不空则ctx write
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
// 否则release byteBuf然后写入一个空BUFFER
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
// 帮助GC回收
buf = null;
} else {
// 其他类型的对象不处理交给后面的处理
ctx.write(msg, promise);
}
} catch (EncoderException e) {
// encode方法的异常抛给上层处理
throw e;
} catch (Throwable e) {
// 其他包装成EncoderException的异常抛给上层处理
throw new EncoderException(e);
} finally {
// 防止encode方法异常导致申请的ByteBuf没有释放
if (buf != null) {
buf.release();
}
}
}
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
boolean preferDirect) throws Exception {
if (preferDirect) {
// 这里会判断是否有Unsafe类来决定使用DirectMemory还是HeapMemory
return ctx.alloc().ioBuffer();
} else {
return ctx.alloc().heapBuffer();
}
}
// 留给子类去实现的具体编码方法
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out)throws Exception;