基于Mina的配置中心(四)

基于Mina的配置中心(四)

继续编写 Server
代码,接下来是 handler

消息处理器 MinaServerHandler

IoHandlerAdapter
中有以下方法。

看名字就可以看出,有处理异常、接收消息,发送消息、连接打开,连接关闭、进入空闲状态等方法。

基于Mina的配置中心(四)

我们可以根据自己的实际情况复写父类中的方法。在 MinaServerHandler
中,我们复写了以下方法:

基于Mina的配置中心(四)

最好复写 exceptionCaught
这个方法,不然出现异常,连接关闭的时候,你可能无从下手。

作为服务器,在 messageReceived
中处理客户端发出的请求。当客户端请求一次后,会把客户端连接信息保存下来,用来推送数据

可以调用 messageSent
这个方法,向客户端发送信息。

sessionIdle
这个方法中,处理连接超时的情况,比如在一定时间内没有发送心跳包,关闭客户端连接。

package com.lww.mina.handler;

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.lww.mina.dao.MessageDao;
import com.lww.mina.domain.Message;
import com.lww.mina.protocol.MessagePack;
import com.lww.mina.session.SessionManager;
import com.lww.mina.util.Const;
import com.lww.mina.util.SpringBeanFactoryUtils;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

/**
 * 处理客户端发送的消息
 *
 * @author lww
 * @date 2020-07-06 22:53
 */
@Slf4j
public class MinaServerHandler extends IoHandlerAdapter {

    @Override
    public void sessionCreated(IoSession session) {
        InetSocketAddress isa = (InetSocketAddress) session.getRemoteAddress();
        String ip = isa.getAddress().getHostAddress();
        session.setAttribute("ip", ip);
        log.info("来自" + ip + " 的终端上线,sessionId:" + session.getId());
    }

