转载

Netty 学习系列五:write&flush

一、ChannelOutboundBuffer

1、定义

是AbstractUnsafe使用的数据结构,用来存储待发送的数据。

在channel.unsafe实例化时,ChannelOutboundBuffer一起被初始化。每个channel都有一个自己的ChannelOutboundBuffer。

2、ChannelOutboundBuffer中的field

Channel channel --> 所绑定的Channel

Entry flushedEntry --> 表示下一个要被flush的Entry

Entry unflushedEntry --> 表示下一次要flush截止的Entry

Entry tailEntry

int flushed

int nioBufferCount

int nioBufferSize

long totalPendingSize --> 已存储的需要被write到socket发送缓存中的byte大小

int unwritable --> 表示当前channel的待发送缓存是否可以继续写入数据

ChannelOutboundBuffer中维护了节点元素为Entry的单向链表。

Entry为待发送数据的抽象,实际待发送数据保存在Entry的Object msg中。

二、write的过程

1、入口

AbstractChannel、DefaultChannelPipeline或AbstractChannelHandlerContext提供的write(Object msg)方法,最终都会由HeadContext.write方法执行,最终交由AbstractUnsafe.write(Object msg,ChannelPromise promise)实现。

public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
                msg = filterOutboundMessage(msg);
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }

            outboundBuffer.addMessage(msg, size, promise);
        }

2、取得ChannelOutboundBuffer

ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

3、对要进行write操作的数据msg,如果是ByteBuf类型则统一转换为DirectByteBuf实现

msg = filterOutboundMessage(msg);

内部调用AbstractNioByteChannel.filterOutboundMessage(Object msg)方法,如果msg是ByteBuf类型,则将其转换为DirectByteBuf的实现。(调用ByteBufAllocator.directBuffer(initialCapacity)分配一块直接缓存空间并将原msg中的字节流放入)

4、 计算msg的大小

int size = pipeline.estimatorHandle().size(msg);

对于ByteBuf类型的msg,直接调用readableBytes()方法。

5、将ByteBuf msg存入ChannelOutboundBuffer中

outboundBuffer.addMessage(msg, size, promise);

1)将msg封装成Entry对象,并放入单向链表的尾部tailEntry

Entry entry = Entry.newInstance(msg, size, total(msg), promise);
tailEntry = entry;

netty使用基于thread-local的轻量级对象池Recycler对Entry进行回收,避免多次实例化的垃圾回收和开销。

2)更新ChannelOutboundBuffer的totalPendingSize,累加上本次新增的大小

incrementPendingOutboundBytes(size, false);

若totalPendingSize超过了channel的高水位线:

-将unwritable状态更新为不可写;

-执行pipeline.fileChannelWritabilityChanged();

三、flush过程

1、入口

AbstractChannel、DefaultChannelPipeline或AbstractChannelHandlerContext提供的flush()方法,都会由HeadContext.flush(ChannelHandlerContext ctx)方法执行,最终交由AbstractUnsafe.flush()实现。

public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }

            outboundBuffer.addFlush();
            flush0();
        }

2、更新ChannelOutboundBuffer中本次将要flush的Entry区间

outboundBuffer.addFlush();

将ChannelOutboundBuffer的unflushedEntry向后不断移动到tailEntry,操作结束后本次要flush的链表区间就是flushedEntry->unflushedEntry。

3、检查ChannelOutboundBuffer是否有待flush的数据,如果没有则直接返回,终止flush过程

4、将ChannelOutboundBuffer的要flush的链表区间数据写入TCP发送缓冲区

NioByteUnsafe.doWrite(ChannelOutboundBuffer in)

对Entry链表区间中的每个Entry.msg(即ByteBuf)执行以下逻辑--->

1)如果当前flushedEntry为空,则将OP_WRITE事件从对应Channel的interestOp中移除,跳出遍历直接到步骤5)

if (msg == null) {
                // Wrote all messages.
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

2)采用类似自旋锁的逻辑不断调用NioSocketChannel.doWriteBytes(ByteBuf buf),将Entry.msg(即ByteBuf)中的数据写入套接字的发送缓冲区。

  • 内部调用ByteBuf.readBytes(Channel out, int length)接口;
  • 实际底层最终调用nio.SocketChannel.write(ByteBuffer src);
  • 如果自旋过程中出现nio.SocketChannel.write(ByteBuffer src)返回结果为0,说明此时TCP发送缓冲队列已满,则退出自旋write并将OP_WRITE添加到ch.selectionKey.interestOps中,等待TCP发送缓冲队列可写时重新出发write操作;
  • 如果Entry的自旋write达到一定次数还没有将Entry中的数据写完,则直接跳出链表遍历操作,执行最后的incompleteWrite;

3)构造ChannelPromise通知当前已write的数据进度

in.progress(flushedAmount);

4)如果当前flushedEntry中的数据已写完,将Entry从ChannelOutboundBuffer中清理回收

in.remove();
  • 将当前entry从Entry链表中删除;
  • 从totalPendingSize中减去entry已write出去的字节数;
  • 若totalPendingSize小于了channel的低水位线,将unwritable状态更新为可写,并调用pipeline.fileChannelWritabilityChanged()产生ChannelWritabilityChanged事件。

<---对Entry链表区间中的每个Entry.msg(即ByteBuf)执行以上逻辑

5)根据当前socket的可写状态,进行后续操作

incompleteWrite(setOpWrite);
  • 若当前TCP发送缓冲区已满,则将OP_WRITE添加到ch.selectionKey.interestOps中,等待TCP发送缓冲队列可写时重新触发write操作;

  • 若当前TCP发送缓冲区未满,构造一个flush()事件,等待EventLoop的下一个循环重新检测ChannelOutboundBuffer中有无待flush的数据。

原文  https://xiaozhuanlan.com/topic/7049651328
正文到此结束
Loading...