Netty 那些事儿 ——— 心跳机制

本文是Netty文集中“Netty 那些事儿”系列的文章。主要结合在开发实战中,我们遇到的一些“奇奇怪怪”的问题,以及如何正确且更好的使用Netty框架,并会对Netty中涉及的重要设计理念进行介绍。

什么是心跳机制?

心跳说的是在客户端和服务端在互相建立ESTABLISH状态的时候,如何通过发送一个最简单的包来保持连接的存活,还有监控另一边服务的可用性等。

心跳包的作用

  • 保活 Q:为什么说心跳机制能保持连接的存活,它是集群中或长连接中最为有效避免网络中断的一个重要的保障措施? A:之所以说是“避免网络中断的一个重要保障措施”,原因是:我们得知公网IP是一个宝贵的资源,一旦某一连接长时间的占用并且不发数据,这怎能对得起网络给此连接分配公网IP,这简直是对网络资源最大的浪费,所以基本上所有的NAT路由器都会定时的清除那些长时间没有数据传输的映射表项。一是回收IP资源,二是释放NAT路由器本身内存的资源,这样问题就来了,连接被从中间断开了,双发还都不晓得对方已经连通不了了,还会继续发数据,这样会有两个结果:a) 发方会收到NAT路由器的RST包,导致发方知道连接已中断;b) 发方没有收到任何NAT的回执,NAT只是简单的drop相应的数据包 通常我们测试得出的是第二种情况会多些,就是客户端是不知道自己应经连接断开了,所以这时候心跳就可以和NAT建立关联了,只要我们在NAT认为合理连接的时间内发送心跳数据包,这样NAT会继续keep连接的IP映射表项不被移除,达到了连接不会被中断的目的。
  • 检测另一端服务是否可用 TCP的断开可能有时候是不能瞬时探知的,甚至是不能探知的,也可能有很长时间的延迟,如果前端没有正常的断开TCP连接,四次握手没有发起,服务端无从得知客户端的掉线,这个时候我们就需要心跳包来检测另一端服务是否还存活可用。

应用层的心跳机制   VS   TCP的keepalive机制

  • 传输层心跳是保证连接可用,但应用层心跳却可以保证服务可用. TCP的keepalive机制能保证连接没有问题,但当进程出现死锁或者阻塞的情况下,虽然连接没有问题,但是服务已经不能正常使用了。
  • 从TCP的keepalive机制的本质上来说,是用来检测长时间不活跃的连接的,不适合用来及时检测连接的状态;而应用层的心跳机制具有更大的灵活性,可以自己控制检测的间隔和检测方式,并且可以通过心跳包来附带一些信息等。 TCP有个KeepAlive开关,打开后可以用来检测死连接。通常默认是2小时,可以自己设置。但是注意,这是TCP的全局设置。

用 Netty 的 IdleStateHandler 实现固定周期的心跳机制

因为IdleStateHandler的超时时间是不可改变的,所以通过IdleStateHandler只能实现固定周期的心跳机制。

以此为基础的心跳机制: 方案一: client: ① arg0.pipeline().addLast("ping", new IdleStateHandler(25, 0, 10,TimeUnit.SECONDS)); 处理ReadIdleEvent和AllIdleEvent。 当AllIdleEvent触发时说明此时间段内既没有读也没有写操作,那么就发送一个心跳包。 因为ReadIdleEvent的超时时间比AllIdleEvent长,所以如果在指定时间范围内收到了心跳包的回复是不会触发这个事件的。所以如果ReadIdleEvent事件被触发了,则认为和服务端的连接已经断掉了,那么就close这个channel。?注意,这边有个有个优化,每次发送心跳包的时候就计数下,如果有收到pong包则重新计数,依次来实现发送N此心跳包后依旧么有回复的情况下,再关闭这个channel。 ② 通过channelInactive方法来处理客户端的重连机制的。该方法触发使,会调用一个延迟器来执行和服务端的重连。 server: ① arg0.pipeline().addLast("ping", new IdleStateHandler(25 * N, 0, 0,TimeUnit.SECONDS)); N为客户端重试发送心跳包的次数,这么设计主要是为了让客户端和服务端能几乎同时的去关闭这个channel。 当ReadIdleEvent被触发时,则认为和客户端的这次连接已经断掉了,则close这个channel。

