转载

Java编程方法论-Spring WebFlux篇 Reactor-Netty下HttpServer 的封装

本系列为本人Java编程方法论 响应式解读系列的 Webflux 部分,现分享出来,前置知识Rxjava2 ,Reactor的相关解读已经录制分享视频,并发布在b站,地址如下:

Rxjava源码解读与分享: www.bilibili.com/video/av345…

Reactor源码解读与分享: www.bilibili.com/video/av353…

NIO源码解读相关视频分享: www.bilibili.com/video/av432…

NIO源码解读视频相关配套文章:

BIO到NIO源码的一些事儿之BIO

BIO到NIO源码的一些事儿之NIO 上

BIO到NIO源码的一些事儿之NIO 中

BIO到NIO源码的一些事儿之NIO 下 之 Selector

BIO到NIO源码的一些事儿之NIO 下 Buffer解读 上

BIO到NIO源码的一些事儿之NIO 下 Buffer解读 下

Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 上

Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 下

其中,Rxjava与Reactor作为本人书中内容将不对外开放,大家感兴趣可以花点时间来观看视频,本人对着两个库进行了全面彻底细致的解读,包括其中的设计理念和相关的方法论,也希望大家可以留言纠正我其中的错误。

HttpServer 的封装

本书主要针对 Netty 服务器来讲,所以读者应具备有关 Netty 的基本知识和应用技能。接下来,我们将对 Reactor-netty 从设计到实现的细节一一探究,让大家真的从中学习到好的封装设计理念。本书在写时所参考的最新版本是 Reactor-netty 0.7.8.Release 这个版本,但现在已有 0.8 版本,而且 0.70.8 版本在源码细节有不小的变动,这点给大家提醒下。我会针对 0.8 版本进行全新的解读。

HttpServer 的引入

我们由上一章可知Tomcat使用 Connector 来接收和响应连接请求,这里,对于 Netty 来讲,如果我们想让其做为一个 web 服务器,我们先来看一个 Netty 常见的一个用法(这里摘自官方文档一个例子 DiscardServer Demo ):

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 丢弃任何进入的数据
 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 等待服务器  socket 关闭 。
            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
复制代码
  1. NioEventLoopGroup 是用来处理 I/O 操作的多线程事件循环器, Netty 提供了许多不同的 EventLoopGroup 的实现用来处理不同的传输。在这个例子中我们实现了一个服务端的应用,因此会有2个 NioEventLoopGroup 会被使用。第一个经常被叫做 BossGroup ,用来接收进来的连接。第二个经常被叫做 WorkerGroup ,用来处理已经被接收的连接,一旦 BossGroup 接收到连接,就会把连接信息注册到 WorkerGroup 上。如何知道多少个线程已经被使用,如何映射到已经创建的 Channel 上都需要依赖于 EventLoopGroup 的实现,并且可以通过构造函数来配置他们的关系。
  2. ServerBootstrap 是一个启动 NIO 服务的辅助启动类。你可以在这个服务中直接使用 Channel ,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。
  3. 这里我们通过指定使用 NioServerSocketChannel 来举例说明一个新的 Channel 如何接收传进来的连接。
  4. 这里的事件处理类经常会被用来处理一个最近已经接收的 ChannelChannelInitializer 是一个特殊的处理类,目的是帮助使用者配置一个新的 Channel 。 使用其对应的 ChannelPipeline 来加入你的服务逻辑处理(这里是 DiscardServerHandler )。当你的程序变的复杂时,可能你会增加更多的处理类到 pipline 上,然后提取这些匿名类到最顶层的类上(匿名类即 ChannelInitializer 实例我们可以将其看成是一个代理模式的设计,类似于 ReactorSubscriber 的设计实现,一层又一层的包装,最后得到一个我们需要的一个可以层层处理的 Subscriber )。
  5. 你可以设置这里指定的 Channel 实现的配置参数。如果我们写一个 TCP/IP 的服务端,我们可以设置 socket 的参数选项,如 tcpNoDelaykeepAlive 。请参考 ChannelOptionChannelConfig 实现的接口文档来对 ChannelOption 的有一个大概的认识。
  6. 接着我们来看 option()childOption() : option() 是提供给 NioServerSocketChannel 用来接收进来的连接。 childOption() 是提供给由父管道 ServerChannel 接收到的连接,在这个例子中也是 NioServerSocketChannel
  7. 剩下的就是绑定端口然后启动服务。这里我们在服务器上绑定了其 8080 端口。当然现在你可以多次调用 bind() 方法(基于不同绑定地址)。

