前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布

netty

作者头像
Dean0731
发布2020-10-10 17:15:16
4680
发布2020-10-10 17:15:16
举报
文章被收录于专栏:blog-技术博客blog-技术博客

BIO

同步阻塞bio:链接数目较少

image-20200922101237506
image-20200922101237506
代码语言:javascript
复制
public static void main(String args[]) throws IOException {
    ExecutorService pool = ThreadPool.getCachedThreadPool();
    ServerSocket socket = new ServerSocket(6666);
    System.out.println("服务器启动....");
    while(true){
        final Socket client = socket.accept();
        System.out.println("this is a client");
        pool.execute(new Runnable() {
            @Override
            public void run() {
                handler(client);
            }
        });
    }
}
public static void handler(Socket socket){
    System.out.print(String.format("线程id: %s 线程name: %s",Thread.currentThread().getId(),Thread.currentThread().getName()));
    byte[] bytes = new byte[1024];
    try (InputStream inputStream = socket.getInputStream()) {
        while(true){
            int read = inputStream.read(bytes);
            if(read!=-1){
                System.out.print(new String(bytes,0,read));
            }else {
                break;
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }finally {
        try {
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

NIO

  • 同步非阻塞nio:链接数目较多且链接比较短,聊天,弹幕,服务器间通讯
  • 三大组件:
    • channel,
      • 每个channel对应一个buffer
      • 双向,可以返回os底层的情况
      • file channel
      • dataprogram channel
      • file channel
    • buffer,
      • 内存块,底层是数组
      • 数据读取的位置,与bio不同,不使用流,但也可以双向读取(filp方法:反转)
      • 重要属性
        • Capacity:容量大小,不可改变
        • Limit:极限位置标记,可变,默认就是Capacity
        • mark:标记 默认-1
        • Position:下一个要被读或写的位置索引

      buffer的读写都依赖这四个属性,

    • Selector
      • 对应一个线程
      • 对应多个channel

代码语言:javascript
复制
String str = "hello world";
try (FileOutputStream stream = new FileOutputStream("d:/desktop/1.txt")) {
    FileChannel channel = stream.getChannel();
    ByteBuffer buffer1 = ByteBuffer.allocate(1024);
    buffer1.put(str.getBytes());
    buffer1.flip();
    channel.write(buffer1);
    stream.close();
} catch (IOException e) {
    e.printStackTrace();
}
image-20200922120031552
image-20200922120031552

文件复制:同一个buffer

image-20200922120221883
image-20200922120221883

文件复制:使用通道的transform(),或transto()方法,更方便

文件直接修改,文件内容不再进入jvm,buffer直接映射到文件

image-20200922122102802
image-20200922122102802

通信示例

代码语言:javascript
复制
public static void server() throws Exception{
    // 创建ServerSocketChannel  ---》ServerSocket
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    InetSocketAddress socketAddress = new InetSocketAddress(7000);
    serverSocketChannel.socket().bind(socketAddress);
    serverSocketChannel.configureBlocking(false);
    // 创建Selector
    Selector selector = Selector.open();

    // serverSocketChannel注册到seletcor,事件为OP——ACCEPT
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    while(true){
        // 没有事件
        if(selector.select(1000) == 0){ // selector 等待1秒钟
            System.out.println("waiting for 1 seconds");
            continue;
        }
        // 有事件
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
        while(keyIterator.hasNext()){
            // 获取key
            SelectionKey key = keyIterator.next();
            // 获取对应通道
            if(key.isAcceptable()){
                // 生成socketChannel
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                // socketChannel注册到selector,关注事件为OP_read
                socketChannel.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024));
            }
            if (key.isReadable()){
                // 反向获取到channel
                SocketChannel channel = (SocketChannel) key.channel();
                // 会获取到buffer
                ByteBuffer buffer = (ByteBuffer) key.attachment();
                channel.read(buffer);
                System.out.println(String.format("当前线程:%s-%s,form客户端:%s",Thread.currentThread().getId(),Thread.currentThread().getName(),new String(buffer.array())));
            }
            // 删除selectionkey,防止重复操作
            keyIterator.remove();
        }
    }
}
public static void client() throws Exception{
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    // 未连接上或正在连接
    if(!socketChannel.connect(new InetSocketAddress("127.0.0.1",7000))){
        while(!socketChannel.finishConnect()){
            System.out.println("正在连接,但客户端没有阻塞");
        }
    }
    // 已连接
    String str = "hello world";
    ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
    // 发送数据
    socketChannel.write(buffer);
}
image-20200922140108426
image-20200922140108426
image-20200922101211665
image-20200922101211665

零拷贝:没有cpu copy或copy 的信息很少

示例:服务器读取文件,发送给客户

  • 传统:用户---》内核---》用户---》内核,三次状态切换,四次copy
    • 硬件--》内核:dma copy
      • 用户态--》内核态
    • 内核---》用户:cpu copy
      • 内核态--》用户态
    • 用户---》socket: cpu copy
      • 用户态---》内核态
    • socket---》硬件协议:dma copy
  • mmap:内存映射,将用户内存映射到内核
    • 还是3次切换,但只有3次copy
    • 硬件--》内核:dma copy
      • 用户态--》内核态
      • 此时内核数据用户共享,但此时程序获取数据会进行状态切换,所以还是3次状态切换
    • 内核---》socket: cpu copy
      • 用户态---》内核态
    • socket---》硬件协议:dma copy

sendFile:2次切换,3次copy linux2.1

  • 硬件--》内核:dma copy
    • 用户态--》内核态
  • 内核---》socket:cup copy
    • 内核态---》用户态
  • socket---》硬件协议:dma copy

sendFile:2次切换,2次copy linux2.4

  • 硬件--》内核:dma copy
    • 用户态--》内核态
    • 内核态---》用户态
  • 内核(socket)---》硬件协议:dma copy

AIO

异步非阻塞aio:链接数目多,并且链接时间长

image-20200922153711739
image-20200922153711739

Netty

  • 用于基于nio的数据传输(大数据,小数据都可)框架
  • 自己用nio写,还是比较偏底层的,比较麻烦
  • 简化nio的开发流程
  • tcp/upd(传输协议)----》nio(基于传输协议的api),netty------》用用协议(http,websocket,ssl等等)
传统网络I/O服务模型
image-20200922155132004
image-20200922155132004
REACTOR模型
  • 基于I/O多路复用
  • 基于线程池线程复用
image-20200922160119610
image-20200922160119610

单Rector单线程

image-20200922160638095
image-20200922160638095

单Rector多线程

image-20200922160845862
image-20200922160845862
主从Reactor
image-20200922162138699
image-20200922162138699
Netty
  • BossGroup,负责接收客户端连接
  • WorkGroup,负责读写
  • NioEventLoopGroup,事件循环组
    • NioEventLoop,不断循环的处理任务线程,有一个selector
  • boss nioeventloop
    • 轮训accept事件
    • 处理accept事件,与client建立连接,
    • 生成NioSocketChannel,并注册到某个work的NIOEventLoop的selector
    • 处理任务队列的任务
  • work nioeventloop
    • 轮训read,write事件
    • 处理i/o事件,即read,write,在NiosocketChannel处理
    • 处理队列任务
  • work nioeventloop处理数据会使用pipline
    • pipline包含channel
代码语言:javascript
复制
// netty 聊天
public class Server{

    public static void main(String[] args){
        // 连个线程组,
        // 处理链接,默认线程数=cup核数*2
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 处理业务
        EventLoopGroup workGroup = new NioEventLoopGroup();
        // 服务端启动对象
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            bootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)  // 通道类型
                .option(ChannelOption.SO_BACKLOG, 128) // 线程队列链接数
                .childOption(ChannelOption.SO_KEEPALIVE, true) //保持活动链接状态
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new NettyServerHandler());
                    }
                }); // 给workgroup的eventloop设置处理器
            // 绑定端口,并启动
            ChannelFuture cf = bootstrap.bind("127.0.0.1",6668).sync();
            System.out.println("服务端启动....");
            // 关闭,当有关闭事件时关闭
            cf.channel().closeFuture().sync();
        }catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

}
class NettyServerHandler extends ChannelInboundHandlerAdapter {
    // 读取客户端消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress());
        System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
        
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
代码语言:javascript
复制
public class Client {
    public static void main(String[] args) {
        // 事件循环组
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        // 客户端对象
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(eventExecutors) // 设置线程组
                .channel(NioSocketChannel.class) // 设置通道实现类
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new NettyClientHandler()); // 加入处理器
                    }
                });
            // 启动
            ChannelFuture cf = bootstrap.connect("127.0.0.1", 6668).sync();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }
}
class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("ctx"+ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("客户端发来的信息",CharsetUtil.UTF_8));
    }

    // 读取客户端消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress());
        System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

