转载

MINA 网络黏包处理代码

本文完整代码,可以浏览:

https://github.com/hjj2017/xgame-code_server/blob/master/game_server/src/com/game/gameServer/framework/mina/MsgCumulativeFilter.java

我在网上查阅过的 MINA 黏包处理,一般都是放在 Decoder 中做的。也就是黏包处理和消息解码放在一起做,显得比较混乱不好打理。而以下这段代码,我是把黏包处理放在 Filter 中了。在具体使用时可以这样:

 1 // 创建 IO 接收器  2 NioSocketAcceptor acceptor = new NioSocketAcceptor();  3   4 // 获取责任链  5 DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();  6 // 处理网络粘包  7 chain.addLast("msgCumulative", new MsgCumulativeFilter());  8   9 // 添加自定义编解码器 10 chain.addLast("msgCodec", new ProtocolCodecFilter( 11     new XxxEncoder(), 12     new XxxDecoder() 13 )); 14  15 // 获取会话配置 16 IoSessionConfig cfg = acceptor.getSessionConfig(); 17  18 // 设置缓冲区大小 19 cfg.setReadBufferSize(4096); 20 // 设置 session 空闲时间 21 cfg.setIdleTime(IdleStatus.BOTH_IDLE, 10); 22  23 // 设置 IO 句柄 24 acceptor.setHandler(new XxxHandler()); 25 acceptor.setReuseAddress(true); 26  27 try { 28     // 绑定端口 29     acceptor.bind(new InetSocketAddress("127.0.0.1", 4400)); 30 } catch (Exception ex) { 31     // 输出错误日志 32     System.error.println(ex); 33 }

