从零单排,使用 Netty 构建 IM 聊天室~

1. 概述

在 《芋道 Spring Boot WebSocket 入门》 文章中,我们使用 WebSocket 实现了一个简单的 IM 功能,支持身份认证、私聊消息、群聊消息。

然后就有胖友私信艿艿,希望使用纯 Netty 实现一个类似的功能。良心的艿艿,当然不会给她发红人卡,因此就有了本文。可能有胖友不知道 Netty 是什么,这里简单介绍下:

Netty 是一个 Java 开源框架。

Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。

Netty 相当简化和流线化了网络应用的编程开发过程,例如,TCPUDP 的 Socket 服务开发。

下面,我们来新建三个项目,如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

  • lab-67-netty-demo-server 项目:搭建 Netty 服务端。
  • lab-67-netty-demo-client 项目:搭建 Netty 客户端。
  • lab-67-netty-demo-common 项目:提供 Netty 的基础封装,提供消息的编解码、分发的功能。

另外,我们也会提供 Netty 常用功能的示例:

  • 心跳机制,实现服务端对客户端的存活检测。
  • 断线重连,实现客户端对服务端的重新连接。

不哔哔,直接开干。

友情提示:可能会胖友担心,没有 Netty 基础是不是无法阅读本文?!

艿艿的想法,看!就硬看,按照代码先自己能搭建一下哈~文末,艿艿会提供一波 Netty 基础 入门的文章。

2. 构建 Netty 服务端与客户端

本文在提供完整代码示例,可见
https://github.com/YunaiV/Spr… 的
lab-67 目录

原创不易,给点个 Star 嘿,一起冲鸭!

本小节,我们先来使用 Netty 构建服务端与客户端的核心代码,让胖友对项目的代码有个初始的认知。

2.1 构建 Netty 服务端

创建 lab-67-netty-demo-server 项目,搭建 Netty 服务端 。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

下面,我们只会暂时看看 server 包下的代码,避免信息量过大,击穿胖友的秃头。

2.1.1 NettyServer

创建 NettyServer 类,Netty 服务端。代码如下:

@Component
public class NettyServer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${netty.port}")
    private Integer port;

    @Autowired
    private NettyServerHandlerInitializer nettyServerHandlerInitializer;

    /**
     * boss 线程组,用于服务端接受客户端的连接
     */
    private EventLoopGroup bossGroup = new NioEventLoopGroup();
    /**
     * worker 线程组,用于服务端接受客户端的数据读写
     */
    private EventLoopGroup workerGroup = new NioEventLoopGroup();
    /**
     * Netty Server Channel
     */
    private Channel channel;

    /**
     * 启动 Netty Server
     */
    @PostConstruct
    public void start() throws InterruptedException {
        // <2.1> 创建 ServerBootstrap 对象,用于 Netty Server 启动
        ServerBootstrap bootstrap = new ServerBootstrap();
        // <2.2> 设置 ServerBootstrap 的各种属性
        bootstrap.group(bossGroup, workerGroup) // <2.2.1> 设置两个 EventLoopGroup 对象
                .channel(NioServerSocketChannel.class)  // <2.2.2> 指定 Channel 为服务端 NioServerSocketChannel
                .localAddress(new InetSocketAddress(port)) // <2.2.3> 设置 Netty Server 的端口
                .option(ChannelOption.SO_BACKLOG, 1024) // <2.2.4> 服务端 accept 队列的大小
                .childOption(ChannelOption.SO_KEEPALIVE, true) // <2.2.5> TCP Keepalive 机制,实现 TCP 层级的心跳保活功能
                .childOption(ChannelOption.TCP_NODELAY, true) // <2.2.6> 允许较小的数据包的发送,降低延迟
                .childHandler(nettyServerHandlerInitializer);
        // <2> 绑定端口,并同步等待成功,即启动服务端
        ChannelFuture future = bootstrap.bind().sync();
        if (future.isSuccess()) {
            channel = future.channel();
            logger.info("[start][Netty Server 启动在 {} 端口]", port);
        }
    }

    /**
     * 关闭 Netty Server
     */
    @PreDestroy
    public void shutdown() {
        // <3.1> 关闭 Netty Server
        if (channel != null) {
            channel.close();
        }
        // <3.2> 优雅关闭两个 EventLoopGroup 对象
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

}

:fire: ① 在类上,添加 @Component 注解,把 NettyServer 的创建交给 Spring 管理

  • port 属性,读取 application.yml 配置文件的 netty.port 配置项。
  • #start() 方法,添加 @PostConstruct 注解,启动 Netty 服务器。
  • #shutdown() 方法,添加 @PreDestroy 注解,关闭 Netty 服务器。

:fire: ② 我们来详细看看 #start() 方法的代码,如何实现 Netty Server 的启动。

<2.1> 处,创建 ServerBootstrap 类,Netty 提供的 服务器 的启动类,方便我们初始化 Server。

<2.2> 处,设置 ServerBootstrap 的各种属性。

友情提示:这里涉及较多 Netty 组件的知识,艿艿先以简单的语言描述,后续胖友在文末的 Netty 基础入门的文章,补充学噢。

<2.2.1> 处,调用 #group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 方法,设置使用 bossGroupworkerGroup 。其中:

  • bossGroup 属性: Boss 线程组,用于服务端接受客户端的 连接
  • workerGroup 属性: Worker 线程组,用于服务端接受客户端的 数据读写

Netty 采用的是多 Reactor 多线程模型,服务端可以接受 更多 客户端的数据读写的能力。原因是:

  • 创建专门用于接受 客户端连接bossGroup 线程组,避免因为已连接的客户端的数据读写频繁,影响新的客户端的连接。
  • 创建专门用于接收 客户端读写workerGroup 线程组, 多个 线程进行客户端的数据读写,可以支持更多客户端。

课后习题:感兴趣的胖友,后续可以看看 《【NIO 系列】——之 Reactor 模型》 文章。

<2.2.2> 处,调用 #channel(Class<? extends C> channelClass) 方法,设置使用 NioServerSocketChannel 类,它是 Netty 定义的 NIO 服务端 TCP Socket 实现类。

<2.2.3> 处,调用 #localAddress(SocketAddress localAddress) 方法,设置服务端的 端口

<2.2.4> 处,调用 option#(ChannelOption<T> option, T value) 方法,设置服务端接受客户端的 连接队列 大小。因为 TCP 建立连接是三次握手,所以第一次握手完成后,会添加到服务端的连接队列中。

课后习题:更多相关内容,后续可以看看 《浅谈 TCP Socket 的 backlog 参数》 文章。

<2.2.5> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,TCP Keepalive 机制,实现 TCP 层级的 心跳保活 功能。

课后习题:更多相关内容,后续可以看看 《TCP Keepalive 机制刨根问底》 文章。

<2.2.6> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,允许 较小的数据包 的发送,降低延迟。

课后习题:更多相关内容,后续可以看看 《详解 Socket 编程 — TCP_NODELAY 选项》 文章。

<2.2.7> 处,调用 #childHandler(ChannelHandler childHandler) 方法,设置客户端连接上来的 Channel 的处理器为 NettyServerHandlerInitializer。稍后我们在「2.1.2 NettyServerHandlerInitializer」小节来看看。

<2.3> 处,调用 #bind() + #sync() 方法,绑定端口,并 同步 等待成功,即启动服务端。

:fire: ③ 我们来详细看看 #shutdown() 方法的代码,如何实现 Netty Server 的关闭。

<3.1> 处,调用 Channel 的 #close() 方法,关闭 Netty Server,这样客户端就不再能连接了。

<3.2> 处,调用 EventLoopGroup 的 #shutdownGracefully() 方法,优雅关闭 EventLoopGroup。例如说,它们里面的线程池

2.1.2 NettyServerHandlerInitializer

在看 NettyServerHandlerInitializer 的代码之前,我们需要先了解下 Netty 的 ChannelHandler 组件,用来处理 Channel 的各种事件。这里的事件很广泛,比如可以是连接、数据读写、异常、数据转换等等。

