【Netty 专栏】深入浅出 Netty write

点击上方“芋道源码”,选择“置顶公众号”

技术文章第一时间送达!

源码精品专栏

 



摘要: 原创出处 https://www.jianshu.com/p/1ad424c53e80 「占小狼」欢迎转载,保留摘要,谢谢!


上一章节中,分析了Netty如何处理read事件,本节分析Netty如何把数据写会客户端。

把数据返回客户端,需要经历三个步骤:
1、申请一块缓存buf,写入数据。
2、将buf保存到ChannelOutboundBuffer中。
3、将ChannelOutboundBuffer中的buff输出到socketChannel中。

  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      ReferenceCountUtil.release(msg);

      ByteBuf buf1 = ctx.alloc().buffer(4);
      buf1.writeInt(1);

      ByteBuf buf2 = ctx.alloc().buffer(4);
      buf2.writeInt(2);

      ByteBuf buf3 = ctx.alloc().buffer(4);
      buf3.writeInt(3);

      ctx.write(buf1);
      ctx.write(buf2);
      ctx.write(buf3);
      ctx.flush();
  }

为什么需要把buf保存到ChannelOutboundBuffer?

ctx.write()实现:

//AbstractChannelHandlerContext.java
public ChannelFuture write(Object msg) {
  return write(msg, newPromise());
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, msg, promise);
        }  else {
            task = WriteTask.newInstance(next, msg, promise);
        }
        safeExecute(executor, task, promise, msg);
    }
}

默认情况下,findContextOutbound()会找到pipeline的head节点,触发write方法。

//HeadContext.java
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

//AbstractUnsafe
public final void write(Object msg, ChannelPromise promise) {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}

outboundBuffer 随着Unsafe一起实例化,最终将msg通过outboundBuffer封装起来。

ChannelOutboundBuffer内部维护了一个Entry链表,并使用Entry封装msg。
1、unflushedEntry:指向链表头部
2、tailEntry:指向链表尾部
3、totalPendingSize:保存msg的字节数
4、unwritable:不可写标识

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(size, false);
}

通过Entry.newInstance返回Entry实例,Netty对Entry采用了缓存策略,使用完的Entry实例需要清空并回收,难道是因为Entry实例化比较耗时?

新的entry默认插入链表尾部,并让tailEntry指向它。

【Netty 专栏】深入浅出 Netty write
img

Paste_Image.png

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

方法incrementPendingOutboundBytes主要采用CAS更新totalPendingSize字段,并判断当前totalPendingSize是否超过阈值writeBufferHighWaterMark,默认是65536。如果totalPendingSize >= 65536,则采用CAS更新unwritable为1,并触发ChannelWritabilityChanged事件。

到此为止,全部的buf数据已经保存在outboundBuffer中。

ctx.flush()实现:

public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeFlush();
    } else {
        Runnable task = next.invokeFlushTask;
        if (task == null) {
            next.invokeFlushTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeFlush();
                }
            };
        }
        safeExecute(executor, task, channel().voidPromise(), null);
    }

    return this;
}

默认情况下,findContextOutbound()会找到pipeline的head节点,触发flush方法。

//HeadContext.java
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

//AbstractUnsafe
public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    outboundBuffer.addFlush();
    flush0();
}

方法addFlush主要对write过程添加的msg进行flush标识,其实我不清楚,这个标识过程有什么意义。

直接看flush0方法:

protected final void flush0() {
    // Flush immediately only when there's no pending flush.
    // If there's a pending flush operation, event loop will call forceFlush() later,
    // and thus there's no need to call it now.
    if (isFlushPending()) {
        return;
    }
    super.flush0();
}

private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}

1、如果当前selectionKey 是写事件,说明有线程执行flush过程,则直接返回。
2、否则直接执行flush操作。

protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            /**
             * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
             * may still return {@code true} even if the channel should be closed as result of the exception.
             */

            close(voidPromise(), t, false);
        } else {
            outboundBuffer.failFlushed(t, true);
        }
    } finally {
        inFlush0 = false;
    }
}

public boolean isActive() {
    SocketChannel ch = javaChannel();
    return ch.isOpen() && ch.isConnected();
}

1、如果当前socketChannel已经关闭,或断开连接,则执行失败操作。
2、否则执行doWrite把数据写入到socketChannel。

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        int size = in.size();
        if (size == 0) {
            // All written so clear OP_WRITE
            clearOpWrite();
            break;
        }
        long writtenBytes = 0;
        boolean done = false;
        boolean setOpWrite = false;

        // Ensure the pending writes are made of ByteBufs only.
        ByteBuffer[] nioBuffers = in.nioBuffers();
        int nioBufferCnt = in.nioBufferCount();
        long expectedWrittenBytes = in.nioBufferSize();
        SocketChannel ch = javaChannel();

        // Always us nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                super.doWrite(in);
                return;
            case 1:
                // Only one ByteBuf so use non-gathering write
                ByteBuffer nioBuffer = nioBuffers[0];
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final int localWrittenBytes = ch.write(nioBuffer);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
            default:
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
        }

        // Release the fully written buffers, and update the indexes of the partially written buffer.
        in.removeBytes(writtenBytes);

        if (!done) {
            // Did not write all buffers completely.
            incompleteWrite(setOpWrite);
            break;
        }
    }
}

1、size方法返回outboundBuffer有多少Entry实例。
2、in.nioBuffers()负责把Entry中保存的ByteBuf类型的msg,重新返回Nio的ByteBuffer实例,并返回ByteBuffer数组nioBuffers,其实msg和ByteBuffer实例指向的是同一块内存,因为在UnpooledDirectByteBuf实现类中,已经维护了ByteBuffer的实例。
3、socketChannel.write()方法把nioBuffers的数据写到socket中,这是Nio中的实现。

到此为止,nioBuffers的数据都flush到socket,客户端可以准备接收了。

666. 彩蛋




如果你对 Dubbo 感兴趣,欢迎加入我的知识星球一起交流。

【Netty 专栏】深入浅出 Netty write

知识星球



目前在知识星球(https://t.zsxq.com/2VbiaEu)更新了如下 Dubbo 源码解析如下:

01. 调试环境搭建
02. 项目结构一览
03. 配置 Configuration
04. 核心流程一览

05. 拓展机制 SPI

06. 线程池

07. 服务暴露 Export

08. 服务引用 Refer

09. 注册中心 Registry

10. 动态编译 Compile

11. 动态代理 Proxy

12. 服务调用 Invoke

13. 调用特性 

14. 过滤器 Filter

15. NIO 服务器

16. P2P 服务器

17. HTTP 服务器

18. 序列化 Serialization

19. 集群容错 Cluster

20. 优雅停机

21. 日志适配

22. 状态检查

23. 监控中心 Monitor

24. 管理中心 Admin

25. 运维命令 QOS

26. 链路追踪 Tracing

...
一共 60 篇++

发表评论