    @Override
    public void sessionClosed(IoSession session) {
        log.info(session.getAttribute(Const.SESSION_KEY) + " nid: " + session.getId() + "sessionClosed ");
        // 移除 属性
        session.removeAttribute(Const.SESSION_KEY);
        // 移除超时属性
        session.removeAttribute(Const.TIME_OUT_KEY);
        String key = (String) session.getAttribute(Const.SESSION_KEY);
        if (key != null) {
            SessionManager.removeSession(key);
        }
        session.closeNow();
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        if (session.getAttribute(Const.TIME_OUT_KEY) == null) {
            session.closeNow();
            log.error(session.getAttribute(Const.SESSION_KEY) + " nid: " + session.getId() + " time_out_key null");
            return;
        }
        try {
            int isTimeoutNum = (int) session.getAttribute(Const.TIME_OUT_KEY);
            isTimeoutNum++;
            // 没有超过最大次数,超时次数加1
            if (isTimeoutNum < Const.TIME_OUT_NUM) {
                session.setAttribute(Const.TIME_OUT_KEY, isTimeoutNum);
            } else {
                // 超过最大次数,关闭会话连接
                String key = (String) session.getAttribute(Const.SESSION_KEY);
                // 移除device属性
                session.removeAttribute(Const.SESSION_KEY);
                // 移除超时属性
                session.removeAttribute(Const.TIME_OUT_KEY);
                SessionManager.removeSession(key);
                session.closeOnFlush();
                log.info("client user: " + key + " more than " + Const.TIME_OUT_NUM + " times have no response, connection closed!");
            }
        } catch (Exception e) {
            log.error(session.getAttribute(Const.SESSION_KEY) + " nid: " + session.getId() + e.getMessage());
            session.closeNow();
        }
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {
        log.error("终端用户:{} 连接发生异常,即将关闭连接,原因:{}", session.getAttribute(Const.SESSION_KEY), cause);
    }

    @Override
    public void messageReceived(IoSession session, Object message) {
        SocketAddress remoteAddress = session.getRemoteAddress();
        log.info("server received MinaServerHandler_messageReceived_remoteAddress:{}", remoteAddress);
        MessagePack pack = (MessagePack) message;
        MessagePack response;
        String body = pack.getBody();
        if (StringUtils.isBlank(body)) {
            log.error("ServerHandler_messageReceived_body:{}", body);
            response = new MessagePack(Const.BASE, "body empty");
            session.write(response);
            session.close(false);
            return;
        }
        Message msg = JSONObject.parseObject(body, Message.class);
        if (msg == null) {
            log.error("ServerHandler_messageReceived_body:{}", body);
            response = new MessagePack(Const.BASE, "message empty");
            session.write(response);
            session.close(false);
            return;
        }
        if (Const.CONF.equalsIgnoreCase(msg.getPropertyValue()) && pack.getModule() == Const.BASE) {
            log.info("ServerHandler_messageReceived_Susccess:{}", msg.getPropertyValue());
            response = new MessagePack(pack.getModule(), body);
            session.write(response);
            return;
        }
        final String key = remoteAddress.toString();
        //存储的key
        session.setAttribute(Const.SESSION_KEY, key);
        // 超时次数设为0
        session.setAttribute(Const.TIME_OUT_KEY, 0);
        synchronized (this) {
            IoSession oldSession = SessionManager.getSession(key);
            if (oldSession != null && !oldSession.equals(session)) {
                // 移除key属性
                oldSession.removeAttribute(Const.SESSION_KEY);
                // 移除超时时间
                oldSession.removeAttribute(Const.TIME_OUT_KEY);
                // 替换oldSession
                SessionManager.replaceSession(key, session);
                session.closeOnFlush();
                log.info("oldSession close!");
            }
            if (oldSession == null) {
                SessionManager.addSession(key, session);
            }
            log.info("bind success: " + session.getRemoteAddress());
        }
        MessageDao minaMessageDao = SpringBeanFactoryUtils.getApplicationContext().getBean(MessageDao.class);
        log.info("ServerHandler_messageReceived_projectName:{}, propertityValue:{}, envValue:{}", msg.getProjectName(), msg.getPropertyValue(), msg.getEnvValue());
        Message configMessage = minaMessageDao.selectOne(new QueryWrapper<Message>().lambda()
                .eq(Message::getProjectName, msg.getProjectName())
                .eq(Message::getPropertyValue, msg.getPropertyValue())
                .eq(Message::getEnvValue, msg.getEnvValue()));
        if (configMessage == null && !msg.getPropertyValue().equalsIgnoreCase(Const.CONF)) {
            log.error(session.toString() + "select null");
            response = new MessagePack(Const.BASE, "select error");
            session.write(response);
            session.closeOnFlush();
        } else {
            // 设置session key
            if (configMessage != null) {
                configMessage.setRemoteAddress(key);
                // AR模式
                boolean updateSessionKey = configMessage.updateById();
                log.info("ServerHandler_messageReceived_updateSessionKey:{}", updateSessionKey);
            }
            log.info(session.toString() + " succeed!");
            response = new MessagePack(pack.getModule(), JSONObject.toJSONString(configMessage));
            session.write(response);
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) {
        if (message instanceof Message) {
            Message minaMessage = (Message) message;
            session.write(new MessagePack(Const.CONFIG_MANAGE, JSONObject.toJSONString(minaMessage)));
        }
        session.setAttribute(Const.TIME_OUT_KEY, 0);
        log.info("发送消息成功");
    }

}
复制代码

Session管理 SessionManager

上面的代码里出现了这个类 SessionManager
,这是一个管理 Session
的工具类,我尝试过把 Session
转为Json存储,可是会报异常,无法转为Json,序列化也不可行。

基于Mina的配置中心(四)

但是看文档,又是可以持久化的。

org.apache.mina.core.service.AbstractIoService#initSession


org.apache.mina.core.session.IoSessionDataStructureFactory

看代码也是定义了一个Map: org.apache.mina.core.session.IoSessionAttributeMap

使用这个类管理Session。

基于Mina的配置中心(四)

不过因为我们是配置中心,所以可以使用Map来存储到内存中,因为客户端数量不会很多。我用了一个线程安全ConcurrentHashMap
来存储 Session
对象,key就是客户端的连接信息。

这个样子: /127.0.0.1:55618

package com.lww.mina.session;

import com.lww.mina.util.Const;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.core.session.IoSession;

/**
 * @author lww
 * @date 2020-07-06 23:21
 */
public class SessionManager {

    /**
     * 存放session的线程安全的map集合
     */
    private static ConcurrentHashMap<String, IoSession> sessions = new ConcurrentHashMap<>();

    /**
     * 线程安全的自增类,用于统计连接数
     */
    private static final AtomicInteger CONNECTIONS_COUNTER = new AtomicInteger(0);

    /**
     * 添加session
     */
    public static void addSession(String account, IoSession session) {
        sessions.put(account, session);
        CONNECTIONS_COUNTER.incrementAndGet();
    }

    /**
     * 获取session
     */
    public static IoSession getSession(String key) {
        return sessions.get(key);
    }

    /**
     * 替换session,通过key
     */
    public static void replaceSession(String key, IoSession session) {
        sessions.put(key, session);
    }

    /**
     * 移除session通过key
     */
    public static void removeSession(String key) {
        sessions.remove(key);
        CONNECTIONS_COUNTER.decrementAndGet();
    }

    /**
     * 移除session通过session
     */
    public static void removeSession(IoSession session) {
        String key = (String) session.getAttribute(Const.SESSION_KEY);
        removeSession(key);
    }

    public static ConcurrentHashMap<String, IoSession> getSessions() {
        return sessions;
    }
}
复制代码

客户端每次连接服务器,都会在 message
表中更新连接信息,当连接不断,IP和端口是不会改变的,服务器也可以拿着这个 Session
和客户端通信,而且客户端断开重连,客户端的端口每次都可能是不一样的,存在 Map
中也可以方便的管理。

配置更新 主动推送 MessageChangeListener

可能有些人还记得我之前写过的 SpringBoot事件发布与订阅
,在框架中,这个确实很常用,SpringBoot的源码中到处用到了事件的发布与订阅。

先说一下配置更新推送的原理,我是在更新的时候发布了一个事件

SpringBeanFactoryUtils.getApplicationContext().publishEvent(new MessageEvent(afterMessage));

事件类

package com.lww.mina.event;

import com.lww.mina.domain.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEvent;

/**
 * 消息更新事件
 *
 * @author lww
 * @date 2020-07-07 00:28
 */
@Slf4j
public class MessageEvent extends ApplicationEvent {

    private Message message;

    public MessageEvent(Message message) {
        super(message);
        log.info("发布消息 MessageEvent_MessageEvent_message:{}", message);
        this.message = message;
    }

    public Message getMessage() {
        return message;
    }
}
复制代码

然后用 MessageChangeListener
监听这个事件,从 message
中取出客户端连接信息,然后作为 key
map
中取到对应的 Session
,通过 Session
发送消息给客户端。

MessageChangeListener

package com.lww.mina.listener;

import com.lww.mina.domain.Message;
import com.lww.mina.event.MessageEvent;
import com.lww.mina.handler.MinaServerHandler;
import com.lww.mina.session.SessionManager;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.mina.core.session.IoSession;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

/**
 * @author lww
 * @date 2020-07-07 00:30
 */
@Slf4j
@Component
public class MessageChangeListener {

    @EventListener
    public void onApplicationEvent(MessageEvent event) {
        log.info("接收事件 MessageChangeListener_onApplicationEvent_event:{}", event);
        //推送配置
        Message message = event.getMessage();
        Assert.isTrue(StringUtils.isNotBlank(message.getRemoteAddress()), "初始配置无法发送配置信息,需要客户端连接一次后,获取客户端地址端口等信息!");
        try {
            IoSession session = SessionManager.getSession(message.getRemoteAddress());
            if (session != null) {
                MinaServerHandler handler = new MinaServerHandler();
                handler.messageSent(session, message);
            }
        } catch (Exception e) {
            log.info("MessageChangeListener_onApplicationEvent_e:{}", e);
        }
    }
}
复制代码

总结

到这里, Server
端已经完成的差不多了,至于 Controller
Service
里的业务代码,就不粘了,都是些普通的CRUD,有一些地方用到了 Mybatis-Plus
的AR模式,确实很好用。

不过 Controller
Service
的代码我都会提交到 GitHub
的,感兴趣的可以去 GitHub
看一下。

Server
端完成了,接下来就是 Client
了。先说一下,在 Client
里,如果是和 Server
重复的或者类似的,我会简单说一下或者一笔带过。毕竟 Client
里要讲的东西太多了。很多黑科技哦,敬请期待!

还有一点,我们要把项目打包,发布到 maven
仓库,在 client
中引入这个 mina-base
模块,当然可以申请发布到 maven
中央仓库,不过为了节省时间,我自己搭建了一个私人 maven
仓库,在 pom.xml
中配置仓库,就可以引入我的 mina-base
包了。当然也可以把项目打成 Jar
包,然后作为第三方包引入。

这个服务器是我买的最低配版的服务器,大家就只在这里用一下好了:joy:

<repositories>
    <repository>
        <id>public</id>
        <name>Team Maven Repository</name>
        <url>http://148.70.249.148:8081/nexus/content/groups/public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
复制代码

为了方便调试,还要把代码一起提交到仓库。在 pom.xml
中添加下面的配置

<plugin>
    <artifactId>maven-source-plugin</artifactId>
    <version>2.1</version>
    <configuration>
        <attach>true</attach>
    </configuration>
    <executions>
        <execution>
            <phase>compile</phase>
            <goals>
                <goal>jar</goal>
            </goals>
        </execution>
    </executions>
</plugin>
复制代码

使用这个命令就可以把代码发布到仓库了。

mvn clean deploy -Dmaven.test.skip=true -Dmaven.javadoc.skip=true

基于Mina的配置中心(四)

Server端现在还有一些问题,会在第五章解决。

项目源码

欢迎大家关注我的公众号,共同学习,一起进步。加油

基于Mina的配置中心(四)

本文使用 mdnice
排版

原文 

https://juejin.im/post/5f04b05c5188252e98364408

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » 基于Mina的配置中心(四)

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址