Netty Task

  • 用户自定义普通任务
  • 用户自定义定时任务
  • 非Reactor调用channel:服务器推送信息到客户端
代码语言:javascript
复制
// 用户自定义的普通任务
 @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress());
        System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
        // 若此时是耗时任务,客户端都要等待服务器执行完毕,
        // 通过下面方式可以异步执行,就是将任务交给了netty的task
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(ctx.channel().remoteAddress());
                System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
            }
        });
        // 任务放在scheduleTaskQueue
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println(ctx.channel().remoteAddress());
                System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
            }
        },5, TimeUnit.SECONDS);
    }

Netty实现Http协议

代码语言:javascript
复制
public class Main{

    public static void main(String[] args){
        // 连个线程组,
        // 处理链,默认线程数=cup核数*2
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 处理业务
        EventLoopGroup workGroup = new NioEventLoopGroup();
        // 服务端启动对象
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)  // 通道类型
                    .option(ChannelOption.SO_BACKLOG, 128) // 线程队列链接数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //保持活动链接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {// 给workgroup添加handler
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {// 每个请求pipline与handler是不共享的,

                            // 得到管道
                            ChannelPipeline pipeline = ch.pipeline();
                            // http 解码器
                            pipeline.addLast(new HttpServerCodec());
                            // 响应
                            pipeline.addLast(new NettyServerHandler());

                        }
                    }); // 给workgroup的eventloop设置处理器
            // 绑定端口,并启动
            ChannelFuture cf = bootstrap.bind("127.0.0.1",8080).sync();
            System.out.println("服务端启动....");
            // 关闭,当有关闭事件时关闭
            cf.channel().closeFuture().sync();
        }catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
