Java编程方法论-Spring WebFlux篇 Reactor-Netty下HttpServer 的封装

本系列为本人Java编程方法论 响应式解读系列的 Webflux
部分,现分享出来,前置知识Rxjava2 ,Reactor的相关解读已经录制分享视频,并发布在b站,地址如下:

Rxjava源码解读与分享: www.bilibili.com/video/av345…

Reactor源码解读与分享: www.bilibili.com/video/av353…

NIO源码解读相关视频分享: www.bilibili.com/video/av432…

NIO源码解读视频相关配套文章:

BIO到NIO源码的一些事儿之BIO

BIO到NIO源码的一些事儿之NIO 上

BIO到NIO源码的一些事儿之NIO 中

BIO到NIO源码的一些事儿之NIO 下 之 Selector

BIO到NIO源码的一些事儿之NIO 下 Buffer解读 上

BIO到NIO源码的一些事儿之NIO 下 Buffer解读 下

Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 上

Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 下

其中,Rxjava与Reactor作为本人书中内容将不对外开放,大家感兴趣可以花点时间来观看视频,本人对着两个库进行了全面彻底细致的解读,包括其中的设计理念和相关的方法论,也希望大家可以留言纠正我其中的错误。

HttpServer 的封装

本书主要针对 Netty
服务器来讲,所以读者应具备有关 Netty
的基本知识和应用技能。接下来,我们将对 Reactor-netty
从设计到实现的细节一一探究,让大家真的从中学习到好的封装设计理念。本书在写时所参考的最新版本是 Reactor-netty 0.7.8.Release
这个版本,但现在已有 0.8
版本,而且 0.7
0.8
版本在源码细节有不小的变动,这点给大家提醒下。我会针对 0.8
版本进行全新的解读。

HttpServer 的引入

我们由上一章可知Tomcat使用 Connector
来接收和响应连接请求,这里,对于 Netty
来讲,如果我们想让其做为一个 web
服务器,我们先来看一个 Netty
常见的一个用法(这里摘自官方文档一个例子 DiscardServer Demo
):

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 丢弃任何进入的数据
 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 等待服务器  socket 关闭 。
            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
复制代码
  1. NioEventLoopGroup
    是用来处理 I/O
    操作的多线程事件循环器, Netty
    提供了许多不同的 EventLoopGroup
    的实现用来处理不同的传输。在这个例子中我们实现了一个服务端的应用,因此会有2个 NioEventLoopGroup
    会被使用。第一个经常被叫做 BossGroup
    ,用来接收进来的连接。第二个经常被叫做 WorkerGroup
    ,用来处理已经被接收的连接,一旦 BossGroup
    接收到连接,就会把连接信息注册到 WorkerGroup
    上。如何知道多少个线程已经被使用,如何映射到已经创建的 Channel
    上都需要依赖于 EventLoopGroup
    的实现,并且可以通过构造函数来配置他们的关系。
  2. ServerBootstrap
    是一个启动 NIO
    服务的辅助启动类。你可以在这个服务中直接使用 Channel
    ,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。
  3. 这里我们通过指定使用 NioServerSocketChannel
    来举例说明一个新的 Channel
    如何接收传进来的连接。
  4. 这里的事件处理类经常会被用来处理一个最近已经接收的 Channel
    ChannelInitializer
    是一个特殊的处理类,目的是帮助使用者配置一个新的 Channel
    。 使用其对应的 ChannelPipeline
    来加入你的服务逻辑处理(这里是 DiscardServerHandler
    )。当你的程序变的复杂时,可能你会增加更多的处理类到 pipline
    上,然后提取这些匿名类到最顶层的类上(匿名类即 ChannelInitializer
    实例我们可以将其看成是一个代理模式的设计,类似于 Reactor
    Subscriber
    的设计实现,一层又一层的包装,最后得到一个我们需要的一个可以层层处理的 Subscriber
    )。
  5. 你可以设置这里指定的 Channel
    实现的配置参数。如果我们写一个 TCP/IP
    的服务端,我们可以设置 socket
    的参数选项,如 tcpNoDelay
    keepAlive
    。请参考 ChannelOption
    ChannelConfig
    实现的接口文档来对 ChannelOption
    的有一个大概的认识。
  6. 接着我们来看 option()
    childOption()
    : option()
    是提供给 NioServerSocketChannel
    用来接收进来的连接。 childOption()
    是提供给由父管道 ServerChannel
    接收到的连接,在这个例子中也是 NioServerSocketChannel
  7. 剩下的就是绑定端口然后启动服务。这里我们在服务器上绑定了其 8080
    端口。当然现在你可以多次调用 bind()
    方法(基于不同绑定地址)。