针对bootstrap的option的封装

在看了常见的 Netty 的一个服务器创建用法之后,我们来看 Reactor Netty 给我们提供的Http服务器的一个封装: reactor.ipc.netty.http.server.HttpServer 。由上面 DiscardServer Demo 可知,首先是定义一个服务器,方便设定一些条件对其进行配置,然后启动的话是调用其 run 方法启动,为做到更好的可配置性,这里使用了建造器模式,以便我们自定义或直接使用默认配置(有些是必须配置,否则会抛出异常,这也是我们这里面所设定的内容之一):

//reactor.ipc.netty.http.server.HttpServer.Builder
public static final class Builder {
    private String bindAddress = null;
    private int port = 8080;
    private Supplier<InetSocketAddress> listenAddress = () -> new InetSocketAddress(NetUtil.LOCALHOST, port);
    private Consumer<? super HttpServerOptions.Builder> options;

    private Builder() {
    }
    ...
    public final Builder port(int port) {
        this.port = port;
        return this;
    }

    /**
        * The options for the server, including bind address and port.
        *
        * @param options the options for the server, including bind address and port.
        * @return {@code this}
        */
    public final Builder options(Consumer<? super HttpServerOptions.Builder> options) {
        this.options = Objects.requireNonNull(options, "options");
        return this;
    }

    public HttpServer build() {
        return new HttpServer(this);
    }
}
复制代码

可以看到,此处的 HttpServer.Builder#options 是一个函数式动作 Consumer ,其传入的参数是 HttpServerOptions.Builder ,在 HttpServerOptions.Builder 内可以针对我们在 DiscardServer Demo 中的 bootstrap.option 进行一系列的默认配置或者自行调控配置,我们的对于 option 的自定义设置主要还是针对于 ServerBootstrap#childOption 。因为在 reactor.ipc.netty.options.ServerOptions.Builder#option 这个方法中,有对它的父类 reactor.ipc.netty.options.NettyOptions.Builder#option 进行了相应的重写:

//reactor.ipc.netty.options.ServerOptions.Builder
public static class Builder<BUILDER extends Builder<BUILDER>>
	extends NettyOptions.Builder<ServerBootstrap, ServerOptions, BUILDER>{...}
	
//reactor.ipc.netty.options.ServerOptions.Builder#option
/**
* Set a {@link ChannelOption} value for low level connection settings like
* SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote
* peer.
*
* @param key the option key
* @param <T> the option type
* @return {@code this}
* @see ServerBootstrap#childOption(ChannelOption, Object)
*/
@Override
public final <T> BUILDER option(ChannelOption<T> key, T value) {
this.bootstrapTemplate.childOption(key, value);
return get();
}
//reactor.ipc.netty.options.NettyOptions.Builder#option
/**
* Set a {@link ChannelOption} value for low level connection settings like
* SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote
* peer.
*
* @param key the option key
* @param value the option value
* @param <T> the option type
* @return {@code this}
* @see Bootstrap#option(ChannelOption, Object)
*/
public <T> BUILDER option(ChannelOption<T> key, T value) {
this.bootstrapTemplate.option(key, value);
return get();
}
复制代码

这是我们需要注意的地方。然后,我们再回到 reactor.ipc.netty.http.server.HttpServer.Builder ,从其 build 这个方法可知,其返回一个 HttpServer 实例,通过对所传入的 HttpServer.Builder 实例的 options 进行判断,接着,就是对 bootstrap.group 的判断,因为要使用构造器配置的话,首先得获取到 ServerBootstrap ,所以要先判断是否有可用 EventLoopGroup ,这个我们是可以自行设定的,这里设定一次, bossGroupworkerGroup 可能都会调用这一个,这点要注意下( loopResources 源码注释已经讲的很明确了):