ChannelHandler 有非常多的子类,其中有个非常特殊的 ChannelInitializer ,它用于 Channel 创建时,实现自定义的初始化逻辑。这里我们创建的 NettyServerHandlerInitializer 类,就继承了 ChannelInitializer 抽象类,代码如下:

@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {

    /**
     * 心跳超时时间
     */
    private static final Integer READ_TIMEOUT_SECONDS = 3 * 60;

    @Autowired
    private MessageDispatcher messageDispatcher;
    @Autowired
    private NettyServerHandler nettyServerHandler;

    @Override
    protected void initChannel(Channel ch) {
        // <1> 获得 Channel 对应的 ChannelPipeline
        ChannelPipeline channelPipeline = ch.pipeline();
        // <2> 添加一堆 NettyServerHandler 到 ChannelPipeline 中
        channelPipeline
                // 空闲检测
                .addLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS))
                // 编码器
                .addLast(new InvocationEncoder())
                // 解码器
                .addLast(new InvocationDecoder())
                // 消息分发器
                .addLast(messageDispatcher)
                // 服务端处理器
                .addLast(nettyServerHandler)
        ;
    }

}

在每一个客户端与服务端建立完成连接时,服务端会创建一个 Channel 与之对应。此时,NettyServerHandlerInitializer 会进行执行 #initChannel(Channel c) 方法,进行自定义的初始化。

友情提示:创建的客户端的 Channel,不要和小节的 NioServerSocketChannel 混淆,不是同一个哈。

#initChannel(Channel ch) 方法的 ch 参数,就是此时创建的客户端 Channel。

<1> 处,调用 Channel 的 #pipeline() 方法,获得客户端 Channel 对应的 ChannelPipeline 。ChannelPipeline 由一系列的 ChannelHandler 组成,又或者说是 ChannelHandler 。这样, Channel 所有上所有的事件都会经过 ChannelPipeline,被其上的 ChannelHandler 所处理。

<2> 处,添加 五个 ChannelHandler 到 ChannelPipeline 中,每一个的作用看其上的注释。具体的,我们会在后续的小节详细解释。

2.1.3 NettyServerHandler

创建 NettyServerHandler 类,继承 ChannelInboundHandlerAdapter 类,实现客户端 Channel 建立 连接、 断开 连接、异常时的处理。代码如下:

@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private NettyChannelManager channelManager;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 从管理器中添加
        channelManager.add(ctx.channel());
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        // 从管理器中移除
        channelManager.remove(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("[exceptionCaught][连接({}) 发生异常]", ctx.channel().id(), cause);
        // 断开连接
        ctx.channel().close();
    }

}

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

channelManager 属性,是我们实现的客户端 Channel 的管理器。

  • #channelActive(ChannelHandlerContext ctx) 方法,在客户端和服务端 建立 连接完成时,调用 NettyChannelManager 的 #add(Channel channel) 方法,添加到 其中
  • #channelUnregistered(ChannelHandlerContext ctx) 方法,在客户端和服务端 断开 连接时,调用 NettyChannelManager 的 #add(Channel channel) 方法,从其中 移除

具体的 NettyChannelManager 的源码,我们在「2.1.4 NettyChannelManager」小节中来瞅瞅~

#exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 方法,在处理 Channel 的事件发生异常时,调用 Channel 的 #close() 方法, 断开 和客户端的连接。

2.1.4 NettyChannelManager

创建 NettyChannelManager 类,提供 两种 功能。

:fire: ① 客户端 Channel 的 管理 。代码如下:

@Component
public class NettyChannelManager {

    /**
     * {@link Channel#attr(AttributeKey)} 属性中,表示 Channel 对应的用户
     */
    private static final AttributeKey<String> CHANNEL_ATTR_KEY_USER = AttributeKey.newInstance("user");

    private Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Channel 映射
     */
    private ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>();
    /**
     * 用户与 Channel 的映射。
     *
     * 通过它,可以获取用户对应的 Channel。这样,我们可以向指定用户发送消息。
     */
    private ConcurrentMap<String, Channel> userChannels = new ConcurrentHashMap<>();

    /**
     * 添加 Channel 到 {@link #channels} 中
     *
     * @param channel Channel
     */
    public void add(Channel channel) {
        channels.put(channel.id(), channel);
        logger.info("[add][一个连接({})加入]", channel.id());
    }

    /**
     * 添加指定用户到 {@link #userChannels} 中
     *
     * @param channel Channel
     * @param user 用户
     */
    public void addUser(Channel channel, String user) {
        Channel existChannel = channels.get(channel.id());
        if (existChannel == null) {
            logger.error("[addUser][连接({}) 不存在]", channel.id());
            return;
        }
        // 设置属性
        channel.attr(CHANNEL_ATTR_KEY_USER).set(user);
        // 添加到 userChannels
        userChannels.put(user, channel);
    }

    /**
     * 将 Channel 从 {@link #channels} 和 {@link #userChannels} 中移除
     *
     * @param channel Channel
     */
    public void remove(Channel channel) {
        // 移除 channels
        channels.remove(channel.id());
        // 移除 userChannels
        if (channel.hasAttr(CHANNEL_ATTR_KEY_USER)) {
            userChannels.remove(channel.attr(CHANNEL_ATTR_KEY_USER).get());
        }
        logger.info("[remove][一个连接({})离开]", channel.id());
    }
}

:fire: ② 向客户端 Channel 发送 消息 。代码如下:

@Component
public class NettyChannelManager {

    /**
     * 向指定用户发送消息
     *
     * @param user 用户
     * @param invocation 消息体
     */
    public void send(String user, Invocation invocation) {
        // 获得用户对应的 Channel
        Channel channel = userChannels.get(user);
        if (channel == null) {
            logger.error("[send][连接不存在]");
            return;
        }
        if (!channel.isActive()) {
            logger.error("[send][连接({})未激活]", channel.id());
            return;
        }
        // 发送消息
        channel.writeAndFlush(invocation);
    }

    /**
     * 向所有用户发送消息
     *
     * @param invocation 消息体
     */
    public void sendAll(Invocation invocation) {
        for (Channel channel : channels.values()) {
            if (!channel.isActive()) {
                logger.error("[send][连接({})未激活]", channel.id());
                return;
            }
            // 发送消息
            channel.writeAndFlush(invocation);
        }
    }

}

2.1.5 引入依赖

创建 pom.xml 文件,引入 Netty 依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>lab-67-netty-demo</artifactId>
        <groupId>cn.iocoder.springboot.labs</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-67-netty-demo-server</artifactId>

    <properties>
        <!-- 依赖相关配置 -->
        <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
        <!-- 插件相关配置 -->
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- Spring Boot 基础依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- Netty 依赖 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.50.Final</version>
        </dependency>

        <!-- 引入 netty-demo-common 封装 -->
        <dependency>
            <groupId>cn.iocoder.springboot.labs</groupId>
            <artifactId>lab-67-netty-demo-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

2.1.6 NettyServerApplication

创建 NettyServerApplication 类,Netty Server 启动类。代码如下:

@SpringBootApplication
public class NettyServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(NettyServerApplication.class, args);
    }

}

2.1.7 简单测试

执行 NettyServerApplication 类,启动 Netty Server 服务器。日志如下:

... // 省略其他日志

2020-06-21 00:16:38.801  INFO 41948 --- [           main] c.i.s.l.n.server.NettyServer             : [start][Netty Server 启动在 8888 端口]
2020-06-21 00:16:38.893  INFO 41948 --- [           main] c.i.s.l.n.NettyServerApplication         : Started NettyServerApplication in 0.96 seconds (JVM running for 1.4)

Netty Server 启动在 8888 端口。

2.2 构建 Netty 客户端

创建 lab-67-netty-demo-client 项目,搭建 Netty 客户端 。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

下面,我们只会暂时看看 client 包下的代码,避免信息量过大,击穿胖友的秃头。

2.2.1 NettyClient

创建 NettyClient 类,Netty 客户端。代码如下:

@Component
public class NettyClient {

