前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >netty学习

netty学习

作者头像
用户8447427
发布2022-08-18 15:45:34
3260
发布2022-08-18 15:45:34
举报
文章被收录于专栏:userlyz学习记录

netty学习

过程

执行流程示意图
执行流程示意图

server启动 parentGroup 可以简单的理解一个为一个线程池 NioEventLoop可以理解为一个线程,它本身不是一个线程,但是会绑定一个线程 NioEventLoop对指定的port进行连接监听

client启动 执行步骤二 eventLoopGroup和那个parentGroup是一样的

Pipeline是一个双向链表,包含很多的处理器

parentGroup childGroup parentGroup相当于迎宾员,childgroup相当于服务员。parentGroup只是管客户端链接的,childGroup后续所有的服务

核心概念

Channael

管道,其实对Socket的封装

EventLoopGroup

是一个eventloop池,包含很多eventloop。EventLoop本身只是一个线程驱动,在生命周期之内只绑定一个线程

netty为每一个Channnel分配一个EventLoop,用于处理用户连接,对用户请求处理等所有事件Channel和EventLoop的关系是n:1,而EventLoop和线程的关系是1:1,一个EventLoop可以和很多的Channel绑定

serverBootstrap

服务端使用的是ServerBootstrap;客户端是Bootstrap。相当于粘合剂,将各个组件关联起来

ChannelHeader和ChannelPipeline

ChannelHeader是对Channel中数据的处理器,可以是系统本身定义好的编码器也可以是用户定义的。这些处理器会被统一添加到一个ChannelPipeline的对象中,然后按照顺序对Channel中的数据进行一次处理

ChannelFuture

Netty中的所有I/O操作都是异步的Netty中定义了ChannelFuture对象作为异步操作的代言人,表示异步操作本身。如果想获取该异步操作的的返回值,可以通过该异步对象中的addListener()方法为该异步操作添加监听器,为其注册回调,当结果出来之后马上调用执行

回调:当结果出来之后立马启用

代码

依赖

代码语言:javascript
复制
<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.63.Final</version>
    </dependency>
</dependencies>

IDEA的maven项目的netty包的导入(其他jar同) - CccccDi - 博客园 (cnblogs.com)

IDEA引入Netty包 - 亲爸爸 - 博客园 (cnblogs.com)

demo

服务器端:

代码语言:javascript
复制
package com.lyz.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SomeServer {
    public static void main(String[] args){
        // 创建一个group,用于处理客户端连接请求
        NioEventLoopGroup parentGroup=new NioEventLoopGroup();
        // 创建一个group,用于处理客户端连接上sever之后的后续请求
        NioEventLoopGroup childGroup=new NioEventLoopGroup();
        try {
            //bootstrap用于初始化channel
            ServerBootstrap bootstrap=new  ServerBootstrap();
            //指定两个要使用的group
            bootstrap.group(parentGroup,childGroup)
                    //指定创建的channel的类型
                    .channel(NioServerSocketChannel.class)
                    //指定要使用的处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        //初始化channel方法
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //channel一旦创建完毕就会同时绑定一个pipeline
                            ChannelPipeline pipeline=ch.pipeline();
                            //添加编码器
                            pipeline.addLast(new StringEncoder());
                            //添加解码器
                            pipeline.addLast(new StringDecoder());
                            //添加自定义的处理器
                            pipeline.addLast(new SomeServerHeadler());
                        }
                    });
            //创建channel,绑定指定的主机(hostName,port)
            //sync() 将异步变成同步的
            ChannelFuture future =null;
            future=bootstrap.bind(8888).sync();//这个方法不执行完毕不往下执行,这就是为啥给他从异步变成同步
            System.out.println("服务器8888已经启动");
            //当channel被关闭之后,会触发closeFuture()的执行,去完成一些首位工作
            future.channel().closeFuture().sync();
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            //将两个group进行优雅关闭
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
代码语言:javascript
复制
package com.lyz.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

