专栏首页Java技术栈Netty 中的心跳机制,还有谁不会?

Netty 中的心跳机制,还有谁不会?

作者:rickiyang 出处:www.cnblogs.com/rickiyang/p/11074231.html

我们知道在TCP长连接或者WebSocket长连接中一般我们都会使用心跳机制–即发送特殊的数据包来通告对方自己的业务还没有办完,不要关闭链接。

那么心跳机制可以用来做什么呢?

我们知道网络的传输是不可靠的,当我们发起一个链接请求的过程之中会发生什么事情谁都无法预料,或者断电,服务器重启,断网线之类。

如果有这种情况的发生对方也无法判断你是否还在线。所以这时候我们引入心跳机制,在长链接中双方没有数据交互的时候互相发送数据(可能是空包,也可能是特殊数据),对方收到该数据之后也回复相应的数据用以确保双方都在线,这样就可以确保当前链接是有效的。

1. 如何实现心跳机制

一般实现心跳机制由两种方式:

  • TCP协议自带的心跳机制来实现;
  • 在应用层来实现。

但是TCP协议自带的心跳机制系统默认是设置的是2小时的心跳频率。它检查不到机器断电、网线拔出、防火墙这些断线。而且逻辑层处理断线可能也不是那么好处理。另外该心跳机制是与TCP协议绑定的,那如果我们要是使用UDP协议岂不是用不了?所以一般我们都不用。

而一般我们自己实现呢大致的策略是这样的:

  1. Client启动一个定时器,不断发送心跳;
  2. Server收到心跳后,做出回应;
  3. Server启动一个定时器,判断Client是否存在,这里做判断有两种方法:时间差和简单标识。

时间差:

  1. 收到一个心跳包之后记录当前时间;
  2. 判断定时器到达时间,计算多久没收到心跳时间=当前时间-上次收到心跳时间。如果改时间大于设定值则认为超时。

简单标识:

  1. 收到心跳后设置连接标识为true;
  2. 判断定时器到达时间,如果未收到心跳则设置连接标识为false;

今天我们来看一下Netty的心跳机制的实现,在Netty中提供了IdleStateHandler类来进行心跳的处理,它可以对一个 Channel 的 读/写设置定时器, 当 Channel 在一定事件间隔内没有数据交互时(即处于 idle 状态), 就会触发指定的事件。

该类可以对三种类型的超时做心跳机制检测:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
  • readerIdleTimeSeconds:设置读超时时间;
  • writerIdleTimeSeconds:设置写超时时间;
  • allIdleTimeSeconds:同时为读或写设置超时时间;

下面我们还是通过一个例子来讲解IdleStateHandler的使用。

服务端:

public class HeartBeatServer {
    private int port;

    public HeartBeatServer(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new HeartBeatServerChannelInitializer());

        try {
            ChannelFuture future = server.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        HeartBeatServer server = new HeartBeatServer(7788);
        server.start();
    }
}

服务端Initializer:

public class HeartBeatServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast("handler",new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast(new HeartBeatServerHandler());
    }
}

在这里IdleStateHandler也是handler的一种,所以加入addLast。我们分别设置4个参数:读超时时间为3s,写超时和读写超时为0,然后加入时间控制单元。另外,关注公众号Java技术栈,在后台回复:面试,可以获取我整理的 Java 系列面试题和答案,非常齐全。