// SimpleChannelInboundHandler<HttpObject>是ChannelInboundHandlerAdapter的子类
// 通信数据被封装httpobject

class NettyServerHandler extends SimpleChannelInboundHandler<HttpObject> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        System.out.println(ctx.channel().pipeline().hashCode()+" "+this.hashCode());
        HttpRequest request = (HttpRequest) msg;
        URI uri = new URI(request.uri());
        if("/favicon.ico".equals(uri.getPath())){
            System.out.println("不作响应");
            return;
        }
        ByteBuf content = Unpooled.copiedBuffer("我是服务器,", CharsetUtil.UTF_16);
        DefaultFullHttpResponse defaultFullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK,content);
        defaultFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
        defaultFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());
        ctx.writeAndFlush(defaultFullHttpResponse);
    }
}
image-20200922202021791
image-20200922202021791

Unpooled类

代码语言:javascript
复制
// 该对象包含数组,读取的时候不同flip进行翻转,底层维护了readerIndex,writerIndex
ByteBuf a = Unpooled.buffer(100);
ByteBuf b = Unpooled.copiedBuffer("helloworld",CharsetUtil.UTF_8);

Netty心跳检测

代码语言:javascript
复制
// 心跳检测处理器,触发器
// 3s没读取,发送检测包
// 5s没有写,发送检测包
// 7s没有读,也没有写,发送检测包
// 事件传递到下一个handler处理(自定义)
pipline.addLast(new IdleStateHandler(3,5,7TimeUnit.SECONDS))
pipline.addLast(new ChannelInboundHandlerAdapter(){
    public void userEventTriggered(ChannelHandlerContext ctx,Object evt){
        if(evt instance of IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            swicth(event.state()){
                case READER_IDLE:
                XXX
                case WRITER_IDLE:
                XXX
                case ALL_IDLE:
                XXX
            }
        }
    }
})

Websocket

image-20201002101425900
image-20201002101425900
代码语言:javascript
复制
class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        ctx.channel().writeAndFlush(new TextWebSocketFrame("xxxx"));
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        super.handlerAdded(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);
    }
}
代码语言:javascript
复制
var socket;
if(windows.WebSocket){
    socket = new WebSocket("ws://localhost:7000/hello");
    socket.onmessage = function(ev){
        收到消息
    }
    socket.onopen = function(ev){
        连接开启
    }
    socket.onclose = function(ev){
        连接关闭
    }
}