    /**
     * 重连频率,单位:秒
     */
    private static final Integer RECONNECT_SECONDS = 20;

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${netty.server.host}")
    private String serverHost;
    @Value("${netty.server.port}")
    private Integer serverPort;

    @Autowired
    private NettyClientHandlerInitializer nettyClientHandlerInitializer;

    /**
     * 线程组,用于客户端对服务端的连接、数据读写
     */
    private EventLoopGroup eventGroup = new NioEventLoopGroup();
    /**
     * Netty Client Channel
     */
    private volatile Channel channel;

    /**
     * 启动 Netty Server
     */
    @PostConstruct
    public void start() throws InterruptedException {
        // <2.1> 创建 Bootstrap 对象,用于 Netty Client 启动
        Bootstrap bootstrap = new Bootstrap();
        // <2.2>
        bootstrap.group(eventGroup) // <2.2.1> 设置一个 EventLoopGroup 对象
                .channel(NioSocketChannel.class)  // <2.2.2> 指定 Channel 为客户端 NioSocketChannel
                .remoteAddress(serverHost, serverPort) // <2.2.3> 指定连接服务器的地址
                .option(ChannelOption.SO_KEEPALIVE, true) // <2.2.4> TCP Keepalive 机制,实现 TCP 层级的心跳保活功能
                .option(ChannelOption.TCP_NODELAY, true) //<2.2.5>  允许较小的数据包的发送,降低延迟
                .handler(nettyClientHandlerInitializer);
        // <2.3> 连接服务器,并异步等待成功,即启动客户端
        bootstrap.connect().addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // 连接失败
                if (!future.isSuccess()) {
                    logger.error("[start][Netty Client 连接服务器({}:{}) 失败]", serverHost, serverPort);
                    reconnect();
                    return;
                }
                // 连接成功
                channel = future.channel();
                logger.info("[start][Netty Client 连接服务器({}:{}) 成功]", serverHost, serverPort);
            }

        });
    }

    public void reconnect() {
        // ... 暂时省略代码。
    }

    /**
     * 关闭 Netty Server
     */
    @PreDestroy
    public void shutdown() {
        // <3.1> 关闭 Netty Client
        if (channel != null) {
            channel.close();
        }
        // <3.2> 优雅关闭一个 EventLoopGroup 对象
        eventGroup.shutdownGracefully();
    }

    /**
     * 发送消息
     *
     * @param invocation 消息体
     */
    public void send(Invocation invocation) {
        if (channel == null) {
            logger.error("[send][连接不存在]");
            return;
        }
        if (!channel.isActive()) {
            logger.error("[send][连接({})未激活]", channel.id());
            return;
        }
        // 发送消息
        channel.writeAndFlush(invocation);
    }

}

友情提示:整体代码,是和对等,且基本是一致的。

:fire: ① 在类上,添加 @Component 注解,把 NettyClient 的创建交给 Spring 管理。

  • serverHostserverPort 属性,读取 application.yml 配置文件的 netty.server.hostnetty.server.port 配置项。
  • #start() 方法,添加 @PostConstruct 注解,启动 Netty 客户端。
  • #shutdown() 方法,添加 @PreDestroy 注解,关闭 Netty 客户端。

:fire: ② 我们来详细看看 #start() 方法的代码,如何实现 Netty Client 的启动,建立和服务器的连接。

<2.1> 处,创建 Bootstrap 类,Netty 提供的 客户端 的启动类,方便我们初始化 Client。

<2.2> 处,设置 Bootstrap 的各种属性。

<2.2.1> 处,调用 #group(EventLoopGroup group) 方法,设置使用 eventGroup 线程组,实现客户端对服务端的连接、数据读写。

<2.2.2> 处,调用 #channel(Class<? extends C> channelClass) 方法,设置使用 NioSocketChannel 类,它是 Netty 定义的 NIO 服务端 TCP Client 实现类。

<2.2.3> 处,调用 #remoteAddress(SocketAddress localAddress) 方法,设置连接服务端的 地址

<2.2.4> 处,调用 #option(ChannelOption<T> childOption, T value) 方法,TCP Keepalive 机制,实现 TCP 层级的 心跳保活 功能。

<2.2.5> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,允许 较小的数据包 的发送,降低延迟。

<2.2.7> 处,调用 #handler(ChannelHandler childHandler) 方法,设置 自己 Channel 的处理器为 NettyClientHandlerInitializer。稍后我们在「2.2.2 NettyClientHandlerInitializer」小节来看看。

<2.3> 处,调用 #connect() 方法,连接服务器,并 异步 等待成功,即启动客户端。同时,添加回调监听器 ChannelFutureListener,在连接服务端失败的时候,调用 #reconnect() 方法,实现定时重连。:smiling_imp: 具体 #reconnect() 方法的代码,我们稍后在瞅瞅哈。

③ 我们来详细看看 #shutdown() 方法的代码,如何实现 Netty Client 的关闭。

<3.1> 处,调用 Channel 的 #close() 方法,关闭 Netty Client,这样客户端就断开和服务端的连接。

<3.2> 处,调用 EventLoopGroup 的 #shutdownGracefully() 方法,优雅关闭 EventLoopGroup。例如说,它们里面的线程池。

#send(Invocation invocation) 方法,实现向服务端发送消息。

因为 NettyClient 是客户端,所以无需像 NettyServer 一样使用「2.1.4 NettyChannelManager」维护 Channel 的集合。

2.2.2 NettyClientHandlerInitializer

创建的 NettyClientHandlerInitializer 类,就继承了 ChannelInitializer 抽象类,实现和服务端建立连接后,添加相应的 ChannelHandler 处理器。代码如下:

@Component
public class NettyClientHandlerInitializer extends ChannelInitializer<Channel> {

    /**
     * 心跳超时时间
     */
    private static final Integer READ_TIMEOUT_SECONDS = 60;

    @Autowired
    private MessageDispatcher messageDispatcher;

    @Autowired
    private NettyClientHandler nettyClientHandler;

    @Override
    protected void initChannel(Channel ch) {
        ch.pipeline()
                // 空闲检测
                .addLast(new IdleStateHandler(READ_TIMEOUT_SECONDS, 0, 0))
                .addLast(new ReadTimeoutHandler(3 * READ_TIMEOUT_SECONDS))
                // 编码器
                .addLast(new InvocationEncoder())
                // 解码器
                .addLast(new InvocationDecoder())
                // 消息分发器
                .addLast(messageDispatcher)
                // 客户端处理器
                .addLast(nettyClientHandler)
        ;
    }

}

和「2.1.2 NettyServerHandlerInitializer」的代码基本一样,差别在于空闲检测额外增加 IdleStateHandler ,客户端处理器换成了 NettyClientHandler

2.2.3 NettyClientHandler

创建 NettyClientHandler 类,实现客户端 Channel 断开 连接、异常时的处理。代码如下:

@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private NettyClient nettyClient;

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 发起重连
        nettyClient.reconnect();
        // 继续触发事件
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("[exceptionCaught][连接({}) 发生异常]", ctx.channel().id(), cause);
        // 断开连接
        ctx.channel().close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
        // 空闲时,向服务端发起一次心跳
        if (event instanceof IdleStateEvent) {
            logger.info("[userEventTriggered][发起一次心跳]");
            HeartbeatRequest heartbeatRequest = new HeartbeatRequest();
            ctx.writeAndFlush(new Invocation(HeartbeatRequest.TYPE, heartbeatRequest))
                    .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        } else {
            super.userEventTriggered(ctx, event);
        }
    }

}

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

#channelInactive(ChannelHandlerContext ctx) 方法,实现在和服务端 断开 连接时,调用 NettyClient 的 #reconnect() 方法,实现客户端 定时 和服务端 重连

#exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 方法,在处理 Channel 的事件发生异常时,调用 Channel 的 #close() 方法, 断开 和客户端的连接。

#userEventTriggered(ChannelHandlerContext ctx, Object event) 方法,在客户端在空闲时,向服务端发送一次心跳,即 心跳机制 。这块的内容,我们稍后详细讲讲。

2.2.4 引入依赖