针对bootstrap的option的封装

在看了常见的 Netty
的一个服务器创建用法之后,我们来看 Reactor Netty
给我们提供的Http服务器的一个封装: reactor.ipc.netty.http.server.HttpServer
。由上面 DiscardServer Demo
可知,首先是定义一个服务器,方便设定一些条件对其进行配置,然后启动的话是调用其 run
方法启动,为做到更好的可配置性,这里使用了建造器模式,以便我们自定义或直接使用默认配置(有些是必须配置,否则会抛出异常,这也是我们这里面所设定的内容之一):

//reactor.ipc.netty.http.server.HttpServer.Builder
public static final class Builder {
    private String bindAddress = null;
    private int port = 8080;
    private Supplier<InetSocketAddress> listenAddress = () -> new InetSocketAddress(NetUtil.LOCALHOST, port);
    private Consumer<? super HttpServerOptions.Builder> options;

    private Builder() {
    }
    ...
    public final Builder port(int port) {
        this.port = port;
        return this;
    }

    /**
        * The options for the server, including bind address and port.
        *
        * @param options the options for the server, including bind address and port.
        * @return {@code this}
        */
    public final Builder options(Consumer<? super HttpServerOptions.Builder> options) {
        this.options = Objects.requireNonNull(options, "options");
        return this;
    }

    public HttpServer build() {
        return new HttpServer(this);
    }
}
复制代码

可以看到,此处的 HttpServer.Builder#options
是一个函数式动作 Consumer
,其传入的参数是 HttpServerOptions.Builder
,在 HttpServerOptions.Builder
内可以针对我们在 DiscardServer Demo
中的 bootstrap.option
进行一系列的默认配置或者自行调控配置,我们的对于 option
的自定义设置主要还是针对于 ServerBootstrap#childOption
。因为在 reactor.ipc.netty.options.ServerOptions.Builder#option
这个方法中,有对它的父类 reactor.ipc.netty.options.NettyOptions.Builder#option
进行了相应的重写:

//reactor.ipc.netty.options.ServerOptions.Builder
public static class Builder<BUILDER extends Builder<BUILDER>>
	extends NettyOptions.Builder<ServerBootstrap, ServerOptions, BUILDER>{...}
	
//reactor.ipc.netty.options.ServerOptions.Builder#option
/**
* Set a {@link ChannelOption} value for low level connection settings like
* SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote
* peer.
*
* @param key the option key
* @param <T> the option type
* @return {@code this}
* @see ServerBootstrap#childOption(ChannelOption, Object)
*/
@Override
public final <T> BUILDER option(ChannelOption<T> key, T value) {
this.bootstrapTemplate.childOption(key, value);
return get();
}
//reactor.ipc.netty.options.NettyOptions.Builder#option
/**
* Set a {@link ChannelOption} value for low level connection settings like
* SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote
* peer.
*
* @param key the option key
* @param value the option value
* @param <T> the option type
* @return {@code this}
* @see Bootstrap#option(ChannelOption, Object)
*/
public <T> BUILDER option(ChannelOption<T> key, T value) {
this.bootstrapTemplate.option(key, value);
return get();
}
复制代码

这是我们需要注意的地方。然后,我们再回到 reactor.ipc.netty.http.server.HttpServer.Builder
,从其 build
这个方法可知,其返回一个 HttpServer
实例,通过对所传入的 HttpServer.Builder
实例的 options
进行判断,接着,就是对 bootstrap.group
的判断,因为要使用构造器配置的话,首先得获取到 ServerBootstrap
,所以要先判断是否有可用 EventLoopGroup
,这个我们是可以自行设定的,这里设定一次, bossGroup
workerGroup
可能都会调用这一个,这点要注意下( loopResources
源码注释已经讲的很明确了):