//自定义服务端处理器,用于处理来自客户端的数据
public class SomeServerHeadler extends ChannelInboundHandlerAdapter {
    //一种回调方法:当client将数据写入到channel并发送到server后,server就会触发该方法的执行
    /**
     * @param ctx 表示当前处理器(其实他就是当前处理器封装的一个节点)
     * @param msg client发来的数据
     * @throws Exception
     * **/
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
        //输出client的地址与发送来的数据
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        //向客户端发送一个随机的uuid
        //UUID.randomUUID()
        ctx.channel().writeAndFlush("from server"+msg);
        TimeUnit.MICROSECONDS.sleep(500);
    }
    //一旦在服务器端发生异常,就会触发该方法的运行
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        //关闭channel
        ctx.close();
    }
}

客户端

代码语言:javascript
复制
package com.lyz.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SomeClient {
    public static void main(String[] args) {
        NioEventLoopGroup group =new  NioEventLoopGroup();
        try{
            //对比server端使用的是ServerBootstrap
            Bootstrap bootstrap=new Bootstrap();
            bootstrap.group(group)
                    //指定要创建的channel的类型
                    //server指定的是NioServerSocketChannel
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline=ch.pipeline();
                            pipeline.addLast(new StringEncoder());//pipeline.addLast("可以自定义处理器名,不添加默认为类名",new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new SomeClientHeadler());
                        }
                    });
            ChannelFuture future=bootstrap.connect("localhost",8888).sync();
            future.channel().closeFuture().sync();
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }
}
代码语言:javascript
复制
package com.lyz.client;


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.time.LocalDateTime;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;


//自定义客户端处理器,处理来自于server的数据
public class SomeClientHeadler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        Scanner in =new Scanner(System.in);
        System.out.println("请输入您要发送的信息");
        String data=in.nextLine();
        ctx.channel().writeAndFlush("from client:"+ data);
        TimeUnit.MILLISECONDS.sleep(500);
    }

    //当channel被激活的时候会触发该方法的执行,该方法指挥执行一次
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ctx.channel().writeAndFlush("send the first data");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

课程设计结果

服务器端

代码语言:javascript
复制
package com.lyz.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.HashMap;
import java.util.Map;

public class SomeServer {
    public static Map<String, ChannelHandlerContext> user_list=new HashMap<>();

    public static void main(String[] args){
        // 创建一个group,用于处理客户端连接请求
        NioEventLoopGroup parentGroup=new NioEventLoopGroup();
        // 创建一个group,用于处理客户端连接上sever之后的后续请求
        NioEventLoopGroup childGroup=new NioEventLoopGroup();
        try {
            //bootstrap用于初始化channel
            ServerBootstrap bootstrap=new  ServerBootstrap();
            //指定两个要使用的group
            bootstrap.group(parentGroup,childGroup)
                    //指定创建的channel的类型
                    .channel(NioServerSocketChannel.class)
                    //指定要使用的处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        //初始化channel方法
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //channel一旦创建完毕就会同时绑定一个pipeline
                            ChannelPipeline pipeline=ch.pipeline();
                            //添加编码器
                            pipeline.addLast(new StringEncoder());
                            //添加解码器
                            pipeline.addLast(new StringDecoder());
                            //添加自定义的处理器
                            pipeline.addLast(new SomeServerHeadler());
                        }
                    });
            //创建channel,绑定指定的主机(hostName,port)
            //sync() 将异步变成同步的
            ChannelFuture future =null;
            future=bootstrap.bind(8888).sync();//这个方法不执行完毕不往下执行,这就是为啥给他从异步变成同步
            System.out.println("服务器8888已经启动");
            //当channel被关闭之后,会触发closeFuture()的执行,去完成一些首位工作
            future.channel().closeFuture().sync();
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            //将两个group进行优雅关闭
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
代码语言:javascript
复制
package com.lyz.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