创建 pom.xml 文件,引入 Netty 依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>lab-67-netty-demo</artifactId>
        <groupId>cn.iocoder.springboot.labs</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-67-netty-demo-client</artifactId>

    <properties>
        <!-- 依赖相关配置 -->
        <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
        <!-- 插件相关配置 -->
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- 实现对 Spring MVC 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Netty 依赖 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.50.Final</version>
        </dependency>

        <!-- 引入 netty-demo-common 封装 -->
        <dependency>
            <groupId>cn.iocoder.springboot.labs</groupId>
            <artifactId>lab-67-netty-demo-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

2.2.5 NettyClientApplication

创建 NettyClientApplication 类,Netty Client 启动类。代码如下:

@SpringBootApplication
public class NettyClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(NettyClientApplication.class, args);
    }

}

2.2.6 简单测试

执行 NettyClientApplication 类,启动 Netty Client 客户端。日志如下:

... // 省略其他日志

2020-06-21 09:06:12.205  INFO 44029 --- [ntLoopGroup-2-1] c.i.s.l.n.client.NettyClient             : [start][Netty Client 连接服务器(127.0.0.1:8888) 成功]

同时 Netty Server 服务端发现有一个客户端接入,打印如下日志:

2020-06-21 09:06:12.268  INFO 41948 --- [ntLoopGroup-3-1] c.i.s.l.n.server.NettyChannelManager     : [add][一个连接(db652822)加入]

2.3 小结

至此,我们已经构建 Netty 服务端和客户端完成。因为 Netty 提供的 API 非常便利,所以我们不会像直接使用 NIO 时,需要处理大量底层且细节的代码。

不过,如上的内容仅仅是本文的开胃菜,正片即将开始!美滋滋,继续往下看,奥利给!

3. 通信协议

在「2. 构建 Netty 服务端与客户端」小节中,我们实现了客户端和服务端的 连接 功能。而本小节,我们要让它们两能够说上话,即进行 数据的读写

在日常项目的开发中,前端和后端之间采用 HTTP 作为通信协议,使用 文本内容 进行交互,数据格式一般是 JSON 。但是在 TCP 的世界里,我们需要自己基于 二进制 构建,构建客户端和服务端的通信协议。

我们以客户端向服务端发送消息来举个例子,假设客户端要发送一个登录请求,对应的类如下:

public class AuthRequest {

    /** 用户名 **/
    private String username;
    /** 密码 **/
    private String password;
    
}
  • 显然,我们无法将一个 Java 对象直接丢到 TCP Socket 当中,而是需要将其转换成 byte 字节数组 ,才能写入到 TCP Socket 中去。即,需要将消息对象通过 序列化 ,转换成 byte 字节数组。
  • 同时,在服务端收到 byte 字节数组时,需要将其又转换成 Java 对象,即 反序列化 。不然,服务端对着一串 byte 字节处理个毛线?!

友情提示:服务端向客户端发消息,也是一样的过程哈!

序列化的工具非常多,例如说 Google 提供的 Protobuf ,性能高效,且序列化出来的二进制数据较小。Netty 对 Protobuf 进行集成,提供了相应的 编解码器 。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

但是考虑到很多胖友对 Protobuf 并不了解,因为它实现序列化又增加胖友的额外学习成本。因此,艿艿仔细一个捉摸,还是采用 JSON 方式进行序列化。可能胖友会疑惑,JSON 不是将 对象 转换成 字符串 吗?嘿嘿,我们再把字符串转换成 byte 字节数组 就可以啦~

下面,我们新建 lab-67-netty-demo-common 项目,并在 codec 包下,实现我们自定义的通信协议。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

3.1 Invocation

创建 Invocation 类,通信协议的消息体。代码如下:

/**
 * 通信协议的消息体
 */
public class Invocation {

    /**
     * 类型
     */
    private String type;
    /**
     * 消息,JSON 格式
     */
    private String message;

    // 空构造方法
    public Invocation() {
    }

    public Invocation(String type, String message) {
        this.type = type;
        this.message = message;
    }

    public Invocation(String type, Message message) {
        this.type = type;
        this.message = JSON.toJSONString(message);
    }
    
    // ... 省略 setter、getter、toString 方法
}

type 属性, 类型 ,用于匹配对应的消息处理器。如果类比 HTTP 协议, type 属性相当于请求地址。

message 属性,消息内容,使用 JSON 格式。

另外, Message 是我们定义的消息接口。代码如下:

public interface Message {

    // ... 空,作为标记接口

}

3.2 粘包与拆包

在开始看 Invocation 的编解码处理器之前,我们先了解下 粘包拆包 的概念。

如果的内容,引用 《Netty 解决粘包和拆包问题的四种方案》 文章的内容,进行二次编辑。

3.2.1 产生原因

产生粘包和拆包问题的主要原因是,操作系统在发送 TCP 数据的时候,底层会有一个缓冲区,例如 1024 个字节大小。

  • 如果一次请求发送的数据量 比较小 ,没达到缓冲区大小,TCP 则会将多个请求合并为同一个请求进行发送,这就形成了 粘包 问题。

    例如说,在 《详解 Socket 编程 — TCP_NODELAY 选项》 文章中我们可以看到,在关闭 Nagle 算法时,请求不会等待满足缓冲区大小,而是尽快发出,降低延迟。

  • 如果一次请求发送的数据量 比较大 ,超过了缓冲区大小,TCP 就会将其拆分为多次发送,这就是 拆包 ,也就是将一个大的包拆分为多个小包进行发送。

如下图展示了粘包和拆包的一个示意图,演示了粘包和拆包的三种情况:

从零单排,使用 Netty 构建 IM 聊天室~

  • A 和 B 两个包都刚好满足 TCP 缓冲区的大小,或者说其等待时间已经达到 TCP 等待时长,从而还是使用两个独立的包进行发送。
  • A 和 B 两次请求间隔时间内较短,并且数据包较小,因而合并为同一个包发送给服务端。
  • B 包比较大,因而将其拆分为两个包 B_1 和 B_2 进行发送,而这里由于拆分后的 B_2 比较小,其又与 A 包合并在一起发送。

3.2.2 解决方案

对于粘包和拆包问题,常见的解决方案有三种:

:fire: ① 客户端在发送数据包的时候,每个包都 固定长度 。比如 1024 个字节大小,如果客户端发送的数据长度不足 1024 个字节,则通过补充空格的方式补全到指定长度。

这种方式,艿艿暂时没有找到采用这种方式的案例。

:fire: ② 客户端在每个包的末尾使用固定的 分隔符 。例如 /r/n ,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的 /r/n ,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包。

具体的案例,有 HTTP、WebSocket、Redis。

:fire: ③ 将消息分为头部和消息体,在头部中保存有当前整个 消息的长度 ,只有在读取到足够长度的消息之后才算是读到了一个完整的消息。

友情提示:方案 ③ 是 ① 的升级版, 动态长度

本文,艿艿将采用这种方式,在每次 Invocation 序列化成字节数组写入 TCP Socket 之前,先将字节数组的长度写到其中。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

3.3 InvocationEncoder

创建 InvocationEncoder 类,实现将 Invocation 序列化,并写入到 TCP Socket 中。代码如下:

public class InvocationEncoder extends MessageToByteEncoder<Invocation> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) {
        // <2.1> 将 Invocation 转换成 byte[] 数组
        byte[] content = JSON.toJSONBytes(invocation);
        // <2.2> 写入 length
        out.writeInt(content.length);
        // <2.3> 写入内容
        out.writeBytes(content);
        logger.info("[encode][连接({}) 编码了一条消息({})]", ctx.channel().id(), invocation.toString());
    }

}

① MessageToByteEncoder 是 Netty 定义的 编码 ChannelHandler 抽象类,将泛型 <I> 消息转换成字节数组。

#encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) 方法,进行编码的逻辑。

<2.1> 处,调用 JSON 的 #toJSONBytes(Object object, SerializerFeature... features) 方法,将 Invocation 转换成 字节数组。

<2.2> 处,将字节数组的 长度 ,写入到 TCP Socket 当中。这样,后续「3.4 InvocationDecoder」可以根据该长度,解析到消息, 解决粘包和拆包的问题