目前 Netty 框架要比 MINA 流行的多,而且 Netty 对网络黏包处理也做了很好的处理,不用开发者自己费那么大劲。我也考虑过迁移到 Netty 框架上,不过目前还没有找到特别充分的理由。闲话不多说了,以下就是黏包处理代码:

  1 package com.game.gameServer.framework.mina;   2    3 import java.util.concurrent.ConcurrentHashMap;   4    5 import org.apache.mina.core.buffer.IoBuffer;   6 import org.apache.mina.core.filterchain.IoFilterAdapter;   7 import org.apache.mina.core.session.IoSession;   8    9 import com.game.gameServer.framework.FrameworkLog;  10 import com.game.gameServer.msg.SpecialMsgSerialUId;  11 import com.game.part.msg.IoBuffUtil;  12   13 /**  14  * 消息粘包处理  15  *   16  * @author hjj2017  17  * @since 2014/3/17  18  *   19  */  20 class MsgCumulativeFilter extends IoFilterAdapter {  21     /**   22      * 从客户端接收的消息估计长度,  23      * {@value} 字节,   24      * 对于从客户端接收的数据来说, 都是简单的命令!   25      * 很少超过 {@value}B  26      *   27      */  28     private static final int DECODE_MSG_LEN = 64;  29     /** 容器 Buff 字典 */  30     private static final ConcurrentHashMap<Long, IoBuffer> _containerBuffMap = new ConcurrentHashMap<>();  31   32     @Override  33     public void sessionClosed(NextFilter nextFilter, IoSession sessionObj) throws Exception {  34         if (nextFilter == null ||   35             sessionObj == null) {  36             // 如果参数对象为空,   37             // 则直接退出!  38             FrameworkLog.LOG.error("null nextFilter or sessionObj");  39             return;  40         }  41   42         // 移除容器 Buff  43         removeContainerBuff(sessionObj);  44         // 向下传递  45         super.sessionClosed(nextFilter, sessionObj);  46     }  47   48     @Override  49     public void messageReceived(  50         NextFilter nextFilter, IoSession sessionObj, Object msgObj) throws Exception {  51         if (nextFilter == null ||   52             sessionObj == null) {  53             // 如果参数对象为空,   54             // 则直接退出!  55             FrameworkLog.LOG.error("null nextFilter or sessionObj");  56             return;  57         }  58   59         // 获取会话 UId  60         long sessionUId = sessionObj.getId();  61   62         if (!(msgObj instanceof IoBuffer)) {  63             // 如果消息对象不是 ByteBuff,   64             // 则直接向下传递!  65             FrameworkLog.LOG.warn("msgObj is not a IoBuff, sessionUId = " + sessionUId);  66             super.messageReceived(nextFilter, sessionObj, msgObj);  67         }  68   69         // 获取输入 Buff  70         IoBuffer inBuff = (IoBuffer)msgObj;  71   72         if (!inBuff.hasRemaining()) {  73             // 如果没有剩余内容,   74             // 则直接退出!  75             FrameworkLog.LOG.error("inBuff has not remaining, sessionUId = " + sessionUId);  76             return;  77         } else if (inBuff.remaining() <= 8) {  78             // 如果 <= 8 字节,   79             // 那还是执行粘包处理过程吧 ...  80             // 8 字节 = 消息长度 ( Short ) + 消息类型 ( Short ) + 时间戳 ( Int )  81             // 如果比这个长度都小,   82             // 那肯定不是一条完整消息 ...  83             this.msgRecv_0(nextFilter, sessionObj, inBuff);  84             return;  85         }  86   87         // 获取消息长度  88         final int msgSize = inBuff.getShort();  89         inBuff.position(0);  90   91         if (msgSize == inBuff.limit() &&   92             containerBuffIsEmpty(sessionObj)) {  93             //   94             // 如果消息长度和极限值刚好相同,   95             // 并且容器 Buff 中没有任何内容 ( 即, 上一次消息没有粘包 ),  96             // 那么直接向下传递!  97             //   98             super.messageReceived(  99                 nextFilter, sessionObj, inBuff 100             ); 101         } else { 102             //  103             // 如果消息长度和极限值不同,  104             // 则说明是网络粘包! 105             // 这时候跳转到粘包处理过程 ... 106             //  107             this.msgRecv_0(nextFilter, sessionObj, inBuff); 108         } 109     } 110  111     /** 112      * 接收连包消息 113      *  114      * @param nextFilter 115      * @param sessionObj 116      * @param inBuff 117      * @throws Exception  118      *  119      */ 120     private void msgRecv_0( 121         NextFilter nextFilter, IoSession sessionObj, IoBuffer inBuff) throws Exception { 122         if (nextFilter == null ||  123             sessionObj == null) { 124             // 如果参数对象为空,  125             // 则直接退出! 126             FrameworkLog.LOG.error("null nextFilter or sessionObj"); 127             return; 128         } 129  130         // 获取会话 UId 131         long sessionUId = sessionObj.getId(); 132         // 获取容器 Buff 133         IoBuffer containerBuff = getContainerBuff(sessionObj); 134  135         // 添加新 Buff 到容器 Buff 的末尾 136         IoBuffUtil.append(containerBuff, inBuff); 137         // 令 position = 0 138         containerBuff.position(0); 139  140 //        // 记录调试信息 141 //        FrameworkLog.LOG.debug("/nin = [ " + inBuff.getHexDump() + " ]"); 142  143         for (int i = 0; ; i++) { 144 //            // 记录调试信息 145 //            FrameworkLog.LOG.debug( 146 //                "i = " + i  147 //                + "/nco = [ " + containerBuff.getHexDump() + " ]" 148 //                + "/nco.pos = " + containerBuff.position()  149 //                + "/nco.lim = " + containerBuff.limit() 150 //            ); 151  152             if (containerBuff.remaining() < 4) { 153                 //  154                 // 如果剩余字节数 < 4,  155                 // 这样根本无法识别出消息类型 msgSerialUId ... 156                 // 直接退出! 157                 // 在退出前,  158                 // 准备好接收下一次消息! 159                 //  160                 IoBuffUtil.readyToNext(containerBuff); 161                 return; 162             } 163  164             // 获取原始位置 165             final int oldPos = containerBuff.position(); 166             // 获取消息长度和类型 167             final int msgSize = containerBuff.getShort(); 168             final int msgSerialUId = containerBuff.getShort(); 169  170 //            // 记录调试信息 171 //            FrameworkLog.LOG.debug( 172 //                "i = " + i  173 //                + "/nmsgSize = " + msgSize 174 //                + "/nmsgSerialUId = " + msgSerialUId 175 //            ); 176  177             // 还原原始位置 178             containerBuff.position(oldPos); 179  180             if (msgSerialUId == SpecialMsgSerialUId.CG_FLASH_POLICY ||  181                 msgSerialUId == SpecialMsgSerialUId.CG_QQ_TGW) { 182                 //  183                 // 如果是 Flash 安全策略消息,  184                 // 或者是腾讯网关消息,  185                 // 则尝试找一下 0 字节的位置 ... 186                 //  187                 int pos0 = IoBuffUtil.indexOf(containerBuff, (byte)0); 188  189                 if (pos0 <= -1) { 190                     // 如果找不到 0 字节的位置,  191                     // 则说明消息还没接收完,  192                     // 准备接受下次消息并直接退出! 193                     IoBuffUtil.readyToNext(containerBuff); 194                     return; 195                 } 196  197                 // 复制 Buff 内容 198                 containerBuff.position(0); 199                 IoBuffer realBuff = IoBuffUtil.copy(containerBuff, pos0); 200  201                 // 更新 Buff 位置 202                 final int newPos = containerBuff.position() + pos0; 203                 containerBuff.position(newPos); 204                 // 压缩容器 Buff 205                 IoBuffUtil.compact(containerBuff); 206  207                 // 向下传递 208                 super.messageReceived( 209                     nextFilter, sessionObj, realBuff 210                 ); 211                 continue; 212             } 213  214             if (msgSize <= 0) { 215                 //  216                 // 如果消息长度 <= 0,  217                 // 则直接退出! 218                 // 这种情况可能是消息已经乱套了 ... 219                 // 还是重新来过吧! 220                 //  221                 FrameworkLog.LOG.error("i = " + i + ", msgSize = " + msgSize + ", sessionUId = " + sessionUId); 222                 // 将容器 Buff 内容清空 223                 containerBuff.position(0); 224                 containerBuff.flip(); 225                 // 压缩容器 Buff 226                 IoBuffUtil.compact(containerBuff); 227                 return; 228             } 229  230             if (containerBuff.remaining() < msgSize) { 231                 //  232                 // 如果消息长度不够,  233                 // 则可能是出现网络粘包情况了 ... 234                 // 直接退出就可以了! 235                 //  236                 FrameworkLog.LOG.warn( 237                     "i = " + i 238                     + ", msgSize = " + msgSize  239                     + ", containerBuff.remaining = " + containerBuff.remaining() 240                     + ", sessionUId = " + sessionUId 241                 ); 242  243                 // 准备接受下一次消息 244                 IoBuffUtil.readyToNext(containerBuff); 245                 return; 246             } 247  248             // 创建新 Buff 并复制字节内容 249             IoBuffer realBuff = IoBuffUtil.copy(containerBuff, msgSize); 250  251             if (realBuff == null) { 252                 //  253                 // 如果真实的 Buff 为空,  254                 // 则直接退出! 255                 // 这种情况可能也是消息乱套了 ... 256                 // 记录一下错误信息 257                 //  258                 FrameworkLog.LOG.error("i = " + i + ", null realBuff, sessionUId = " + sessionUId); 259             } else { 260 //                // 记录调试信息 261 //                FrameworkLog.LOG.debug( 262 //                    "i = " + i 263 //                    + "/nreal = [ " + realBuff.getHexDump() + " ]" 264 //                    + "/nreal.pos = " + realBuff.position() 265 //                    + "/nreal.lim = " + realBuff.limit() 266 //                ); 267  268                 // 向下传递 269                 super.messageReceived( 270                     nextFilter, sessionObj, realBuff 271                 ); 272             } 273  274             // 更新位置 275             containerBuff.position(containerBuff.position() + msgSize); 276             // 压缩容器 Buff 277             IoBuffUtil.compact(containerBuff); 278         } 279     } 280      281     /** 282      * 获取玩家的 Buff, 如果为空则新建一个! 283      *  284      * @param sessionObj 285      * @return  286      *  287      */ 288     private static IoBuffer getContainerBuff(IoSession sessionObj) { 289         if (sessionObj == null) { 290             // 如果参数对象为空,  291             // 则直接退出! 292             return null; 293         } 294  295         // 获取会话 UId 296         long sessionUId = sessionObj.getId(); 297         // 获取容器 Buff 298         IoBuffer containerBuff = _containerBuffMap.get(sessionUId); 299  300         if (containerBuff == null) { 301             // 创建缓存 Buff 302             containerBuff = IoBuffer.allocate(DECODE_MSG_LEN); 303             containerBuff.setAutoExpand(true); 304             containerBuff.setAutoShrink(true); 305             containerBuff.position(0); 306             containerBuff.flip(); 307             // 缓存  Buff 对象 308             Object oldVal = _containerBuffMap.putIfAbsent(sessionUId, containerBuff); 309  310             if (oldVal != null) { 311                 FrameworkLog.LOG.warn("exists oldVal"); 312             } 313         } 314  315         return containerBuff; 316     } 317  318     /** 319      * 移除容器 Buff 320      *  321      * @param sessionObj 322      *  323      */ 324     private static void removeContainerBuff(IoSession sessionObj) { 325         if (sessionObj == null) { 326             // 如果参数对象为空,  327             // 则直接退出! 328             return; 329         } 330  331         // 获取会话 UId 332         long sessionUId = sessionObj.getId(); 333         // 获取容器 Buff 334         IoBuffer containerBuff = _containerBuffMap.get(sessionUId); 335  336         if (containerBuff != null) { 337             // 是否所占资源 338             containerBuff.clear(); 339         } 340  341         // 移除玩家的 Buff 对象 342         _containerBuffMap.remove(sessionUId); 343     } 344  345     /** 346      * 容器 Buff 为空 ? 347      *  348      * @param sessionObj 349      * @return  350      *  351      */ 352     private static boolean containerBuffIsEmpty(IoSession sessionObj) { 353         if (sessionObj == null) { 354             // 如果参数对象为空,  355             // 则直接退出! 356             return false; 357         } 358  359         // 获取容器 Buff 360         IoBuffer containerBuff = getContainerBuff(sessionObj); 361  362         if (containerBuff == null) { 363             // 如果容器为空,  364             // 则直接退出! 365             FrameworkLog.LOG.error("null containerBuff, sessionUId = " + sessionObj.getId()); 366             return false; 367         } else { 368             // 如果当前位置和极限值都为 0,  369             // 则判定为空! 370             return (containerBuff.position() == 0  371                  && containerBuff.limit() == 0); 372         } 373     } 374 }

---恢复内容结束---

正文到此结束
Loading...