基于mina的消息推送

需要做一个基于socket客户端的消息推送功能,需求是这样的:

客户端输入用户名、密码然后通过socket提交到后台校验。校验成功后后台给指定客户端推送消息

刚开始想着挺简单的,很快出了一个基于socket的demo 进行消息推送,但是原生socket是线程阻塞的,有一定的弊端。后来来回切了三种方案分别是基于mina,netty,NIO非阻塞通讯框架以及websocket。

下面就我对这三个框架的应用做个简单的对比说明:

首先我想到的是mina框架,因为在学校那会了解过一点皮毛,所以第一反应是用它来实现这个功能。

(一)网上大多demo都是mina的服务端配套mina的客户端,而我们的客户端是用aardio实现的,前期对这个语言不是很了解而且网上资料少的可怜,所以很多也是一边摸索一边做。然后很顺利的实现了通讯。

(二)Mina本身也是一种基于TCP/IP、UDP/IP协议栈的通信框架,很多人说netty比mina简单,mina将内核和一些特性的联系过于紧密,使得用户在不需要这些特性的时候无法脱离,相比下性能会有所下降;netty解决了这个设计问题。但是我个人觉得这些对我们一般的使用并无多大影响,而且本身mina和netty都是出自一人之手。非要说区别的话我觉得netty比mina的文档,案例以及解决方案多点吧。

Mina的底层依赖的主要是Java NIO库,上层提供的是基于事件的异步接口。其整体的结构如下:

(1.) IoService:最底层的是IOService,负责具体的IO相关工作。这一层的典型代表有IOSocketAcceptor和IOSocketChannel,分别对应TCP协议下的服务端和客户端的IOService。IOService的意义

在于隐藏底层IO的细节,对上提供统一的基于事件的异步IO接口。每当有数据到达时,IOService会先调用底层IO接口读取数据,封装成IoBuffer,之后以事件的形式通知上层代码,从而将Java

NIO的同步IO接口转化成了异步IO。所以从图上看,进来的low-level IO经过IOService层后变成IO Event。具体的代码可以参考org.apache.mina.core.polling.AbstractPollingIoProcessor的私有内部

类Processor。

(2.) IoProcessor:这个接口在另一个线程上,负责检查是否有数据在通道上读写,也就是说它也拥有自己的Selector,这是与我们使用JAVA NIO 编码时的一个不同之处,通常在JAVA NIO 编码

中,我们都是使用一个Selector,也就是不区分IoService与IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在IoService 上的过滤器,并在过滤器链之后调用IoHandler。

(3.) IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的encode 与decode是最为重要的、也是你

在使用Mina 时最主要关注的地方。

(4.) IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。需要有开发者自己来实现这个接口。IoHandler可以看成是Mina处理流程的终点,每个IoService都需要指定一个

IoHandler。

(5.)IoSession:是对底层连接(服务器与客户端的特定连接,该连接由服务器地址、端口以及客户端地址、端口来决定)的封装,一个IoSession对应于一个底层的IO连接(在Mina中UDP也被抽象成

了连接)。通过IoSession,可以获取当前连接相关的上下文信息,以及向远程peer发送数据。发送数据其实也是个异步的过程。发送的操作首先会逆向穿过IoFilterChain,到达IoService。但

IoService上并不会直接调用底层IO接口来将数据发送出去,而是会将该次调用封装成一个WriteRequest,放入session的writeRequestQueue中,最后由IoProcessor线程统一调度flush出去。所以发

送操作并不会引起上层调用线程的阻塞。具体代码可以参考org.apache.mina.core.filterchain.DefaultIoFilterChain的内部类HeadFilter的filterWrite方法。

服务端流程:

1、通过SocketAcceptor 同客户端建立连接;

2、连接建立之后 I/O的读写交给了I/O Processor线程,I/O Processor是多线程的;

3、通过I/O Processor 读取的数据经过IoFilterChain里所有配置的IoFilter,IoFilter进行消息的过滤,格式的转换,在这个层面可以制定一些自定义的协议;

4、最后IoFilter将数据交给 Handler  进行业务处理,完成了整个读取的过程;

写入过程也是类似,只是刚好倒过来,通过IoSession.write 写出数据,然后Handler进行写入的业务处理,处理完成后交给IoFilterChain,进行消息过滤和协议的转换,最后通过 I/O Processor 将数据写出到 socket 通道。

1. 简单的TCPServer:

(1.) 第一步:编写IoService

