转载

一个群聊的netty 例子

翻了翻《Netty 实战》,看了些概念,还是觉得似是而非,于是写了些代码,通道啥的都建得挺好。但是channelRead0一直收不到消息。

后来东摸西摸,发现我 client 创建 channel 后马上发消息是不对的,此时 channel 还没有完全建好,导致服务端收不到消息。改成通道 active 后,server 给 client 发送消息,client 收到消息后才开始向 server 发消息,此时 server 是能收到的。

实现Server handler

@Slf4j
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 处理handler 添加,全局保存channel
        log.info("add a handler");
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入/n");
        }
        channels.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 处理handler 删除,全局删除channel
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开/n");
        }
        channels.remove(ctx.channel());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel incoming = ctx.channel();

        log.info("收到消息: " + msg);
        for (Channel channel : channels) {
            if (channel != incoming) {
                // 群发给其他用户
                channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + msg + "/n");
            } else {
                // 回应当前用户
                channel.writeAndFlush("[响应]" + msg + "/n");
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 通道激活,通知client,可以准备通信了
        Channel incoming = ctx.channel();
        log.info("ChatClient:" + incoming.remoteAddress() + "在线");
        incoming.writeAndFlush("welcome" + "/n");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 通道失效,client 掉线
        Channel incoming = ctx.channel();
        log.info("ChatClient:" + incoming.remoteAddress() + "掉线");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
    {
        Channel incoming = ctx.channel();

        // 当出现异常就关闭连接
        log.info("ChatClient:" + incoming.remoteAddress() + "异常");
        cause.printStackTrace();
        ctx.close();
    }
}

服务端通过引导绑定IP

// Server bootstrap: accept connections on port 18169 and give every accepted
// channel a line-framed String pipeline that ends in ChatServerHandler.
        // Declared final so the bind listener below can release them on failure.
        final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        final NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(
                        new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel ch) throws Exception
                            {
                                ChannelPipeline pipeline = ch.pipeline();
                                // Split the inbound byte stream into frames at line delimiters (max 8192 bytes per frame).
                                pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                                pipeline.addLast("decoder", new StringDecoder());
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast("handler", new ChatServerHandler());
                                log.info("ChatClient:" + ch.remoteAddress() + "连接上");
                            }
                        }
                ).option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .bind(18169)
                // The bind completes asynchronously; without a listener a failure
                // (for example, the port already being in use) would go unnoticed
                // and the event loop threads would keep running.
                .addListener(new io.netty.channel.ChannelFutureListener() {
                    @Override
                    public void operationComplete(io.netty.channel.ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            log.error("Failed to bind chat server to port 18169", future.cause());
                            bossGroup.shutdownGracefully();
                            workerGroup.shutdownGracefully();
                        }
                    }
                });

实现 client handler

@Slf4j
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.info(msg);
        Channel ch = ctx.channel();
        
        // 收到welcome后,给server发送消息
        if(msg.startsWith("welcome")){
            ch.writeAndFlush("hello world!" + "/r/n");
            ch.writeAndFlush("wish you happy!" + "/r/n");
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
    {
        cause.printStackTrace();
        //        出现异常时关闭连接
        ctx.close();
    }
}

通过引导创建 client channel

// Client bootstrap: configure a line-framed String pipeline that ends in
// ChatClientHandler, then start connecting to the chat server on 127.0.0.1:18169.
        NioEventLoopGroup group = new NioEventLoopGroup();

        ChannelInitializer<NioSocketChannel> clientInitializer =
                new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // The frame decoder comes first: it cuts the inbound stream at line
                        // delimiters (max 8192 bytes per frame), which avoids the
                        // "sticky packet" problem of several messages arriving merged.
                        ch.pipeline()
                                .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
                                .addLast("decoder", new StringDecoder())
                                .addLast("encoder", new StringEncoder())
                                .addLast("handler", new ChatClientHandler());
                    }
                };

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(clientInitializer);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);

        // NOTE(review): connect() is not awaited here, so this channel may not be
        // fully established yet when the next statement runs.
        Channel channel = bootstrap.connect("127.0.0.1", 18169).channel();

    }
原文  https://segmentfault.com/a/1190000018630686
正文到此结束
Loading...