前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty -NIO体验群聊系统

Netty -NIO体验群聊系统

作者头像
疯狂的KK
发布2020-02-19 11:28:45
5990
发布2020-02-19 11:28:45
举报

非Netty形式基于Nio的原理体验群聊系统,为了更好的理解Netty的通信,从NIO角度体验下Selector,SelectionKey,channel的关系。

idea打开当前类继承关系

Selector

真正执行时的类型----WindowsSelectorImpl

Selector.keys().size()

当每链接一个客户端,注册一个key,生成独立的socketChannel

selector.selectedKeys();注册到channel哪些发生了事件

key.interestOps(Selector.keys().ops)
serverSocketChannel
public abstract class ServerSocketChannel
    extends AbstractSelectableChannel
    implements NetworkChannel
{
}

ServerSocketChannel监听客户端链接

Socketchannel负责读写操作

群聊需求:

1.能监听服务端上线下线

2.发送和接收消息,并实现转发

3.展示IP地址+接收消息

注意事项:

1.转发消息时需排除自己

2.channel注册完毕需要移除防止重复操作

3.简单代码实现获取本机IP

通用代码获取本机地址

InetAddress localHost = InetAddress.getLocalHost();
String address= localHost.getHostAddress().toString();
String name= localHost.getHostName().toString();

接口请求时可能由于Nginx反向代理,获取不到真实IP,单纯request信息获取不到IP,可参考如下代码

  try
    {
        String ip = request.getHeader("x-forwarded-for");
        if (ip == null || ip.length() == 0
                || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getHeader("Proxy-Client-IP");
        }
        if (ip == null || ip.length() == 0
                || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ip == null || ip.length() == 0
                || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getRemoteAddr();
            if (ip.equals("127.0.0.1") || ip.equals("0:0:0:0:0:0:0:1")) {
              // 根据网卡取本机配置的IP
                InetAddress inet = InetAddress.getLocalHost();
                ip = inet.getHostAddress();

               // 对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割
                if (ip != null && ip.length() > 15) { // "***.***.***.***".length()
                  // = 15
                    if (ip.indexOf(",") > 0) {
                        ip = ip.substring(0, ip.indexOf(","));
                    }
                }
            }
        }
        if (ip != null && ip.length() > 15) { // "***.***.***.***".length()
            if (ip.indexOf(",") > 0) {
                ip = ip.substring(0, ip.indexOf(","));
            }
        }
        System.out.println("ip" + ip);
    } catch(
    Exception e)

    {
        e.printStackTrace();
    }

服务器端代码

/**
 * @author zhaokk
 * @create 2020-02-12-13:58
 */
public class GroupChatServer {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private static final int PORT = 6667;
    //初始化任务
    public GroupChatServer() {
        //得到选择器
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
            //设置非阻塞
            serverSocketChannel.configureBlocking(false);
            //注册到selector
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

监听事件,并处理消息,提示本机上线,并移除注册channel,无连接时提示

 //监听
    public void listen() {
        try {
            while (true) {
                int count = selector.select(2000);
                if (count > 0) {
                    //处理事件
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        //监听到Accept
                        if (key.isAcceptable()) {
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            //注册到selector
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            //提示上线
                            System.out.println(socketChannel.getRemoteAddress() + "上线");
                        }
                        //读事件
                        if (key.isReadable()) {
                            readData(key);

                        }
                        //防止重复处理
                        iterator.remove();
                    }

                } else {

                    System.out.println("等待连接...");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

读取客户端消息离线后关闭通道

 //读取客户端消息
    private void readData(SelectionKey key) {
        SocketChannel channel = null;
        try {
            //取到关联channel
            channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = channel.read(buffer);
            if (count > 0) {
                //转成字符串输出
                java.lang.String s = new java.lang.String(buffer.array());
                //输出
                InetAddress localHost = InetAddress.getLocalHost();
                String m = localHost.getHostAddress().toString();
                System.out.println("客户端"+"--------"+m+"消息" + s);
                //转发消息
                sendInfoToOther(s, channel);
            }
        } catch (IOException e) {
            try {
                System.out.println(channel.getRemoteAddress() + "离线了");
                //离线后 处理
                key.cancel();
                //关闭通道
                channel.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

转发消息,排除自己

//转发消息到通道    发送的消息   排除自己
    private void sendInfoToOther(java.lang.String msg, SocketChannel socketChannel) {

        //服务器转发消息
        System.out.println("服务器转发消息 ....." + msg);
        //遍历selector 注册的channel   排除自己
        selector.keys().forEach(key -> {
            Channel targetChannel = key.channel();
            //排除自己    是一个socketchannel   但不是自己
            if (targetChannel instanceof SocketChannel && targetChannel != socketChannel) {
                SocketChannel channel = (SocketChannel) targetChannel;
                ByteBuffer wrap = ByteBuffer.wrap(msg.getBytes());
                //将buffer数据写入channel
                try {
                    channel.write(wrap);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

启动代码

 public static void main(String[] args) {

        //启动
        GroupChatServer groupChatServer = new GroupChatServer();

        groupChatServer.listen();

    }

客户端代码

/**
 * @author zhaokk
 * @create 2020-02-12-14:37
 */
public class GroupClient {

    //链接服务器
    //发送消息
    //接收消息

    private final String HOST = "127.0.0.1";

    private final int port = 6667;

    private Selector selector;

    private SocketChannel socketChannel;

    private String username;

    public GroupClient() throws IOException {

        selector = Selector.open();
        //链接服务器
        socketChannel= socketChannel.open(new InetSocketAddress(HOST, port));
        socketChannel.configureBlocking(false);
        //注册到selector
        socketChannel.register(selector, SelectionKey.OP_READ);
        InetAddress localHost = InetAddress.getLocalHost();
        String s = localHost.getHostAddress().toString();
        username = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println(username +"--------"+s+ "准备...");

    }

发送消息,读取消息

//发送

    public void sendInfo(String info) {

        info = username + ":" + info;

        try {
            socketChannel.write(ByteBuffer.wrap(info.getBytes()));
            //回复
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    public void readInfo() {
        try {
            int select = selector.select(1000);
            if (select > 0) {
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        channel.read(buffer);
                        //读到的数据转成字符串
                        String s = new String(buffer.array());
                        System.out.println(s.trim());
                    }
                }
                iterator.remove();
            } else {
                System.out.println("无可用通道...");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

启动

public static void main(String[] args) throws IOException {
        //启动
        GroupClient groupClient = new GroupClient();
        new Thread(() -> {
            while (true) {
                groupClient.readInfo();
                try {
                    Thread.currentThread().sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        //发送数据到服务器端
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String s = scanner.nextLine();
            groupClient.sendInfo(s);
        }
    }

当开启服务器端等待连接,开启客户端提示信息上线

客户端无连接提示无通道

输入消息kkisready

关闭客户端提示离线

Selectionkey.OP_ACCEPT

内置传输流程

通过当前的demo更好的理解Netty如何做到一对一一对多的聊天,通过控制channel中的selector实现,每次channel操作注册完毕需要移除

我向着我的目标前进,我遵循着我的路途,我越过踌躇者与落后者。我的前进将是他们的没落!

---尼采

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-02-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 赵KK日常技术记录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档