方案二: client: ① arg0.pipeline().addLast("ping", new IdleStateHandler(10, 0, 0,TimeUnit.SECONDS)); 当ReadIdleEvent事件被触发使,则发送一个心跳包,并对发送的心跳包进行计数。如果连接正常,则会收到服务端的pong包,这时会清空计数器。当然在收到其他的数据包时也是会清空这个计数器的。 当发送心跳包的计数值达到一定数量的时候,则认为和服务端的连接已经断掉了,这个时候则会close掉这个channel。 ② 通过channelInactive方法来处理客户端的重连机制的。该方法触发使,会调用一个延迟器来执行和服务端的重连。 server: ① arg0.pipeline().addLast("ping", new IdleStateHandler(10 * N, 0, 0,TimeUnit.SECONDS)); N为客户端重试发送心跳包的次数,这么设计主要是为了让客户端和服务端能几乎同时的去关闭这个channel。 当ReadIdleEvent被触发时,则认为和客户端的这次连接已经断掉了,则close这个channel。

这里个人建议使用第二个方案来实现心跳机制。因为通过查看源码发现,无论channel的写操作是否成功,只有是执行了channel的write操作就会注册IdleStateHandler中的writeListener到write操作的promise中。这样只要操作完成,无论是失败还是成功都会触发注册到其上的listener的回调。这样的话就可能出现使得即使write操作失败了也不会触发和写有关的超时事件了的情况了,即AllIdleEvent就不会被触发了,这将导致即便这个时候写操作时因为一些逻辑关系而操作失败了,我们的心跳机制在几次ReadIdleEvent事件被触发后,会错误的认为连接已经“断”了,而去关闭这个channel了(实际上,有可能是write操作是失败的,但因为AllIdleEvent没有被触发,那么就不会发送心跳包,最终导致ReadIdleEvent事件的触发)。 当然,到底使用AllIdleEvent还是ReadIdleEvent活着WriteIdleEvent还是要根据实际的业务情况来决定的

代码示例

我们通过一个简单的聊天系统来展示如何在Netty中使用心跳机制,并采用“方案二”的方式来实现。由客户端向服务器端发起心跳(ping包),服务器端在收到客户端的心跳包后会返回一个响应(pong包)。若服务端在一定时间内没有收到客户端任何的数据包时(包括心跳包以及逻辑数据包),则认为该客户端已经无法正常通信了,那么就关掉相应socket以释放资源;而客户端同理。

首先我们发送的是自定义的消息包