服务端handler:

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
    private int loss_connect_time = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "Server :" + msg.toString());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            //服务端对应着读事件,当为READER_IDLE时触发
                IdleStateEvent event = (IdleStateEvent)evt;
            if(event.state() == IdleState.READER_IDLE){
                loss_connect_time++;
                System.out.println("接收消息超时");
                if(loss_connect_time > 2){
                    System.out.println("关闭不活动的链接");
                    ctx.channel().close();
                }
            }else{
                super.userEventTriggered(ctx,evt);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

我们看到在handler中调用了userEventTriggered方法,IdleStateEvent的state()方法一个有三个值:READER_IDLE,WRITER_IDLE,ALL_IDLE。正好对应读事件写事件和读写事件。

再来写一下客户端:

public class HeartBeatsClient {
    private  int port;
    private  String address;

    public HeartBeatsClient(int port, String address) {
        this.port = port;
        this.address = address;
    }

    public void start(){
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new HeartBeatsClientChannelInitializer());

        try {
            ChannelFuture future = bootstrap.connect(address,port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        HeartBeatsClient client = new HeartBeatsClient(7788,"127.0.0.1");
        client.start();
    }
}

客户端Initializer:

public class HeartBeatsClientChannelInitializer extends  ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast("handler", new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast(new HeartBeatClientHandler());
    }
}

这里我们设置了IdleStateHandler的写超时为3秒,客户端执行的动作为写消息到服务端,服务端执行读动作。Spring Boot 学习笔记分享给你看下。

客户端handler:

public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF_8));

    private static final int TRY_TIMES = 3;

    private int currentTime = 0;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("激活时间是:"+new Date());
        System.out.println("链接已经激活");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("停止时间是:"+new Date());
        System.out.println("关闭链接");
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("当前轮询时间:"+new Date());
        if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                if(currentTime <= TRY_TIMES){
                    System.out.println("currentTime:"+currentTime);
                    currentTime++;
                    ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
                }
            }
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println(message);
        if (message.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

启动服务端和客户端我们看到输出为:

我们再来屡一下思路:

  1. 首先客户端激活channel,因为客户端中并没有发送消息所以会触发客户端的IdleStateHandler,它设置的写超时时间为3s;
  2. 然后触发客户端的事件机制进入userEventTriggered方法,在触发器中计数并向客户端发送消息;
  3. 服务端接收消息;
  4. 客户端触发器继续轮询发送消息,直到计数器满不再向服务端发送消息;
  5. 服务端在IdleStateHandler设置的读消息超时时间5s内未收到消息,触发了服务端中handler的userEventTriggered方法,于是关闭客户端的链接。

大体我们的简单心跳机制就是这样的思路,通过事件触发机制以及计数器的方式来实现,上面我们的案例中最后客户端没有发送消息的时候我们是强制断开了客户端的链接,那么既然可以关闭,我们是不是也可是重新链接客户端呢?因为万一客户端本身并不想关闭而是由于别的原因导致他无法与服务端通信。下面我们来说一下重连机制。

当我们的服务端在未读到客户端消息超时而关闭客户端的时候我们一般在客户端的finally块中方的是关闭客户端的代码,这时我们可以做一下修改的,finally是一定会被执行新的,所以我们可以在finally块中重新调用一下启动客户端的代码,这样就又重新启动了客户端了,上客户端代码:

/**
 * 本Client为测试netty重连机制
 * Server端代码都一样,所以不做修改
 * 只用在client端中做一下判断即可
 */
public class HeartBeatsClient2 {

    private  int port;
    private  String address;
    ChannelFuture future;

    public HeartBeatsClient2(int port, String address) {
        this.port = port;
        this.address = address;
    }

    public void start(){
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new HeartBeatsClientChannelInitializer());

        try {
            future = bootstrap.connect(address,port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //group.shutdownGracefully();
            if (null != future) {
                if (future.channel() != null && future.channel().isOpen()) {
                    future.channel().close();
                }
            }
            System.out.println("准备重连");
            start();
            System.out.println("重连成功");
        }

    }

    public static void main(String[] args) {
        HeartBeatsClient2 client = new HeartBeatsClient2(7788,"127.0.0.1");
        client.start();
    }
}

其余部分的代码与上面的实例并无异同,只需改造客户端即可,我们再运行服务端和客户端会看到客户端虽然被关闭了,但是立马又被重启:

当然生产级别的代码应该不是这样实现的吧,哈哈。另外,关注公众号Java技术栈,在后台回复:面试,可以获取我整理的 Java 系列面试题和答案,非常齐全。

本文分享自微信公众号 - Java技术栈(javastack),作者:点击关注 ????

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-06-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 这样讲 Netty 中的心跳机制,还有谁不会?

    顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.

    肉眼品世界
  • 一文搞懂 Netty 的整体流程,还有谁不会?

    作者:fredalxin 地址:https://fredal.xin/netty-process

    Java技术栈
  • 没吃透Netty底层通讯原理,还能算得上Java老司机?

    搞了N年Java,仍有不少朋友困惑:用了很多年Dubbo,觉得自己挺厉害,跳槽面试时一问RPC,一问底层通讯,一问NIO和AIO,就一脸懵逼,到底该怎么办?

    Java团长
  • Netty 总结篇

    Netty一个主要的目标就是促进“关注点分离”:使业务逻辑从网络基础设施应用程序中分离。不仅仅是Netty框架,其他框架的设计目的也大都是为了使业务程序和底层技...

    luoxn28
  • 使用Netty,我们到底在开发些什么?

    在java界,netty无疑是开发网络应用的拿手菜。你不需要太多关注复杂的nio模型和底层网络的细节,使用其丰富的接口,可以很容易的实现复杂的通讯功能。

    xjjdog
  • MQTT服务接入超时案例:MQTT服务和Netty在异常场景下的保护机制

    Netty 4.1提供了MQTT协议栈,基于此可以非常方便地创建MQTT服务,尽管开发简单,但是在实际环境中会面临各种挑战,甚至会面临一些不遵循MQTT规范的端...

    博文视点Broadview
  • 一篇文章,读懂Netty的高性能架构之道

    Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通...

    Spark学习技巧
  • 自已开发IM有那么难吗?手把手教你自撸一个Andriod版简易IM (有源码)

    一直想写一篇关于im即时通讯分享的文章,无奈工作太忙,很难抽出时间。今天终于从公司离职了,打算好好休息几天再重新找工作,趁时间空闲,决定静下心来写一篇文章,毕竟...

    JackJiang
  • Spring Cloud Gateway真的有那么差吗?

    Spring Cloud从一开始最受大家质疑的就是网关性能,那是由于Spring Cloud最初选择了使用Netflix几年前开源的Zuul作为基础,而高性能版...

    程序猿DD
  • 自已开发IM有那么难吗?手把手教你自撸一个Andriod版简易IM (有源码)

    一直想写一篇关于im即时通讯分享的文章,无奈工作太忙,很难抽出时间。今天终于从公司离职了,打算好好休息几天再重新找工作,趁时间空闲,决定静下心来写一篇文章,毕竟...

    JackJiang
  • 膨胀了!我要手写QQ底层!(附源码)

    一直想写一篇关于im即时通讯分享的文章,无奈工作太忙,很难抽出时间。今天终于从公司离职了,打算好好休息几天再重新找工作,趁时间空闲,决定静下心来写一篇文章,毕竟...

    小林C语言
  • 网络编程中,关于Keep-Alive与Idle你了解多少?

    有朋友问我,关于使用Netty开发长连接应用,为什么要加一个Keep Alive为true的配置。你是否也有这样的疑惑呢?

    Java艺术
  • Netty 那些事儿 ——— 心跳机制

    tomas家的小拨浪鼓
  • Springboot 2.0 +protobuf + Netty 实战(附源码)

    这一篇文章主要介绍如何用Springboot 整合 Netty,这里也是在网上搜寻了一些Netty例子学习后总结来的,借鉴了他人的写法和经验。如有重复部分,还请...

    java进阶架构师
  • websocket(三) 进阶!netty框架实现websocket达到高并发

    引言: 在前面两篇文章中,我们对原生websocket进行了了解,且用demo来简单的讲解了其用法。但是在实际项目中,那样的用法是不可取的,理由是tomcat对...

    生活创客
  • Netty 断线重连解决方案

    本篇文章是Netty专题的第七篇,前面六篇文章如下: 高性能NIO框架Netty入门篇 高性能NIO框架Netty-对象传输 高性能NIO框架Netty-整合k...

    猿天地
  • 基于Netty实现海量接入的推送服务技术要点

    用户1263954
  • RPC 实战总结与进阶延伸

    ByteBuf 是必须要掌握的核心工具类,并且能够理解 ByteBuf 的内部构造。ByteBuf 包含三个指针:读指针 readerIndex、写指针 wri...

    MickyInvQ
  • 基于 Netty 如何实现高性能的 HTTP Client 的连接池

    使用netty作为http的客户端,pool又该如何进行设计。本文将会进行详细的描述。

    芋道源码

扫码关注云+社区

领取腾讯云代金券