友情提示:MessageToByteEncoder 会最终将 ByteBuf out 写到 TCP Socket 中。

<2.3> 处,将字节数组,写入到 TCP Socket 当中。

3.4 InvocationDecoder

创建 InvocationDecoder 类,实现从 TCP Socket 读取字节数组,反序列化成 Invocation。代码如下:

public class InvocationDecoder extends ByteToMessageDecoder {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // <2.1> 标记当前读取位置
        in.markReaderIndex();
        // <2.2> 判断是否能够读取 length 长度
        if (in.readableBytes() <= 4) {
            return;
        }
        // <2.3> 读取长度
        int length = in.readInt();
        if (length < 0) {
            throw new CorruptedFrameException("negative length: " + length);
        }
        // <3.1> 如果 message 不够可读,则退回到原读取位置
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }
        // <3.2> 读取内容
        byte[] content = new byte[length];
        in.readBytes(content);
        // <3.3> 解析成 Invocation
        Invocation invocation = JSON.parseObject(content, Invocation.class);
        out.add(invocation);
        logger.info("[decode][连接({}) 解析到一条消息({})]", ctx.channel().id(), invocation.toString());
    }

}

① ByteToMessageDecoder 是 Netty 定义的 解码 ChannelHandler 抽象类,在 TCP Socket 读取到 新数据 时,触发进行解码。

② 在 <2.1><2.2><2.3> 处,从 TCP Socket 中读取 长度

③ 在 <3.1><3.2><3.3> 处,从 TCP Socket 中读取 字节数组 ,并反序列化成 Invocation 对象。

最终,添加 List<Object> out 中,交给后续的 ChannelHandler 进行处理。稍后,我们将在小结中,会看到 MessageDispatcher 将 Invocation 分发到其对应的 MessageHandler 中,进行 业务 逻辑的执行。

3.5 引入依赖

创建 pom.xml 文件,引入 Netty、FastJSON 等等依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>lab-67-netty-demo</artifactId>
        <groupId>cn.iocoder.springboot.labs</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-67-netty-demo-common</artifactId>

    <properties>
        <!-- 插件相关配置 -->
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencies>
        <!-- Netty 依赖 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.50.Final</version>
        </dependency>

        <!-- FastJSON 依赖 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.71</version>
        </dependency>

        <!-- 引入 Spring 相关依赖 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>5.2.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.2.5.RELEASE</version>
        </dependency>

        <!-- 引入 SLF4J 依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>

</project>

3.6 小结

至此,我们已经完成通信协议的定义、编解码的逻辑,是不是蛮有趣的?!

另外,我们在 NettyServerHandlerInitializer 和 NettyClientHandlerInitializer 的初始化代码中,将编解码器添加到其中。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

4. 消息分发

SpringMVC 中, DispatcherServlet 会根据请求地址、方法等,将请求分发到匹配的 Controller 的 Method 方法上。

lab-67-netty-demo-client 项目的 dispatcher 包中,我们创建了 MessageDispatcher 类,实现和 DispatcherServlet 类似的功能,将 Invocation 分发到其对应的 MessageHandler 中,进行 业务 逻辑的执行。

从零单排,使用 Netty 构建 IM 聊天室~

下面,我们来看看具体的代码实现。

4.1 Message

创建 Message 接口,定义消息的标记接口。代码如下:

public interface Message {
}

下图,是我们涉及到的 Message 实现类。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

4.2 MessageHandler

创建 MessageHandler 接口,消息处理器接口。代码如下:

public interface MessageHandler<T extends Message> {

    /**
     * 执行处理消息
     *
     * @param channel 通道
     * @param message 消息
     */
    void execute(Channel channel, T message);

    /**
     * @return 消息类型,即每个 Message 实现类上的 TYPE 静态字段
     */
    String getType();

}
<T>

下图,是我们涉及到的 MessageHandler 实现类。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

4.3 MessageHandlerContainer

创建 MessageHandlerContainer 类,作为 MessageHandler 的容器。代码如下:

public class MessageHandlerContainer implements InitializingBean {

    private Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * 消息类型与 MessageHandler 的映射
     */
    private final Map<String, MessageHandler> handlers = new HashMap<>();

    @Autowired
    private ApplicationContext applicationContext;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 通过 ApplicationContext 获得所有 MessageHandler Bean
        applicationContext.getBeansOfType(MessageHandler.class).values() // 获得所有 MessageHandler Bean
                .forEach(messageHandler -> handlers.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中
        logger.info("[afterPropertiesSet][消息处理器数量:{}]", handlers.size());
    }

    /**
     * 获得类型对应的 MessageHandler
     *
     * @param type 类型
     * @return MessageHandler
     */
    MessageHandler getMessageHandler(String type) {
        MessageHandler handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalArgumentException(String.format("类型(%s) 找不到匹配的 MessageHandler 处理器", type));
        }
        return handler;
    }

    /**
     * 获得 MessageHandler 处理的消息类
     *
     * @param handler 处理器
     * @return 消息类
     */
    static Class<? extends Message> getMessageClass(MessageHandler handler) {
        // 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler);
        // 获得接口的 Type 数组
        Type[] interfaces = targetClass.getGenericInterfaces();
        Class<?> superclass = targetClass.getSuperclass();
        while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { // 此处,是以父类的接口为准
            interfaces = superclass.getGenericInterfaces();
            superclass = targetClass.getSuperclass();
        }
        if (Objects.nonNull(interfaces)) {
            // 遍历 interfaces 数组
            for (Type type : interfaces) {
                // 要求 type 是泛型参数
                if (type instanceof ParameterizedType) {
                    ParameterizedType parameterizedType = (ParameterizedType) type;
                    // 要求是 MessageHandler 接口
                    if (Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) {
                        Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
                        // 取首个元素
                        if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
                            return (Class<Message>) actualTypeArguments[0];
                        } else {
                            throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
                        }
                    }
                }
            }
        }
        throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
    }

}

① 实现 InitializingBean 接口,在 #afterPropertiesSet() 方法中,扫描所有 MessageHandler Bean ,添加到 MessageHandler 集合中。

② 在 #getMessageHandler(String type) 方法中,获得类型对应的 MessageHandler 对象。稍后,我们会在 MessageDispatcher 调用该方法。

③ 在 #getMessageClass(MessageHandler handler) 方法中,通过 MessageHandler 中,通过解析其类上的泛型,获得消息类型对应的 Class 类。这是参考 rocketmq-spring 项目的 DefaultRocketMQListenerContainer#getMessageType() 方法,进行略微修改。

友情提示:如果胖友对 Java 的泛型机制没有做过一点了解,可能略微有点硬核。可以先暂时跳过,知道意图即可。

4.4 MessageDispatcher

创建 MessageDispatcher 类,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。代码如下:

@ChannelHandler.Sharable
public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> {

    @Autowired
    private MessageHandlerContainer messageHandlerContainer;

    private final ExecutorService executor =  Executors.newFixedThreadPool(200);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) {
        // <3.1> 获得 type 对应的 MessageHandler 处理器
        MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());
        // 获得  MessageHandler 处理器的消息类
        Class<? extends Message> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);
        // <3.2> 解析消息
        Message message = JSON.parseObject(invocation.getMessage(), messageClass);
        // <3.3> 执行逻辑
        executor.submit(new Runnable() {

            @Override
            public void run() {
                // noinspection unchecked
                messageHandler.execute(ctx.channel(), message);
            }

        });
    }

}

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

② SimpleChannelInboundHandler 是 Netty 定义的 消息处理 ChannelHandler 抽象类,处理消息的类型是 <I> 泛型时。

#channelRead0(ChannelHandlerContext ctx, Invocation invocation) 方法,处理消息,进行分发。

从零单排,使用 Netty 构建 IM 聊天室~

<3.1> 处,调用 MessageHandlerContainer 的 #getMessageHandler(String type) 方法,获得 Invocation 的 type 对应的 MessageHandler 处理器

然后,调用 MessageHandlerContainer 的 #getMessageClass(messageHandler) 方法,获得 MessageHandler 处理器的 消息类