自定义消息格式: | MAGIC | LENGTH | BODY |
MAGIC(byte) :消息类型。{@link ChatProtocol#MAGIC_MESSAGE}表示消息类型;{@link ChatProtocol#MAGIC_HEARTBEAT_PING}表示PING心跳包;{@link ChatProtocol#MAGIC_HEARTBEAT_PONG}表示PONG心跳包
LENGTH(int32) :消息长度
BODY(byte[]) :消息体

客户端主要代码: MyChatClient:

public class MyChatClient {

    private ExecutorService executor;
//    private Future result;
    private static BufferedReader bufferedReader;
    private PrintTask printTask;

    public static void main(String[] args) throws Exception {
        MyChatClient chatClient = new MyChatClient();
        EventLoopGroup group = new NioEventLoopGroup();
        bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        try {
            chatClient.connect(group);
        } finally {
//            group.shutdownGracefully();
        }

        Runtime.getRuntime().addShutdownHook(new Thread(() -> group.shutdownGracefully()));
    }

    public void connect(EventLoopGroup group) {
        try{
            executor = Executors.newSingleThreadExecutor();
            Bootstrap client = new Bootstrap();
            client.group(group).channel(NioSocketChannel.class).handler(new MyChatClientInitializer(this));

            // ======= 说明 ========
            /**
             * 这种写法在重连接的时候回抛 "io.netty.util.concurrent.BlockingOperationException: DefaultChannelPromise@d21e95e(incomplete)"异常
             * 解决方法:不能在ChannelHandler中调用 ChannelFuture.sync() 。通过注册Listener来实现功能
             */
//            ChannelFuture future = client.connect(new InetSocketAddress("127.0.0.1", 5566)).sync();
            // ======= 说明 ========

            //192.168.1.102
            client.remoteAddress(new InetSocketAddress("127.0.0.1", 5566));
            client.connect().addListener((ChannelFuture future) -> {
                if(future.isSuccess()) {

                    // ======= 说明 ========
                    // 这个 死循环 导致了走到了channelRegistered, 后面的channelActive流程就被它堵塞了,以至于没往下走。。。
                    /*while (!readerExit) {
                        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
                        future.channel().writeAndFlush(bufferedReader.readLine());
                    }*/
                    // ======= 说明 ========

                    if(printTask == null) {
                        printTask = new PrintTask(future, bufferedReader);
                    } else {
                        printTask.setFuture(future);
                    }
                    executor.submit(printTask);


                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

MyChatClient主要完成了对服务端的连接,并将connect方法独立出来,以便重连时使用。 上面“说明”注解中提及的两点是Netty线程模式中非常重要的两个知识点,在之前的理论篇以及源码篇都有进行说明,在文章的后面,会再次结合实战情况再次对这两个重要的知识点进行说明。

MyChatClientInitializer:

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast("idleStateHandler", new IdleStateHandler(MyChatContants.CLIENT_READ_TIME, 0, 0, TimeUnit.SECONDS))
                .addLast("myChatClientIdleHandler", new MyChatClientIdleHandler(chatClient))
                .addLast("myChatDecoder", new MyChatDecoder())
                .addLast("myChatEncoder", new MyChatEncoder())
                .addLast("stringDecoder", new StringDecoder(charset))
                .addLast("stringEncoder", new StringEncoder(charset))
                .addLast("myChatClientHandler", new MyChatClientHandler());
    }

初始化客户端的ChannelHandler,其中就包括了用于实现心跳机制的IdleStateHandler以及MyChatClientIdleHandler。

MyChatClientIdleHandler:

public class MyChatClientIdleHandler extends ChannelInboundHandlerAdapter {

    private final MyChatClient chatClient;

    public MyChatClientIdleHandler(MyChatClient chatClient) {
        this.chatClient = chatClient;
    }

    private int retryCount;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent)evt;
            if(event.state() == IdleState.READER_IDLE) {
                if(++retryCount > RETRY_LIMIT) {
                    System.out.println("server " + ctx.channel().remoteAddress() + " is inactive to close");
                    closeAndReconnection(ctx.channel());
                } else {
                    System.out.println("send ping package to " + ctx.channel().remoteAddress());
                    ctx.writeAndFlush(MyHeartbeat.getHeartbeatPingBuf());
                }
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    private void closeAndReconnection(Channel channel) {
        channel.close();
        channel.eventLoop().schedule(() -> {
            System.out.println("========== 尝试重连接 ==========");
            chatClient.connect(channel.eventLoop());
        }, 10L, TimeUnit.SECONDS);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        retryCount=0;
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel() + " 已连上. 可以开始聊天...");
        super.channelActive(ctx);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered : " + ctx.channel());
        super.channelRegistered(ctx);
    }
}

MyChatClientIdleHandler类实现对读空闲时间超时的处理,当发现IdleState.READER_IDLE事件连续发生RETRY_LIMIT次后,则任务与服务端的连接已经失效了,此时就会关闭当前的socket,定义一个延时任务进行与服务器的重新连接。若还未超过RETRY_LIMIT次,则会发送一个心跳包(ping包)。

服务端主要代码: MyChatServerInitializer:

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast("idleStateHandler", new IdleStateHandler(MyChatContants.SERVER_READ_TIME, 0, 0, TimeUnit.SECONDS))
                .addLast("myChatServerIdleHandler", new MyChatServerIdleHandler())
                .addLast("myChatDecoder", new MyChatDecoder())
                .addLast("myChatEncoder", new MyChatEncoder())
                .addLast("stringDecoder", new StringDecoder(charset))
                .addLast("stringEncoder", new StringEncoder(charset))
                .addLast("myChatServerHandler", new MyChatServerHandler());
    }

初始化服务器端的ChannelHandler,其中就包括了用于实现心跳机制的IdleStateHandler以及MyChatServerIdleHandler。

MyChatServerIdleHandler:

public class MyChatServerIdleHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state() == READER_IDLE) {
                System.out.println("client " + ctx.channel().remoteAddress() + " is inactive to close");
                ctx.channel().close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

}

MyChatServerIdleHandler类实现对读空闲时间超时的处理,若发送了读空闲时间超时则认为和客户端的连接已经失效,就会调用channel.close()来实现socket的关闭以及资源的释放。

MyChatDecoder:

public class MyChatDecoder extends ReplayingDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        byte magic = in.readByte();
        switch (magic) {
            case ChatProtocol.MAGIC_MESSAGE:
                int length = in.readInt();
                ByteBuf body = in.readBytes(length);
                out.add(body);
                break;
            case ChatProtocol.MAGIC_HEARTBEAT_PING:
                System.out.println("收到 " + ctx.channel().remoteAddress() + " 的 ping 包,返回一个 pong 包。");
                ctx.writeAndFlush(MyHeartbeat.getHeartbeatPongBuf());
                break;
            case ChatProtocol.MAGIC_HEARTBEAT_PONG:
                System.out.println("收到 " + ctx.channel().remoteAddress() + " 的 pong 包。");
                break;
        }

    }
}

消息解码器处理器,这里实现了,当收到客户端的心跳包时(ping包),则会返回一个响应(pong包)。

完整的代码欢迎参阅github

关于 Netty4 的 BlockingOperationException 异常

好了,现在我们来说说上面两个注释

            // ======= 说明 ========
            /**
             * 这种写法在重连接的时候回抛 "io.netty.util.concurrent.BlockingOperationException: DefaultChannelPromise@d21e95e(incomplete)"异常
             * 解决方法:不能在ChannelHandler中调用 ChannelFuture.sync() 。通过注册Listener来实现功能
             */
//            ChannelFuture future = client.connect(new InetSocketAddress("127.0.0.1", 5566)).sync();
            // ======= 说明 ========

首先,来说说明下产生BlockingOperationException的场景:当发现连接失效时,通过如下代码进行重连时抛出该异常:

        channel.eventLoop().schedule(() -> {
            System.out.println("========== 尝试重连接 ==========");
            chatClient.connect(channel.eventLoop());
        }, 10L, TimeUnit.SECONDS);

异常是由 ChannelFuture.sync()方法抛出的。那么,我们来看看sync方法的实现:

跟踪sync方法最终会调用到DefaultPromise的await()方法

而异常就是由其中的「checkDeadLock()」方法抛出的:

    protected void checkDeadLock() {
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(toString());
        }
    }

