转载

netty的自我学习(七)—Netty的简单入门案例

学习这件事,不在乎有没有人督促你,最重要的是在于你自己有没有觉悟和恒心。

温习中,会以笔记的形式记录下自我学习的过程。预计1月底之前更新完毕,请关注。

文章部分图片来源于视频笔记!!非我自画!!

netty的自我学习(七)—Netty的简单入门案例
netty的自我学习(一)—BIO、NIO、AIO的简单介绍

netty的自我学习(二)—初识NIO以及Buffer

netty的自我学习(三)—NIO的channel

netty的自我学习(四)—NIO的Selector(选择器)

netty的自我学习(五)—NIO之零拷贝

netty的自我学习(六)—Reactor模型以及Netty模型介绍

一个入门的Netty案例

服务端

package netty.netty.simple;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
*  https://my.oschina.net/javamaster/blog/2994961
*  https://blog.csdn.net/zuixiaoyao_001/article/details/90198968
*/
public class NettyServer {
 public static void main(String[] args) throws Exception {


     //创建BossGroup 和 WorkerGroup
     //说明
     //1. 创建两个线程组 bossGroup 和 workerGroup
     //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
     //3. 两个都是无限循环
     //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
     //   默认实际 cpu核数 * 2
     //
     EventLoopGroup bossGroup = new NioEventLoopGroup(1);
     EventLoopGroup workerGroup = new NioEventLoopGroup(); //8

     try {
         //创建服务器端的启动对象,配置参数
         ServerBootstrap bootstrap = new ServerBootstrap();

         //使用链式编程来进行设置
         bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                 .channel(NioServerSocketChannel.class) //bossGroup使用NioSocketChannel 作为服务器的通道实现
                 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数 option主要是针对boss线程组,
                 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 child主要是针对worker线程组
//                    .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
                 .childHandler(new ChannelInitializer<SocketChannel>() {//workerGroup使用 SocketChannel创建一个通道初始化对象(匿名对象)
                     //给pipeline 设置处理器
                     @Override
                     protected void initChannel(SocketChannel ch) throws Exception {
                         System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
                         ch.pipeline().addLast(new NettyServerHandler());
                     }
                 }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器

         System.out.println(".....服务器 is ready...");

         //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
         //启动服务器(并绑定端口)
         ChannelFuture cf = bootstrap.bind(6668).sync();

         //给cf 注册监听器,监控我们关心的事件

         cf.addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {
                 if (cf.isSuccess()) {
                     System.out.println("监听端口 6668 成功");
                 } else {
                     System.out.println("监听端口 6668 失败");
                 }
             }
         });
         //对关闭通道进行监听
         cf.channel().closeFuture().sync();
     }finally {
         bossGroup.shutdownGracefully();
         workerGroup.shutdownGracefully();
     }

 }

}

复制代码

NettyServerHandler

package netty.netty.simple;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;

import java.util.concurrent.TimeUnit;

/*
说明
1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
2. 这时我们自定义一个Handler , 才能称为一个handler
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //读取数据实际(这里我们可以读取客户端发送的消息)
    /*
    1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
    2. Object msg: 就是客户端发送的数据 默认Object
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

/*

        //比如这里我们有一个非常耗时长的业务-> 异步执行 -> 提交该channel 对应的
        //NIOEventLoop 的 taskQueue中,

        //解决方案1 用户程序自定义的普通任务

        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
                    System.out.println("channel code=" + ctx.channel().hashCode());
                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        });

        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵3", CharsetUtil.UTF_8));
                    System.out.println("channel code=" + ctx.channel().hashCode());
                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        });

        //解决方案2 : 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中

        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8));
                    System.out.println("channel code=" + ctx.channel().hashCode());
                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        }, 5, TimeUnit.SECONDS);



        System.out.println("go on ...");*/


        System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
        System.out.println("server ctx =" + ctx);
        System.out.println("看看channel 和 pipeline的关系");
        Channel channel = ctx.channel();
        ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站


        //将 msg 转成一个 ByteBuf
        //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:" + channel.remoteAddress());
    }

    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        //writeAndFlush 是 write + flush
        //将数据写入到缓存,并刷新
        //一般讲,我们对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
    }

    //处理异常, 一般是需要关闭通道

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

复制代码

NettyClient

package netty.netty.simple;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
    public static void main(String[] args) throws Exception {

        //客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();


        try {
            //创建客户端启动对象
            //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
                        }
                    });

            System.out.println("客户端 ok..");

            //启动客户端去连接服务器端
            //关于 ChannelFuture 要分析,涉及到netty的异步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //给关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        }finally {

            group.shutdownGracefully();

        }
    }
}

复制代码

NettyClientHandler

package netty.netty.simple;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    //当通道就绪就会触发该方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
    }

    //当通道有读取事件时,会触发
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

复制代码

针对以上DEMO分析

1. bossGroup和workerGroup 如何确定有几个线程组的

netty的自我学习(七)—Netty的简单入门案例

根据源码可看,默认的线程组数量是CPU核数*2

netty的自我学习(七)—Netty的简单入门案例

我本机机器是4核的,所以默认是8个线程组。

2. workGroup 循环线程组演示

我们启动多个客户端,观察下。

netty的自我学习(七)—Netty的简单入门案例

我们顺序开启8个客户端,然后workGroup的8个线程组,循环执行,等第9个客户端连接的时候,又会从第一个线程组开始,以此证明是一个循环事件。

netty的自我学习(七)—Netty的简单入门案例

3. channle和pipeline的关系

netty的自我学习(七)—Netty的简单入门案例
netty的自我学习(七)—Netty的简单入门案例

channle和pipeline是互相包含的,通过channel可以获取pipeline,通过pipeline也可以获取对应的channel. pipeline 是一个双向链表。

4. 任务队列中的 Task 有 3 种典型使用场景

如果某个业务长时间执行,那么就会对handler造成长时间的阻塞,因此对于这些任务可以提交到对应的taskQueue队列中,异步执行。

1 自定义普通任务

//比如这里我们有一个非常耗时长的业务-> 异步执行 -> 提交该channel 对应的
        //NIOEventLoop 的 taskQueue中,

        //解决方案1 用户程序自定义的普通任务

        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
                    System.out.println("channel code=" + ctx.channel().hashCode());
                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        });

        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵3", CharsetUtil.UTF_8));
                    System.out.println("channel code=" + ctx.channel().hashCode());
                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        });
        //因为是在一个线程中,所以第一个任务执行完,在执行第二个任务,即第二个任务的执行触发时间为 5*1000 +5*1000
复制代码

2 自定义定时任务 scheduleTaskQueue

//解决方案2 : 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中

        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8));
                    System.out.println("channel code=" + ctx.channel().hashCode());
                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        }, 5, TimeUnit.SECONDS);
复制代码
netty的自我学习(七)—Netty的简单入门案例
netty的自我学习(七)—Netty的简单入门案例

可从ctx里看是否在任务队列中。

互相讨论、共同进步

文章是笔者分享的学习笔记,若你觉得可以、还行、过得去、甚至不太差的话,可以“点赞”一下的哦。就此谢过!

持之以恒!

netty的自我学习(七)—Netty的简单入门案例
原文  https://juejin.im/post/5e1ba5446fb9a02ff32002db
正文到此结束
Loading...