//reactor.ipc.netty.http.server.HttpServer.Builder#build
public HttpServer build() {
    return new HttpServer(this);
}
//reactor.ipc.netty.http.server.HttpServer#HttpServer
private HttpServer(HttpServer.Builder builder) {
    HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder();
    if (Objects.isNull(builder.options)) {
        if (Objects.isNull(builder.bindAddress)) {
            serverOptionsBuilder.listenAddress(builder.listenAddress.get());
        }
        else {
            serverOptionsBuilder.host(builder.bindAddress).port(builder.port);
        }
    }
    else {
        builder.options.accept(serverOptionsBuilder);
    }
    if (!serverOptionsBuilder.isLoopAvailable()) {
        serverOptionsBuilder.loopResources(HttpResources.get());
    }
    this.options = serverOptionsBuilder.build();
    this.server = new TcpBridgeServer(this.options);
}
//reactor.ipc.netty.options.NettyOptions.Builder
public static abstract class Builder<BOOTSTRAP extends AbstractBootstrap<BOOTSTRAP, ?>,
SO extends NettyOptions<BOOTSTRAP, SO>, BUILDER extends Builder<BOOTSTRAP, SO, BUILDER>>
implements Supplier<BUILDER> {
    ...
/**
* Provide a shared {@link EventLoopGroup} each Connector handler.
*
* @param eventLoopGroup an eventLoopGroup to share
* @return {@code this}
*/
public final BUILDER eventLoopGroup(EventLoopGroup eventLoopGroup) {
Objects.requireNonNull(eventLoopGroup, "eventLoopGroup");
return loopResources(preferNative -> eventLoopGroup);
}
/**
* Provide an {@link EventLoopGroup} supplier.
* Note that server might call it twice for both their selection and io loops.
*
* @param channelResources a selector accepting native runtime expectation and
* returning an eventLoopGroup
* @return {@code this}
*/
public final BUILDER loopResources(LoopResources channelResources) {
this.loopResources = Objects.requireNonNull(channelResources, "loopResources");
return get();
}

public final boolean isLoopAvailable() {
return this.loopResources != null;
}
...
}
复制代码

可以看到,这个类是 Supplier 实现,其是一个对象提取器,即属于一个函数式动作对象,适合用于懒加载的场景。这里的 LoopResources 也是一个函数式接口( @FunctionalInterface ),其设计的初衷就是为 io.netty.channel.Channel 的工厂方法服务的:

//reactor.ipc.netty.resources.LoopResources
@FunctionalInterface
public interface LoopResources extends Disposable {

/**
* Default worker thread count, fallback to available processor
*/
int DEFAULT_IO_WORKER_COUNT = Integer.parseInt(System.getProperty(
    "reactor.ipc.netty.workerCount",
    "" + Math.max(Runtime.getRuntime()
                .availableProcessors(), 4)));
/**
* Default selector thread count, fallback to -1 (no selector thread)
*/
int DEFAULT_IO_SELECT_COUNT = Integer.parseInt(System.getProperty(
    "reactor.ipc.netty.selectCount",
    "" + -1));
/**
* Create a simple {@link LoopResources} to provide automatically for {@link
* EventLoopGroup} and {@link Channel} factories
*
* @param prefix the event loop thread name prefix
*
* @return a new {@link LoopResources} to provide automatically for {@link
* EventLoopGroup} and {@link Channel} factories
*/
static LoopResources create(String prefix) {
return new DefaultLoopResources(prefix, DEFAULT_IO_SELECT_COUNT,
        DEFAULT_IO_WORKER_COUNT,
        true);
}
static LoopResources create(String prefix,
			int selectCount,
			int workerCount,
			boolean daemon) {
		...
		return new DefaultLoopResources(prefix, selectCount, workerCount, daemon);
	}
...
/**
* Callback for server {@link EventLoopGroup} creation.
*
* @param useNative should use native group if current {@link #preferNative()} is also
* true
*
* @return a new {@link EventLoopGroup}
*/
EventLoopGroup onServer(boolean useNative);
...
}
复制代码

我们在自定义的时候,可以借助此类的静态方法 create 方法来快速创建一个 LoopResources 实例。另外通过 LoopResources 的函数式特性,可以做到懒加载(将我们想要实现的业务藏到一个方法内),即,只有在使用的时候才会生成所需要的对象实例,即在使用 reactor.ipc.netty.options.NettyOptions.Builder#loopResources(LoopResources channelResources) 方法时,可进行 loopResources(true -> new NioEventLoopGroup()) ,即在拿到 LoopResources 实例后,只有调用其 onServer 方法,才能拿到 EventLoopGroup 。这样就可以大大节省内存资源,提高性能。

原文  https://juejin.im/post/5c76cea45188251fd46ee923
正文到此结束
Loading...