checkDeadLock()方法是await()方法中在调用Object的wait()前必须调用的方法,以检查当前的上下文是否会使Object wait()的调用造成死锁。 当e!=null && e.inEventLoop()为true,则说明执行当前方法的线程就是EventLoop所关联的线程(即,I/O线程)。

checkDeadLock()方法之后就会调用Object的wait()方法。wait()操作会将当前线程挂起,并释放对象锁,直到另一个线程调动该对象的notifyf()或notifyAll()方法,会唤醒一个被挂起的线程。所以如果挂起的线程和需要调用notify的线程是同一个线程的话,就会发生死锁。(因为线程都已经被挂起了,还怎么去进行notify/notifyAll操作了?)

再者在在Netty4中,一个Channel对于的所以操作都会在它被创建时分配给它的EventLoop中完成,而一个EventLoop的整个生命周期只会和一个线程绑定,不会修改它。 而ChannelFuture则表示Channel异步操作的一个结果。你可以通过ChannelFuture来获取Channel异步操作的结果。获取结果的方式有两种:a) 调用await(*)、sync(*)、get(*) 等方法阻塞当前线程直到获取到异步操作的结果;b) 通过注册回调函数,当操作完成的时候该回调函数会得到调用。

Q:所以,BlockingOperationException到底是在什么情况下会被抛出了? A:首先,我们已经直到了当执行wait()线程和将会执行notify()/notifyAll()的线程会是同一个线程时,就会造成死锁。 然后,我们知道当ChannelFuture调用await(*)、sync(*)、get(*) 等方法时就会触发当前线程的wait()操作,并将当前线程挂起,等待Channel相关的操作完成。 所以,如果执行Channel相关操作的线程( 即,EventLoop所关联线程,它们会调用notify()/notifyAll() )和执行ChannelFuture的await(*)、sync(*)、get(*) 等方法的线程( 即,调用wait()的线程 )是同一个线程时,就会发送死锁了!!!

结合示例,因为通过「channel.eventLoop().schedule(Runnable command, long delay, TimeUnit unit)」提交的定时任务最终都将会在EventLoop所关联的线程上得以执行,那么如果你在定时任务中又调用了await()这样操作,就会发生上面说所的,挂起的线程和将会notify()/notifyAll()的线程会是同一个线程时,这就会造成死锁。所以Netty在每次执行Object的await()操作去都会进行判断,判断当前的环境下调用object.await()是否会发送死锁,如果检测任务可能发生死锁,则抛出BlockingOperationException异常,而不会真正的去执行object.await()操作而导致真的死锁发生。

因此,总的来说,我们不应该在Channel所注册到的EventLoop相关联的线程为同一个线程上调用与该Channel关联的ChannelFuture的sync* 或 await* 方法。好像很拗口。。简单图示如下: Channel_A注册到了EventLoop_A上:Channel_A —— 注册 ——> EventLoop_A ChannelFuture_A表示Channel_A的一个异步操作:ChannelFuture_A —— 关联 ——> Channel_A 那么不能再EventLoop_A 上执行 ChannelFuture_A的await(*)、sync(*)、get(*) 等方法。

同时建议,不在ChannelFuture中调用await(*)、sync(*)、get(*) 等方法来获取操作的结果;而是使用注册Listener的方法,通过回调函数来获取操作结果。

不要阻塞EventLoop线程