<3.2> 处,调用 JSON 的 # parseObject(String text, Class<T> clazz) 方法,将 Invocation 的 message 解析成 MessageHandler 对应的 消息对象

<3.3> 处,丢到线程池中,然后调用 MessageHandler 的 #execute(Channel channel, T message) 方法,执行 业务逻辑

注意,为什么要丢到 executor 线程池中呢?我们先来了解下 EventGroup 的线程模型。

友情提示:在我们启动 Netty 服务端或者客户端时,都会设置其 EventGroup。

EventGroup 我们可以先简单理解成一个 线程池 ,并且线程池的大小 仅仅 是 CPU 数量 * 2。每个 Channel 仅仅 会被分配到其中的一个 线程 上,进行数据的读写。并且,多个 Channel 会 共享 一个线程,即使用同一个线程进行数据的读写。

那么胖友试着思考下,MessageHandler 的具体逻辑视线中,往往会涉及到 IO 处理 ,例如说进行数据库的读取。这样,就会导致一个 Channel 在执行 MessageHandler 的过程中, 阻塞 了共享当前线程的其它 Channel 的数据读取。

因此,我们在这里创建了 executor 线程池,进行 MessageHandler 的逻辑执行,避免 阻塞 Channel 的数据读取。

可能会有胖友说,我们是不是能够把 EventGroup 的线程池设置大一点,例如说 200 呢?对于长连接的 Netty 服务端,往往会有 1000 ~ 100000 的 Netty 客户端连接上来,这样无论设置多大的线程池,都会出现 阻塞 数据读取的情况。

友情提示:
executor

线程池,我们一般称之为业务线程池或者逻辑线程池,顾名思义,就是执行业务逻辑的。

这样的设计方式,目前 Dubbo 等等 RPC 框架,都采用这种方式。

后续,胖友可以认真阅读下 《【NIO 系列】——之 Reactor 模型》 文章,进一步理解。

4.5 NettyServerConfig

创建 NettyServerConfig 配置类,创建 MessageDispatcher 和 MessageHandlerContainer Bean。代码如下:

@Configuration
public class NettyServerConfig {

    @Bean
    public MessageDispatcher messageDispatcher() {
        return new MessageDispatcher();
    }

    @Bean
    public MessageHandlerContainer messageHandlerContainer() {
        return new MessageHandlerContainer();
    }

}

4.6 NettyClientConfig

友情提示:和「4.5 NettyServerConfig」小结一致。

创建 NettyClientConfig 配置类,创建 MessageDispatcher 和 MessageHandlerContainer Bean。代码如下:

@Configuration
public class NettyClientConfig {

    @Bean
    public MessageDispatcher messageDispatcher() {
        return new MessageDispatcher();
    }

    @Bean
    public MessageHandlerContainer messageHandlerContainer() {
        return new MessageHandlerContainer();
    }

}

4.7 小结

后续,我们将在如下小节,具体演示消息分发的使用:

5. 断开重连

Netty 客户端需要实现 断开重连 机制,解决各种情况下的断开情况。例如说:

  • Netty 客户端启动时,Netty 服务端处于挂掉,导致无法连接上。
  • 在运行过程中,Netty 服务端挂掉,导致连接被断开。
  • 任一一端网络抖动,导致连接异常断开。

具体的代码实现比较简单,只需要在两个地方增加 重连 机制。

  • Netty 客户端 启动 时,无法连接 Netty 服务端时,发起重连。
  • Netty 客户端 运行 时,和 Netty 断开连接时,发起重连。

考虑到重连会存在 失败 的情况,我们采用 定时 重连的方式,避免占用过多资源。

5.1 具体代码

① 在 NettyClient 中,提供 #reconnect() 方法,实现定时重连的逻辑。代码如下:

// NettyClient.java

public void reconnect() {
    eventGroup.schedule(new Runnable() {
        @Override
        public void run() {
            logger.info("[reconnect][开始重连]");
            try {
                start();
            } catch (InterruptedException e) {
                logger.error("[reconnect][重连失败]", e);
            }
        }
    }, RECONNECT_SECONDS, TimeUnit.SECONDS);
    logger.info("[reconnect][{} 秒后将发起重连]", RECONNECT_SECONDS);
}

通过调用 EventLoop 提供的 #schedule(Runnable command, long delay, TimeUnit unit) 方法,实现 定时 逻辑。而在内部的具体逻辑,调用 NettyClient 的 #start() 方法,发起连接 Netty 服务端。

又因为 NettyClient 在 #start() 方法在连接 Netty 服务端 失败 时,又会调用 #reconnect() 方法,从而再次发起 定时重连 。如此循环反复,知道 Netty 客户端连接上 Netty 服务端。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

② 在 NettyClientHandler 中,实现 #channelInactive(ChannelHandlerContext ctx) 方法,在发现和 Netty 服务端 断开 时,调用 Netty Client 的 #reconnect() 方法,发起重连。代码如下:

// NettyClientHandler.java

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    // 发起重连
    nettyClient.reconnect();
    // 继续触发事件
    super.channelInactive(ctx);
}

5.2 简单测试

① 启动 Netty Client,不要启动 Netty Server,控制台打印日志如下图:

从零单排,使用 Netty 构建 IM 聊天室~

可以看到 Netty Client 在连接失败时,不断发起定时重连。

② 启动 Netty Server,控制台打印如下图:

从零单排,使用 Netty 构建 IM 聊天室~

可以看到 Netty Client 成功重连上 Netty Server。

6. 心跳机制与空闲检测

在上文中,艿艿推荐胖友阅读 《TCP Keepalive 机制刨根问底》 文章,我们可以了解到 TCP 自带 的空闲检测机制, 默认 是 2 小时。这样的检测机制,从 系统资源 层面上来说是可以接受的。

但是在 业务 层面,如果 2 小时才发现客户端与服务端的连接 实际 已经断开,会导致中间非常多的消息丢失,影响客户的使用体验。

因此,我们需要在 业务 层面,自己实现空闲检测,保证 尽快 发现客户端与服务端 实际 已经断开的情况。实现逻辑如下:

  • 服务端 发现 180 秒未从 客户端 读取到消息, 主动 断开连接。
  • 客户端 发现 180 秒未从 服务端 读取到消息, 主动 断开连接。

考虑到客户端和服务端之间并不是一直有消息的交互,所以我们需要增加 心跳机制

  • 客户端 每 60 秒向 服务端 发起一次心跳消息,保证 服务端 可以读取到消息。
  • 服务端 在收到心跳消息时,回复 客户端 一条确认消息,保证 客户端 可以读取到消息。

友情提示:

  • 为什么是 180 秒?可以加大或者减小,看自己希望多快检测到连接异常。过短的时间,会导致心跳过于频繁,占用过多资源。
  • 为什么是 60 秒?三次机会,确认是否心跳超时。

虽然听起来有点复杂,但是实现起来并不复杂哈。

6.1 服务端的空闲检测

在 NettyServerHandlerInitializer 中,我们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

通过这样的方式,实现 服务端 发现 180 秒未从 客户端 读取到消息, 主动 断开连接。

6.2 客户端的空闲检测

友情提示:和「6.1 服务端的空闲检测」一致。

在 NettyClientHandlerInitializer 中,我们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

通过这样的方式,实现 客户端 发现 180 秒未从 服务端 读取到消息, 主动 断开连接。

6.3 心跳机制

Netty 提供了 IdleStateHandler 处理器,提供空闲检测的功能,在 Channel 的 读或者写 空闲时间太长时,将会触发一个 IdleStateEvent 事件。

这样,我们只需要在 NettyClientHandler 处理器中,在接收到 IdleStateEvent 事件时,客户端向客户端发送一次心跳消息。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

  • 其中, HeartbeatRequest 是心跳请求。

同时,我们在 服务端 项目中,创建了一个 HeartbeatRequestHandler 消息处理器,在收到客户端的心跳请求时,回复客户端一条确认消息。代码如下:

@Component
public class HeartbeatRequestHandler implements MessageHandler<HeartbeatRequest> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, HeartbeatRequest message) {
        logger.info("[execute][收到连接({}) 的心跳请求]", channel.id());
        // 响应心跳
        HeartbeatResponse response = new HeartbeatResponse();
        channel.writeAndFlush(new Invocation(HeartbeatResponse.TYPE, response));
    }

    @Override
    public String getType() {
        return HeartbeatRequest.TYPE;
    }

}
  • 其中, HeartbeatResponse 是心跳确认响应

6.4 简单测试

启动 Netty Server 服务端,再启动 Netty Client 客户端,耐心等待 60 秒后,可以看到 心跳 日志如下:

// ... 客户端
2020-06-22 08:24:47.275  INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.c.handler.NettyClientHandler   : [userEventTriggered][发起一次心跳]
2020-06-22 08:24:47.335  INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(44223e18) 编码了一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})]
2020-06-22 08:24:47.408  INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(44223e18) 解析到一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]
2020-06-22 08:24:47.409  INFO 57005 --- [pool-1-thread-1] c.i.s.l.n.m.h.HeartbeatResponseHandler   : [execute][收到连接(44223e18) 的心跳响应]

// ... 服务端
2020-06-22 08:24:47.388  INFO 56998 --- [ntLoopGroup-3-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(34778465) 解析到一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})]
2020-06-22 08:24:47.390  INFO 56998 --- [pool-1-thread-1] c.i.s.l.n.m.h.HeartbeatRequestHandler    : [execute][收到连接(34778465) 的心跳请求]
2020-06-22 08:24:47.399  INFO 56998 --- [ntLoopGroup-3-1] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(34778465) 编码了一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]

7. 认证逻辑

友情提示:从本小节开始,我们就具体看看业务逻辑的处理示例。

认证的过程,如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

7.1 AuthRequest

创建 AuthRequest 类,定义用户 认证 请求。代码如下:

public class AuthRequest implements Message {

    public static final String TYPE = "AUTH_REQUEST";

    /**
     * 认证 Token
     */
    private String accessToken;
    
    // ... 省略 setter、getter、toString 方法
}

这里我们使用 accessToken 认证令牌进行认证。

因为一般情况下,我们使用 HTTP 进行登录系统,然后使用登录后的身份标识(例如说 accessToken 认证令牌),将客户端和当前用户进行认证绑定。

7.2 AuthResponse

创建 AuthResponse 类,定义用户 认证 响应。代码如下:

public class AuthResponse implements Message {

    public static final String TYPE = "AUTH_RESPONSE";

    /**
     * 响应状态码
     */
    private Integer code;
    /**
     * 响应提示
     */
    private String message;
    
    // ... 省略 setter、getter、toString 方法
}

7.3 AuthRequestHandler

服务端…

创建 AuthRequestHandler 类,为 服务端 处理 客户端 的认证请求。代码如下:

@Component
public class AuthRequestHandler implements MessageHandler<AuthRequest> {

    @Autowired
    private NettyChannelManager nettyChannelManager;

    @Override
    public void execute(Channel channel, AuthRequest authRequest) {
        // <1> 如果未传递 accessToken
        if (StringUtils.isEmpty(authRequest.getAccessToken())) {
            AuthResponse authResponse = new AuthResponse().setCode(1).setMessage("认证 accessToken 未传入");
            channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse));
            return;
        }

        // <2> ... 此处应有一段

        // <3> 将用户和 Channel 绑定
        // 考虑到代码简化,我们先直接使用 accessToken 作为 User
        nettyChannelManager.addUser(channel, authRequest.getAccessToken());

        // <4> 响应认证成功
        AuthResponse authResponse = new AuthResponse().setCode(0);
        channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse));
    }

    @Override
    public String getType() {
        return AuthRequest.TYPE;
    }

}

代码比较简单,胖友看看 <1><2><3><4> 上的注释。

7.4 AuthResponseHandler

客户端…

创建 AuthResponseHandler 类,为 客户端 处理 服务端 的认证响应。代码如下:

@Component
public class AuthResponseHandler implements MessageHandler<AuthResponse> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, AuthResponse message) {
        logger.info("[execute][认证结果:{}]", message);
    }

    @Override
    public String getType() {
        return AuthResponse.TYPE;
    }

}

打印个认证结果,方便调试

7.5 TestController

客户端…

创建 TestController 类,提供 /test/mock 接口,模拟客户端向服务端发送请求。代码如下:

@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private NettyClient nettyClient;

    @PostMapping("/mock")
    public String mock(String type, String message) {
        // 创建 Invocation 对象
        Invocation invocation = new Invocation(type, message);
        // 发送消息
        nettyClient.send(invocation);
        return "success";
    }

}

7.6 简单测试

启动 Netty Server 服务端,再启动 Netty Client 客户端,然后使用 Postman 模拟一次认证请求。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

同时,可以看到认证成功的日志如下:

// 客户端...
2020-06-22 08:41:12.364  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(9e086597) 编码了一条消息(Invocation{type='AUTH_REQUEST', message='{"accessToken": "yunai"}'})]
2020-06-22 08:41:12.390  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='AUTH_RESPONSE', message='{"code":0}'})]
2020-06-22 08:41:12.392  INFO 57583 --- [pool-1-thread-1] c.i.s.l.n.m.auth.AuthResponseHandler     : [execute][认证结果:AuthResponse{code=0, message='null'}]

// 服务端...
2020-06-22 08:41:12.374  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(791f122b) 解析到一条消息(Invocation{type='AUTH_REQUEST', message='{"accessToken": "yunai"}'})]
2020-06-22 08:41:12.379  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(791f122b) 编码了一条消息(Invocation{type='AUTH_RESPONSE', message='{"code":0}'})]

8. 单聊逻辑

私聊的过程,如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

服务端负责将客户端 A 发送的私聊消息,转发给客户端 B。

8.1 ChatSendToOneRequest

创建 ChatSendToOneRequest 类,发送给指定人的私聊消息的请求。代码如下:

public class ChatSendToOneRequest implements Message {

    public static final String TYPE = "CHAT_SEND_TO_ONE_REQUEST";

    /**
     * 发送给的用户
     */
    private String toUser;
    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    
    // ... 省略 setter、getter、toString 方法
}

8.2 ChatSendResponse

创建 ChatSendResponse 类,聊天发送消息结果的响应。代码如下:

public class ChatSendResponse implements Message {

    public static final String TYPE = "CHAT_SEND_RESPONSE";

    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 响应状态码
     */
    private Integer code;
    /**
     * 响应提示
     */
    private String message;
    
    // ... 省略 setter、getter、toString 方法
}

8.3 ChatRedirectToUserRequest

创建 ChatRedirectToUserRequest 类, 转发消息给一个用户的请求。代码如下:

public class ChatRedirectToUserRequest implements Message {

    public static final String TYPE = "CHAT_REDIRECT_TO_USER_REQUEST";

    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    
    // ... 省略 setter、getter、toString 方法
}

友情提示:写完之后,艿艿突然发现少了一个 fromUser 字段,表示来自谁的消息。

8.4 ChatSendToOneHandler

服务端…

创建 ChatSendToOneHandler 类,为 服务端 处理 客户端 的私聊请求。代码如下:

@Component
public class ChatSendToOneHandler implements MessageHandler<ChatSendToOneRequest> {

    @Autowired
    private NettyChannelManager nettyChannelManager;

    @Override
    public void execute(Channel channel, ChatSendToOneRequest message) {
        // <1> 这里,假装直接成功
        ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0);
        channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse));

        // <2> 创建转发的消息,发送给指定用户
        ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId())
                .setContent(message.getContent());
        nettyChannelManager.send(message.getToUser(), new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest));
    }

    @Override
    public String getType() {
        return ChatSendToOneRequest.TYPE;
    }

}

代码比较简单,胖友看看 <1><2> 上的注释。

8.5 ChatSendResponseHandler

客户端…

创建 ChatSendResponseHandler 类,为 客户端 处理 服务端 的聊天响应。代码如下:

@Component
public class ChatSendResponseHandler implements MessageHandler<ChatSendResponse> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, ChatSendResponse message) {
        logger.info("[execute][发送结果:{}]", message);
    }

    @Override
    public String getType() {
        return ChatSendResponse.TYPE;
    }

}

打印个聊天 发送 结果,方便调试。

8.6 ChatRedirectToUserRequestHandler

客户端

创建 ChatRedirectToUserRequestHandler 类,为 客户端 处理 服务端 的转发消息的请求。代码如下:

@Component
public class ChatRedirectToUserRequestHandler implements MessageHandler<ChatRedirectToUserRequest> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, ChatRedirectToUserRequest message) {
        logger.info("[execute][收到消息:{}]", message);
    }

    @Override
    public String getType() {
        return ChatRedirectToUserRequest.TYPE;
    }

}

打印个聊天 接收消息 ,方便调试。

8.7 简单测试

① 启动 Netty Server 服务端。

② 启动 Netty Client 客户端 A 。然后使用 Postman 模拟一次认证请求(用户为 yunai )。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

③ 启动 Netty Client 客户端 B 。注意,需要设置 --server.port 端口为 8081,避免冲突。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

然后使用 Postman 模拟一次认证请求(用户为 tutou )。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

④ 最后使用 Postman 模拟一次 yunai 芋艿给 tutou 土豆发送一次私聊消息。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

同时,可以看到客户端 A 向客户端 B 发送私聊消息的日志如下:

// 客户端 A...(芋艿)
2020-06-22 08:48:09.505  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tudou", msgId: "1", content: "你猜"}'})]
2020-06-22 08:48:09.510  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})]
2020-06-22 08:48:09.511  INFO 57583 --- [ool-1-thread-69] c.i.s.l.n.m.c.ChatSendResponseHandler    : [execute][发送结果:ChatSendResponse{msgId='1', code=0, message='null'}]
2020-06-22 08:48:35.148  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tutou", msgId: "1", content: "你猜"}'})]
2020-06-22 08:48:35.150  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})]
2020-06-22 08:48:35.150  INFO 57583 --- [ool-1-thread-70] c.i.s.l.n.m.c.ChatSendResponseHandler    : [execute][发送结果:ChatSendResponse{msgId='1', code=0, message='null'}]

// 服务端 ...
2020-06-22 08:48:35.149  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(791f122b) 解析到一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tutou", msgId: "1", content: "你猜"}'})]
2020-06-22 08:48:35.149  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(791f122b) 编码了一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})]
2020-06-22 08:48:35.149  INFO 56998 --- [ntLoopGroup-3-3] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(79cb3a1e) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"你猜","msgId":"1"}'})]

// 客户端 B...(秃头)
2020-06-22 08:48:18.277  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.c.handler.NettyClientHandler   : [userEventTriggered][发起一次心跳]
2020-06-22 08:48:18.278  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(24fbc3e8) 编码了一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})]
2020-06-22 08:48:18.280  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(24fbc3e8) 解析到一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]
2020-06-22 08:48:18.281  INFO 59613 --- [pool-1-thread-4] c.i.s.l.n.m.h.HeartbeatResponseHandler   : [execute][收到连接(24fbc3e8) 的心跳响应]
2020-06-22 08:48:35.150  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(24fbc3e8) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"你猜","msgId":"1"}'})]
2020-06-22 08:48:35.151  INFO 59613 --- [pool-1-thread-5] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='1', content='你猜'}]

9. 群聊逻辑

群聊的过程,如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

服务端负责将客户端 A 发送的群聊消息,转发给客户端 A、B、C。

友情提示:考虑到逻辑简洁,艿艿提供的本小节的示例,并不是一个一个群,而是所有人在一个大的群聊中哈~

9.1 ChatSendToAllRequest

创建 ChatSendToOneRequest 类,发送给所有人的 群聊 消息的请求。代码如下:

public class ChatSendToAllRequest implements Message {

    public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST";

    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    
    // ... 省略 setter、getter、toString 方法
}

友情提示:如果是正经的群聊,会有一个 groupId 字段,表示群编号。

9.2 ChatSendResponse

和「8.2 ChatSendResponse」小节一致。

9.3 ChatRedirectToUserRequest

和「8.3 ChatRedirectToUserRequest」小节一致。

9.4 ChatSendToAllHandler

服务端…

创建 ChatSendToAllHandler 类,为 服务端 处理 客户端 的群聊请求。代码如下:

@Component
public class ChatSendToAllHandler implements MessageHandler<ChatSendToAllRequest> {

    @Autowired
    private NettyChannelManager nettyChannelManager;

    @Override
    public void execute(Channel channel, ChatSendToAllRequest message) {
        // <1> 这里,假装直接成功
        ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0);
        channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse));

        // <2> 创建转发的消息,并广播发送
        ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId())
                .setContent(message.getContent());
        nettyChannelManager.sendAll(new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest));
    }

    @Override
    public String getType() {
        return ChatSendToAllRequest.TYPE;
    }

}

代码比较简单,胖友看看 <1><2> 上的注释。

9.5 ChatSendResponseHandler

和「8.5 ChatSendResponseHandler」小节一致。

9.6 ChatRedirectToUserRequestHandler

和「8.6 ChatRedirectToUserRequestHandler」小节一致。

9.7 简单测试

① 启动 Netty Server 服务端。

② 启动 Netty Client 客户端 A 。然后使用 Postman 模拟一次认证请求(用户为 yunai )。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

③ 启动 Netty Client 客户端 B 。注意,需要设置 --server.port 端口为 8081,避免冲突。

从零单排,使用 Netty 构建 IM 聊天室~

④ 启动 Netty Client 客户端 C 。注意,需要设置 --server.port 端口为 8082,避免冲突。

从零单排,使用 Netty 构建 IM 聊天室~

⑤ 最后使用 Postman 模拟一次发送群聊消息。如下图所示:

从零单排,使用 Netty 构建 IM 聊天室~

同时,可以看到客户端 A 群发 给所有客户端的日志如下:

// 客户端 A...
2020-06-22 08:55:44.898  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ALL_REQUEST', message='{msgId: "2", content: "广播消息"}'})]
2020-06-22 08:55:44.901  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"2"}'})]
2020-06-22 08:55:44.901  INFO 57583 --- [ol-1-thread-148] c.i.s.l.n.m.c.ChatSendResponseHandler    : [execute][发送结果:ChatSendResponse{msgId='2', code=0, message='null'}]
2020-06-22 08:55:44.901  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.903  INFO 57583 --- [ol-1-thread-149] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}]

// 服务端...
2020-06-22 08:55:44.898  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(791f122b) 解析到一条消息(Invocation{type='CHAT_SEND_TO_ALL_REQUEST', message='{msgId: "2", content: "广播消息"}'})]
2020-06-22 08:55:44.901  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(791f122b) 编码了一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"2"}'})]
2020-06-22 08:55:44.901  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(791f122b) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.901  INFO 56998 --- [ntLoopGroup-3-3] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(79cb3a1e) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.901  INFO 56998 --- [ntLoopGroup-3-4] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(9dc03826) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]

// 客户端 B...
2020-06-22 08:55:44.902  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(24fbc3e8) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.902  INFO 59613 --- [ool-1-thread-83] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}]

// 客户端 C...
2020-06-22 08:55:44.901  INFO 61597 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9128c71c) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.903  INFO 61597 --- [ool-1-thread-16] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}]

666. 彩蛋

至此,我们已经通过 Netty 实现了一个简单的 IM 功能,是不是收获蛮大的,嘿嘿。

下面,良心的艿艿,再来推荐一波文章,嘿嘿。

  • 想要了解 Netty 源码 的,可以阅读 《Netty 实现原理与源码解析系统 —— 精品合集》 文章。
  • 想要入门 Netty 基础 的,可以阅读 《Netty Bootstrap(图解)》 文章。

等后续,艿艿会在 https://github.com/YunaiV/one… 开源项目中,实现一个相对完整的客服功能,哈哈哈~

原文 

https://segmentfault.com/a/1190000023231612

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » 从零单排,使用 Netty 构建 IM 聊天室~

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址