//自定义服务端处理器,用于处理来自客户端的数据
public class SomeServerHeadler extends ChannelInboundHandlerAdapter {
    //一种回调方法:当client将数据写入到channel并发送到server后,server就会触发该方法的执行
    /**
     * @param ctx 表示当前处理器(其实他就是当前处理器封装的一个节点)
     * @param msg client发来的数据
     * @throws Exception
     * **/
    public static Map<String, ChannelHandlerContext> user_list=new HashMap<>();
    public void sendto(ChannelHandlerContext ctx,Object msg)throws Exception{
        ctx.channel().writeAndFlush("from server"+msg);
        TimeUnit.MICROSECONDS.sleep(500);
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
        //输出client的地址与发送来的数据
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        msg=msg.toString().replaceAll("[\\r\\n]","");
        //登录
        String username="";
        if (msg.toString().startsWith("LOGIN:")){
            username=msg.toString().split(":")[1];
            if (user_list.containsKey(username)){
                System.out.println(username+"登录失败");
                String data="失败,请重新登录";
                sendto(user_list.get(username),data);
            }
            else{
                System.out.println(username+"登录成功");
                user_list.put(username,ctx);
                String data=username+"登录成功";
                sendto(user_list.get(username),data);
            }
        }
        else if (msg.toString().startsWith("SENDTO:")){//SENDTO:1;MSG:HELLO,I AM 1
            String sendto_name=msg.toString().split(";")[0].split(":")[1];
            String data=msg.toString().split(";")[1].split(":")[1];
            System.out.println(sendto_name+"------"+data);
            sendto(user_list.get(sendto_name),data);
        }
        else if (msg.toString().startsWith("SENDALL:")){
            Set<String> keys=user_list.keySet();
            String data=msg.toString().split(":")[1];
            for(String key:keys){
                sendto(user_list.get(key),data);
            }
        }
        else if(msg.toString().startsWith("QUERY")){
            String data=user_list.toString();
            sendto(ctx,data);
        }
        else if (msg.toString().startsWith("EXIT")){
            user_list.remove(username);
            String data="退出成功";
            sendto(ctx,data);
            ctx.close();
        }
        else{
            String data="失败,请重新输入";
            sendto(ctx,data);
        }
    }
    //一旦在服务器端发生异常,就会触发该方法的运行
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        //关闭channel
        ctx.close();
    }


}

客户端

代码语言:javascript
复制
package com.lyz.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SomeClient {
    public static void main(String[] args) {
        NioEventLoopGroup group =new  NioEventLoopGroup();
        try{
            //对比server端使用的是ServerBootstrap
            Bootstrap bootstrap=new Bootstrap();
            bootstrap.group(group)
                    //指定要创建的channel的类型
                    //server指定的是NioServerSocketChannel
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline=ch.pipeline();
                            pipeline.addLast(new StringEncoder());//pipeline.addLast("可以自定义处理器名,不添加默认为类名",new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new SomeClientHeadler());
                        }
                    });
            ChannelFuture future=bootstrap.connect("localhost",8888).sync();
            future.channel().closeFuture().sync();
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }
}
代码语言:javascript
复制
package com.lyz.client;


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.time.LocalDateTime;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;


//自定义客户端处理器,处理来自于server的数据
public class SomeClientHeadler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        sendto(ctx,msg);
    }

    //当channel被激活的时候会触发该方法的执行,该方法指挥执行一次
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        Scanner in =new Scanner(System.in);
        System.out.println("请输入您的LOGIN:");
        String Login=in.nextLine();
        ctx.channel().writeAndFlush("LOGIN:"+Login);
    }
    
    public void sendto(ChannelHandlerContext ctx,Object msg)throws Exception{
        if(msg.toString().equals("登录失败,请重新登录"))
        {
            System.out.println("请输入您的LOGIN:");
            Scanner in =new Scanner(System.in);
            String Login=in.nextLine();
            ctx.channel().writeAndFlush("LOGIN:"+Login);
        }
        else if (msg.toString().equals("退出成功")){
            ctx.channel().closeFuture().sync();
        }
        else {
            System.out.println("请输入您要发送的信息");
            Scanner in =new Scanner(System.in);
            String data = in.nextLine();
            ctx.channel().writeAndFlush(data);
            TimeUnit.MILLISECONDS.sleep(500);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-12-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • netty学习
    • 过程
      • 核心概念
        • Channael
        • EventLoopGroup
        • serverBootstrap
        • ChannelHeader和ChannelPipeline
        • ChannelFuture
      • 代码
        • 依赖
        • 服务器端:
        • 客户端
    • demo
    • 课程设计结果
      • 服务器端
        • 客户端
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档