第二个注释说明:

client.connect().addListener((ChannelFuture future) -> {
                if(future.isSuccess()) {

                    // ======= 说明 ========
                    // 这个 死循环 导致了走到了channelRegistered, 后面的channelActive流程就被它堵塞了,以至于没往下走。。。
                    /*while (!readerExit) {
                        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
                        future.channel().writeAndFlush(bufferedReader.readLine());
                    }*/
                    // ======= 说明 ========

                   // 业务逻辑代码
                       ......
                }
}

发生这个问题的原因在于: ① 一个EventLoop在它的整个生命周期当中都只会与唯一一个Thread进行绑定。 ② 所有由EventLoop所处理的各种I/O事件都将在它所关联的那个Thread上进行处理。 ③ 一个Channel在它的整个生命周期中只会注册在一个EventLoop上。 ④ ChannelFuture代表了一个Channel的异步操作,并且可以通过注册ChannelFutureListener使得再Channel的异步操作结束后以回调的方式来获取这个执行结果。 ⑤ 值得注意的是:ChannelFutureListener的operationComplete方法是由I/O线程执行的。

因此,如果在client.connect()这个异步操作上注册了一个ChannelFutureListener,而该ChannelFutureListener的operationComplete方法中却执行了一个死循环,这会导致整个I/O线程就卡在了这个死循环上,而无法继续执行Channel该有的其他流程,以及导致注册到该EventLoop上所有的Channel操作都无法得以执行了。

因此,我们要注意,千万不要阻塞这个I/O线程(即,EventLoop所关联的线程),也不要在该线程上执行任何耗时的操作。我们应该将耗时的操作放到另外的一个线程池中去执行。

后记

本文主要对心跳机制进行了简单介绍,并主要针对Netty下如何实现固定周期的心跳机制进行了深入的讨论,同时结合示例对真实开发中很容易遇到的两个Netty线程模式的问题进行了讨论和说明。

参考

圣思园《精通并发与Netty》 https://github.com/jianfengye/doc_web/blob/master/linux/heartbeat.md https://blog.coderzh.com/2015/03/05/WhyHeartBeatNeeded/ http://www.voidcn.com/article/p-wcfsgijn-bbx.html

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java Edge

操作系统之进程管理一、进程二、进程状态及状态转换三、进程队列四、进程控制五、深入理解六、线程七、死锁二、资源分配图(RAG:Resource Allocation Graph)三、死锁预防四、死锁避免

35210
来自专栏开发技术

Redis Cluster的搭建与部署,实现redis的分布式方案

  上篇Redis Sentinel安装与部署,实现redis的高可用实现了redis的高可用,针对的主要是master宕机的情况,我们发现所有节点的数据都是一...

1621
来自专栏Janti

玩转Kafka的生产者——分区器与多线程

上篇文章学习kafka的基本安装和基础概念,本文主要是学习kafka的常用API。其中包括生产者和消费者,

1142
来自专栏小灰灰

Redis实现分布式锁相关注意事项

Redis实现分布式锁相关注意事项 查看了不少关于redis实现分布式锁的文章,无疑要设计一个靠谱的分布式并不太容易,总会出现各种鬼畜的问题;现在就来小述一下...

2288
来自专栏北京马哥教育

Linux系统进程的知识总结,进程与线程之间的纠葛...

当一个程序开始执行后,在开始执行到执行完毕退出这段时间内,它在内存中的部分就叫称作一个进程。

630
来自专栏用户2442861的专栏

Thrift 服务模型和序列化机制深入学习

http://www.liuqianfei.com/article/065b0f1ee59a4cf0b94a84c4e33af127

582
来自专栏Java面试通关手册

Mysql锁机制简单了解一下

Java面试通关手册(Java学习指南,欢迎Star,会一直完善下去,欢迎建议和指导):https://github.com/Snailclimb/Java_G...

15011
来自专栏Java成神之路

Eclipse插件开发_学习_02_GEF入门实例

(2)搜索 editors,选择 org.eclipse.ui.editors  扩展点,finish

822
来自专栏IT笔记

Tomcat优化之配置线程池

简介 线程池作为提高程序处理数据能力的一种方案,应用非常广泛。大量的服务器都或多或少的使用到了线程池技术,不管是用Java还是C++实现,线程池都有如下的特点:...

2988
来自专栏IT可乐

RabbitMQ详解(三)------RabbitMQ的五种队列

  上一篇博客我们介绍了RabbitMQ消息通信中的一些基本概念,这篇博客我们介绍 RabbitMQ 的五种工作模式,这也是实际使用RabbitMQ需要重点关注...

992

扫码关注云+社区