按照上面的执行流程,我们首先需要编写IoService,IoService 本身既是服务端,又是客户端,我们这里编写服务端,所以使用IoAcceptor 实现,由于IoAcceptor 是与协议无关的,因为我们要编写TCPServer,所以我们使用IoAcceptor 的实现NioSocketAcceptor,实际上底层就是调用java.nio.channels.ServerSocketChannel 类。当然,如果你使用了Apache 的APR 库,那么你可以选择使AprSocketAcceptor 作为TCPServer 的实现,据传说Apache APR库的性能比JVM 自带的本地库高出很多。那么IoProcessor 是由指定的IoService 内部创建并调用的,我们并不需要关心。

IoAcceptor acceptor = new NioSocketAcceptor();
   acceptor.getSessionConfig().setReadBufferSize(2048);
   acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        
 
//设置过滤器
//...
        
//设置handler
 
//绑定端口
acceptor.bind(new InetSocketAddress(9124));

这段代码我们初始化了服务端的TCP/IP 的基于NIO 的套接字,然后调用IoSessionConfig设置读取数据的缓冲区大小、读写通道均在10 秒内无任何操作就进入空闲状态。

(2.) 第二步:编写过滤器

这里我们处理最简单的字符串传输,Mina 已经为我们提供了TextLineCodecFactory 编解码器工厂来对字符串进行编解码处理。

 // 编写过滤器
acceptor.getFilterChain().addLast("codec",
    new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
       LineDelimiter.WINDOWS.getValue(), 
       LineDelimiter.WINDOWS.getValue()))
      );

这段代码要在acceptor.bind()方法之前执行,因为绑定套接字之后就不能再做这些准备工作了。这里先不用清楚编解码器是如何工作的,这个是后面重点说明的内容,这里你只需要清楚,我们传输的以换行符为标识的数据,所以使用了Mina 自带的换行符编解码器工厂。

(3.) 第三步:编写IoHandler

这里我们只是简单的打印Client 传说过来的数据。

package com.dxz.minademo2;
 
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class TCPServerHandler extends IoHandlerAdapter {
    // 这里我们使用的SLF4J作为日志门面,至于为什么在后面说明。
  private final static Logger log = LoggerFactory.getLogger(TCPServerHandler.class);
 
 @Override
 public void messageReceived(IoSession session, Object message) throws Exception {
    String str = message.toString();
    System.out.println("The message received is [" + str + "]");
    if (str.endsWith("quit")) {
         session.close(true);
         return;
      }
    }
 
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out.println("server session created");
        super.sessionCreated(session);
    }
 
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        System.out.println("server session Opened");
        super.sessionOpened(session);
    }
 
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.out.println("server session Closed");
        super.sessionClosed(session);
    }
    
}

然后我们把这个IoHandler 注册到IoService:

//设置handler

acceptor.setHandler(new TCPServerHandler());

当然这段代码也要在acceptor.bind()方法之前执行。然后我们运行MyServer 中的main 方法,你可以看到控制台一直处于阻塞状态,等待客户端连接。

完成的代码:

package com.dxz.minademo2;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
 
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 
public class TCPServer {
public static void main(String[] args) throws IOException {
   IoAcceptor acceptor = new NioSocketAcceptor();
   acceptor.getSessionConfig().setReadBufferSize(2048);
   acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        
 
   // 编写过滤器
   acceptor.getFilterChain().addLast("codec",
   new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
        LineDelimiter.WINDOWS.getValue(), 
        LineDelimiter.WINDOWS.getValue()))
                );
        
     //设置handler
     acceptor.setHandler(new TCPServerHandler());
        
     //绑定端口
     acceptor.bind(new InetSocketAddress(9124));
    }
}

测试

此时,我们用telnet 127.0.0.1 9123 访问,然后输入一些内容,当按下回车键,你会发现数据在Server 端被输出,但要注意不要输入中文,因为Windows 的命令行窗口不会对传输的数据进行UTF-8 编码。当输入quit 结尾的字符串时,连接被断开。这里注意你如果使用的操作系统,或者使用的Telnet 软件的换行符是什么,如果不清楚,可以删掉第二步中的两个红色的参数,使用TextLineCodec 内部的自动识别机制。

pom.xml

<!-- MINA集成 -->
<dependency>
  <groupId>org.apache.mina</groupId>
  <artifactId>mina-core</artifactId>
  <version>2.0.7</version>
   </dependency>
   <dependency>
   <groupId>org.apache.mina</groupId>
   <artifactId>mina-integration-spring</artifactId>
   <version>1.1.7</version>
 </dependency>

Post Views:
6

原文 

http://www.xiwenblog.com/archives/2145

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » 基于mina的消息推送

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址