//reactor.ipc.netty.http.server.HttpServer.Builder#build
public HttpServer build() {
    return new HttpServer(this);
}
//reactor.ipc.netty.http.server.HttpServer#HttpServer
private HttpServer(HttpServer.Builder builder) {
    HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder();
    if (Objects.isNull(builder.options)) {
        if (Objects.isNull(builder.bindAddress)) {
            serverOptionsBuilder.listenAddress(builder.listenAddress.get());
        }
        else {
            serverOptionsBuilder.host(builder.bindAddress).port(builder.port);
        }
    }
    else {
        builder.options.accept(serverOptionsBuilder);
    }
    if (!serverOptionsBuilder.isLoopAvailable()) {
        serverOptionsBuilder.loopResources(HttpResources.get());
    }
    this.options = serverOptionsBuilder.build();
    this.server = new TcpBridgeServer(this.options);
}
//reactor.ipc.netty.options.NettyOptions.Builder
public static abstract class Builder<BOOTSTRAP extends AbstractBootstrap<BOOTSTRAP, ?>,
SO extends NettyOptions<BOOTSTRAP, SO>, BUILDER extends Builder<BOOTSTRAP, SO, BUILDER>>
implements Supplier<BUILDER> {
    ...
/**
* Provide a shared {@link EventLoopGroup} each Connector handler.
*
* @param eventLoopGroup an eventLoopGroup to share
* @return {@code this}
*/
public final BUILDER eventLoopGroup(EventLoopGroup eventLoopGroup) {
Objects.requireNonNull(eventLoopGroup, "eventLoopGroup");
return loopResources(preferNative -> eventLoopGroup);
}
/**
* Provide an {@link EventLoopGroup} supplier.
* Note that server might call it twice for both their selection and io loops.
*
* @param channelResources a selector accepting native runtime expectation and
* returning an eventLoopGroup
* @return {@code this}
*/
public final BUILDER loopResources(LoopResources channelResources) {
this.loopResources = Objects.requireNonNull(channelResources, "loopResources");
return get();
}

public final boolean isLoopAvailable() {
return this.loopResources != null;
}
...
}
复制代码

可以看到,这个类是 Supplier
实现,其是一个对象提取器,即属于一个函数式动作对象,适合用于懒加载的场景。这里的 LoopResources
也是一个函数式接口( @FunctionalInterface
),其设计的初衷就是为 io.netty.channel.Channel
的工厂方法服务的:

//reactor.ipc.netty.resources.LoopResources
@FunctionalInterface
public interface LoopResources extends Disposable {

/**
* Default worker thread count, fallback to available processor
*/
int DEFAULT_IO_WORKER_COUNT = Integer.parseInt(System.getProperty(
    "reactor.ipc.netty.workerCount",
    "" + Math.max(Runtime.getRuntime()
                .availableProcessors(), 4)));
/**
* Default selector thread count, fallback to -1 (no selector thread)
*/
int DEFAULT_IO_SELECT_COUNT = Integer.parseInt(System.getProperty(
    "reactor.ipc.netty.selectCount",
    "" + -1));
/**
* Create a simple {@link LoopResources} to provide automatically for {@link
* EventLoopGroup} and {@link Channel} factories
*
* @param prefix the event loop thread name prefix
*
* @return a new {@link LoopResources} to provide automatically for {@link
* EventLoopGroup} and {@link Channel} factories
*/
static LoopResources create(String prefix) {
return new DefaultLoopResources(prefix, DEFAULT_IO_SELECT_COUNT,
        DEFAULT_IO_WORKER_COUNT,
        true);
}
static LoopResources create(String prefix,
			int selectCount,
			int workerCount,
			boolean daemon) {
		...
		return new DefaultLoopResources(prefix, selectCount, workerCount, daemon);
	}
...
/**
* Callback for server {@link EventLoopGroup} creation.
*
* @param useNative should use native group if current {@link #preferNative()} is also
* true
*
* @return a new {@link EventLoopGroup}
*/
EventLoopGroup onServer(boolean useNative);
...
}
复制代码

我们在自定义的时候,可以借助此类的静态方法 create
方法来快速创建一个 LoopResources
实例。另外通过 LoopResources
的函数式特性,可以做到懒加载(将我们想要实现的业务藏到一个方法内),即,只有在使用的时候才会生成所需要的对象实例,即在使用 reactor.ipc.netty.options.NettyOptions.Builder#loopResources(LoopResources channelResources)
方法时,可进行 loopResources(true -> new NioEventLoopGroup())
,即在拿到 LoopResources
实例后,只有调用其 onServer
方法,才能拿到 EventLoopGroup
。这样就可以大大节省内存资源,提高性能。

原文 

https://juejin.im/post/5c76cea45188251fd46ee923

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » Java编程方法论-Spring WebFlux篇 Reactor-Netty下HttpServer 的封装

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址