NIO 服务端TCP连接管理的方案

最近做的一个项目需要在服务端对连接端进行管理,故将方案记录于此。

方案实现的结果与背景

   因为服务端与客户端实现的是长连接,所以需要对客户端的连接情况进行监控,防止无效连接占用资源。

   完成类似于心跳的接收以及处理

    即:

当连接过长事件(keep-alive Time)没有发送新的消息时,则在服务端切断其客户端的连接。

具体细节

在处理连接(Accpet事件)时:

      将SocketChannel存入HashSet;

         以SocketChannel的HashCode作为Key来存储连接时间(以服务器时间为准)

      (开辟一个HashMap或者利用Redis进行缓存)

在处理读取(Readable)事件时:

       以SocketChannel的HashCode作为Key来存储读取事件发生的时间(以服务器时间为准);

       处理读取事件


    开启一个定时反复运行的管理线程,每次运行对HashSet中的SocketChannel进行轮询,并以SocketChannel的HashCode去取对应的时间(LastSendTime)

    获取当前时间(CurrentTime),进行计算,如果大于Keep-Alive Time,则删除HashMap(/Redis)中的键值对,以及HashSet中的SocketChannel对象,并关闭SocketChannel。

     连接端

       ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.bind(new InetSocketAddress("127.0.0.2",1234));
            serverChannel.configureBlocking(false);
            AnalyisUtil util=new AnalyisUtil();
            RedisConnectionPool connectionPool=new RedisConnectionPool();
            Selector selector = Selector.open();
            SelectionKey key = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                int select = selector.select();
                if (select > 0) {
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectedKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        // 接收连接请求
                        if (selectionKey.isAcceptable()) {
                            ServerSocketChannel channel = (ServerSocketChannel) selectionKey
                                    .channel();
                            SocketChannel socketChannel = channel.accept();
                            logger.info("接收到一个新的连接请求"+ socketChannel.getRemoteAddress().toString());
                            socketChannel.configureBlocking(false);
                            //每接收请求,注册到同一个selector中处理
                            socketChannel.register(selector, SelectionKey.OP_READ);
                  //在Redis中存储连接的时间,以SocketChannel的HashCode作为Key
                            connectionPool.getJedis().set("LST_"+socketChannel.hashCode(),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                 //将SocketChannel放入HashSet中管理
                            connectedSokectList.add(socketChannel);
                        } else if (selectionKey.isReadable()) {
                            //执行读事件,在读事件的处理函数中,重新以SocketChannel的HashCode再次存储事件,以刷新时间
                            util.handleReadEvent(selectionKey,messageQueue,logger);
                        }
                    }
                }
            }

    连接处理线程

package ConnectionSystem;

import Util.RedisConnectionPool;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;

public class ConnectionManagerTask implements Runnable {
    private HashSet<SocketChannel> connectedSokectList;
    private long keepalive_Time=5000;
    private Logger logger=Logger.getLogger(ConnectionManagerTask.class);

    ConnectionManagerTask(HashSet<SocketChannel> list){
        logger.info("TCP监听已经启动... ...");
        this.connectedSokectList=list;
    }

    private long cucalateIsAlive(Date lastSendTime) throws ParseException {
        Date currentTime=new Date();
        return currentTime.getTime()-lastSendTime.getTime();
    }

    private boolean handleSocket(SocketChannel channel){
        int channel_code= channel.hashCode();
        RedisConnectionPool connectionPool=new RedisConnectionPool();
        Jedis jedisCilent;
        SocketAddress ipLocation;
        try{
            ipLocation=channel.getRemoteAddress();
            jedisCilent=connectionPool.getJedis();
            String SendTime=jedisCilent.get("LST_"+channel_code);
            if(SendTime!=null) {
                SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                Date lastSendTime = dfs.parse(SendTime);
                if (cucalateIsAlive(lastSendTime) > keepalive_Time) {
                    //超过时间
                    try {
                        if(channel.isConnected()){
                            channel.close();
                            jedisCilent.del("LST_"+channel_code);
                            logger.debug("连接被TCP管理线程关闭,ip:" + ipLocation + ",上次回应时间:" + lastSendTime);
                        }else {
                            logger.debug("当前通道,ip:" + ipLocation + "已经关闭... ..."+ ",上次回应时间:" + lastSendTime);
                        }
                        return true;
                    } catch (IOException e) {
                        logger.error("通道,ip:" + ipLocation + "关闭时发生了异常",e);
                    }
                }else {
                    return false;
                }
            }
            if(channel.isConnected()){
                channel.close();
                logger.debug("连接被TCP管理线程关闭,ip:" + ipLocation + ":未检测到登陆时间... ...");
            }else {
                logger.debug("当前通道,ip:" + ipLocation + "已经关闭... ...");
            }

        }catch (Exception e){
            logger.error("通道关闭时发生了异常",e);
        }
        return true;
    }

    @Override
    public void run() {
        logger.info("当前连接数"+connectedSokectList.size());
        if(connectedSokectList.isEmpty()){
            return;
        }
        Iterator<SocketChannel> iterator = connectedSokectList.iterator();
        while (iterator.hasNext()){
            SocketChannel socketChannel=iterator.next();
            Boolean removeFlag=handleSocket(socketChannel);
            if(removeFlag){
                iterator.remove();
            }
        }
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏大数据架构

Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式

1875
来自专栏女程序员的日常

STM8S——Analog/digital converter (ADC)

1、ADC1 and ADC2 are 10-bit successive approximation Anolog to Digital Converters...

3141
来自专栏javathings

Java NIO 实现网络通信

Java NIO 的相关资料很多,对 channel,buffer,selector 如何相关概念也有详细的阐述。但是,不亲自写代码调试一遍,对这些概念的理解仍...

2642
来自专栏清晨我上码

第六节 netty前传-NIO Selector

可以使用单个线程来处理多个channel来节省资源。对于操作系统而言,线程之间切换是昂贵的,并且每个线程也占用操作系统中的一些资源(存储器)。 因此,使用的线程...

1212
来自专栏Linux驱动

Linux-使用patch命令给uboot打补丁(3)

patch:修改文件,让用户对原文件打补丁 用法   patch -p[剥离层级]  <[补丁文件] 打补丁示例: u-boot-1.1.6_jz2440.p...

2619
来自专栏蓝天

基于zookeeper的主备切换方法

继承CZookeeperHelper即可快速实现主备切换: https://github.com/eyjian/mooon/blob/master/mooo...

1212
来自专栏JavaEdge

Tomcat架构解析之3 Connector NIOAcceptorPollerWorkerNioSelectorPool

2804
来自专栏javathings

Java 的 NIO 是如何工作的?

在这个数据爆炸的时代,有大量的数据在系统中流动,一个应用系统的瓶颈往往都是 IO 瓶颈。传统的 javaIO 模型是 BIO,也就是同步阻塞 IO,数据在写入 ...

1251
来自专栏Janti

Java中的NIO基础知识

上一篇介绍了五种NIO模型,本篇将介绍Java中的NIO类库,为学习netty做好铺垫

1153
来自专栏chenssy

【死磕Netty】-----NIO基础详解

原文出处http://cmsblogs.com/ 『chenssy』 转载请注明原创出处,谢谢! Netty 是基于Java NIO 封装的网络通讯框架,只有充...

4486

扫码关注云+社区