function send(message){
    if(socket.readyState==WebSocket.OPEN){
		socket.send(message);
    }
}

编码解码

Netty编码解码

  • StringEncoder,StringDecoder
  • ObjectEncoder,ObjectDecoder

序列化的缺点

  • 无法跨语言
  • 序列化体积大
  • 序列化性能低
google的Protobuf
  • rpc数据交换
    • http+json(以前)
    • tcp+protobuf (现在)
  • 跨语言,支持很多语言
  • 高性能
代码语言:javascript
复制
syntax = "protobuf4";// 版本号,
option optimize_for = SPEED; // 加快解析
option java_package = "top.deanxxx"; // 指定包名
option java_outer_classname = "MyData";//文件名,外部类名
message MyMessage{
	enum DataType{
		StudentType = 0;
		UserType = 1;
	}
	DataType data_Type = 1; // MyMessage 的第一个属性
	oneof dataBody{
		Stduent s = 2; // MyMessage 的另一个属性
		User u = 3;// MyMessage 的另一个属性
	}
}
message User{
	int32 id = 1; // id属性 ,1 表示属性的序号
	string name = 2;
}
message Student{
	int32 id = 1; // id属性 ,1 表示属性的序号
	string name = 2;
}
// protoc.exe --java_out=. xxx.proto
  • 编写proto文件
  • 变为对对应语言
  • 服务端对应语言中的proto编码器(handler),发送
  • 客户端接收,对应解码器(handler)

数据入栈出栈

image-20201002134742983
image-20201002134742983

TCP粘包与拆包

粘包:间隔时间短的包合并为一个

拆包:数据过大拆分

自定义协议+编解码器来解决

关键是解决服务器每次读取数据长度问题。

实现RPC

image-20201002144927179
image-20201002144927179
代码语言:javascript
复制
// 公共接口
public interface{
    String hello(String msg);
}
代码语言:javascript
复制
// provider
public class HelloService implement HelloService{
    String hello(String msg){
        
    }
}
public class ServerBootstrap{
    public static void main(String args){
        
    }
}
public class NettyServer{
    // netty初始化
    private static void startserver0(String hostname,int port){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)  // 通道类型
                    .option(ChannelOption.SO_BACKLOG, 128) // 线程队列链接数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //保持活动链接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new ServerHandler());
                        }
                    }); 
            ChannelFuture cf = bootstrap.bind(hostname,port).sync();
            cf.channel().closeFuture().sync();
        }catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
public class ServerHandler{
    channelRead:
    	xxxxx
}
代码语言:javascript
复制
public class NettyClient{
    private static ExecutorService executor = Executors.newFixedThreadPool(4);
    private static NettyClient client;
    // 编写方法,代理模式
    public  Object getBean(final Class<?>serviceclass,final String providerName){
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serviceClass},(proxy,method,args)->{
            // 每调用一次hello,
            if(cliet ==null){
                initial();
            }
            client.setPara(prividerName+args[0]);
            return executor.submit(client).get()
        })
    }
    
    private static void initial(){
        client = new NettyClient();
         EventLoopGroup eventExecutors = new NioEventLoopGroup();
        // 客户端对象
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    }); 
            // 启动
            ChannelFuture cf = bootstrap.connect("127.0.0.1", 8080).sync();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }
    }
}
public class ClientHandler extends ChannelInboundHandlerAdapter implements Callable{
    private ChannelHandlerContext ctx;
    private String result;
    private para;
    	channelActivate(ctx){
            ctx = ctx
        }
    	synchronzied channelRead(msg){
            result = msg.toString();
            notify();
        }
    synchronized call(){
        ctx.writeAndFlush(para);
        wait();
        retutn result;
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-10-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • BIO
  • NIO
  • 零拷贝:没有cpu copy或copy 的信息很少
  • AIO
  • Netty
  • Netty Task
  • Netty实现Http协议
  • Unpooled类
  • Netty心跳检测
  • Websocket
  • 编码解码
  • 数据入栈出栈
  • TCP粘包与拆包
  • 实现RPC
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档