专栏首页niceyooNetty心跳机制-长连接
原创

Netty心跳机制-长连接

前文需求回顾

完成对红酒窖的室内温度采集及监控功能。由本地应用程序+温度传感器定时采集室内温度上报至服务器,如果温度 >20 °C 则由服务器下发重启空调指令,如果本地应用长时间不上传温度给服务器,则给户主手机发送一条预警短信。


Netty入门篇-从双向通信开始「上文」

上篇算是完成简单的双向通信了,我们接着看看 “如果本地应用长时间不上传温度给服务器...”,很明显客户端有可能挂了嘛,所以怎么实现客户端与服务端的长连接就是本文要实现的了。

什么是心跳机制

百度百科:心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。

简单说,这个心跳机制是由客户端主动发起的消息,每隔一段时间就向服务端发送消息,告诉服务端自己还没死,可不要给户主发送预警短信啊。

如何实现心跳机制

1、客户端代码修改

我们需要改造一下上节中客户端的代码,首先是在责任链中增加一个心跳逻辑处理类HeartbeatHandler

public class NettyClient {

    private static String host = "127.0.0.1";

    public static void main(String[] args) {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                // 1.指定线程模型
                .group(workerGroup)
                // 2.指定 IO 类型为 NIO
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                // 3.IO 处理逻辑
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                    ch.pipeline()
                        .addLast(new IdleStateHandler(0, 10, 0))
                            .addLast(new StringDecoder())
                            .addLast(new StringEncoder())
                            .addLast(new HeartbeatHandler())
                            .addLast(new NettyClientHandler());
                    }
                });
        // 4.建立连接
        bootstrap.connect(host, 8070).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("连接成功!");
            } else {
                System.err.println("连接失败!");
            }

        });
    }
}

没什么变化,主要是增加了HeartbeatHandler,我们来看看这个类:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.nio.charset.Charset;
import java.time.LocalTime;

public class HeartbeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                System.out.println("10秒了,需要发送消息给服务端了" + LocalTime.now());
                //向服务端送心跳包
                ByteBuf buffer = getByteBuf(ctx);
                //发送心跳消息,并在发送失败时关闭该连接
                ctx.writeAndFlush(buffer).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("捕获的异常:" + cause.getMessage());
        ctx.channel().close();
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        // 1. 获取二进制抽象 ByteBuf
        ByteBuf buffer = ctx.alloc().buffer();
        String time = "heartbeat:客户端心跳数据:" + LocalTime.now();
        // 2. 准备数据,指定字符串的字符集为 utf-8
        byte[] bytes = time.getBytes(Charset.forName("utf-8"));
        // 3. 填充数据到 ByteBuf
        buffer.writeBytes(bytes);
        return buffer;
    }

}

还是继承自ChannelInboundHandlerAdapter,不过这次重写的是userEventTriggered()方法,这个方法在客户端的所有ChannelHandler中,如果10s内没有发生write事件时触发,所以我们在该方法中给服务端发送心跳消息。

业务逻辑处理类NettyClientHandler没有改动,代码如下:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Random;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println(new Date() + ": 客户端写出数据");

        // 1. 获取数据
        ByteBuf buffer = getByteBuf(ctx);

        // 2. 写数据
        ctx.channel().writeAndFlush(buffer);
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        // 1. 获取二进制抽象 ByteBuf
        ByteBuf buffer = ctx.alloc().buffer();
        Random random = new Random();
        double value = random.nextDouble() * 14 + 8;
        String temp = "获取室内温度:" + value;

        // 2. 准备数据,指定字符串的字符集为 utf-8
        byte[] bytes = temp.getBytes(Charset.forName("utf-8"));

        // 3. 填充数据到 ByteBuf
        buffer.writeBytes(bytes);

        return buffer;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(new Date() + ": 客户端读到数据 -> " + msg.toString());
    }

}

对如上代码不了解的可以回看上一节:Netty入门篇-从双向通信开始

2、服务端代码修改

服务端代码主要是开启TCP底层心跳机制支持,.childOption(ChannelOption.SO_KEEPALIVE, true) ,其他的代码并没有改动:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyServer {

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(bossGroup, workerGroup)

                // 指定Channel
                .channel(NioServerSocketChannel.class)

                //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
                .option(ChannelOption.SO_BACKLOG, 1024)

                //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                .childOption(ChannelOption.SO_KEEPALIVE, true)

                //将小的数据包包装成更大的帧进行传送,提高网络的负载
                .childOption(ChannelOption.TCP_NODELAY, true)

                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new NettyServerHandler());
                    }
                });

        serverBootstrap.bind(8070);
    }

}

我们再来看看服务端的业务处理类 NettyServerHandler

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        String message = byteBuf.toString(Charset.forName("utf-8"));
        System.out.println(new Date() + ": 服务端读到数据 -> " + message);
        /** 心跳数据是不发送数据 **/
        if(!message.contains("heartbeat")){
            ByteBuf out = getByteBuf(ctx);
            ctx.channel().writeAndFlush(out);
        }
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        byte[] bytes = "我是发送给客户端的数据:请重启冰箱!".getBytes(Charset.forName("utf-8"));
        ByteBuf buffer = ctx.alloc().buffer();
        buffer.writeBytes(bytes);
        return buffer;
    }

}

channelRead() 方法增加了一个 if 判断,判断如果包含heartbeat字符串就认为这是客户端发过来的心跳,这种判断是非常low的,因为到目前为止我们一直是用简单字符串来传递数据的,上边传递的数据就直接操作字符串;那么问题来了,如果我们想传递对象怎么搞呢?下节写。我们先来看一下如上代码客户端与服务端运行截图:

服务端

客户端

至此,整个心跳机制就完成了,这样每隔10秒客户端就会给服务端发送一个心跳消息,下节我们通过了解通协议以完善心跳机制的代码。

本文首发于博客园:https://www.cnblogs.com/niceyoo/p/13275731.html

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • QQ互联登陆(Java) - niceyoo

    腾讯开放平台网址: https://connect.qq.com/index.html

    niceyoo
  • 基于Docker的Redis集群搭建

    如果出现上图情况,Exited (1) 3 seconds ago,可以通过 docker logs 查看:

    niceyoo
  • 基于Docker搭建私有镜像仓库

    通常我们在docker中拉取的镜像都是在docker hub在线存储库中获取的,这个在线存储库里的docker镜像可以由任何用户发布和使用,显然这在某些场景下是...

    niceyoo
  • SpringMVC 请求参数,返回值格式打印,遍于开发调试

    用户2603479
  • Akka(34): Http:Unmarshalling,from Json

      Unmarshalling是Akka-http内把网上可传输格式的数据转变成程序高级结构话数据的过程,比如把Json数据转换成某个自定义类型的实例。按具体流...

    用户1150956
  • Python 字符串 去中间空格

    这招不太灵光,不仅是因为编码的问题(2.x 的版本中使用 repr()可以看到空格对应的编码,用其替换),而且太麻烦,不够灵活 。

    py3study
  • 也谈 ngx.ctx 继承问题

    在前一阵子的 OpenResty Con 2018 上,来自又拍云的 @tokers 分享了他们对 ngx.ctx 的 hack,以确保在发生内部跳转后 ngx...

    poslua
  • 看阿里首席架构师是如何选择并落地架构方案的

    如何针对当前需求,选择合适的应用架构,如何面向未来,保证架构平滑过渡,这个是软件开发者,特别是架构师,都需要深入思考的问题。

    技术zhai
  • 深信服一面C++

    牛客网
  • 【AI科技】无人驾驶汽车面临的两大挑战:数据收集与安全漏洞

    据外媒报道,随着汽车公司继续与科技公司与叫车服务公司之间的合作以更快更有效地创新,无人驾驶汽车即将变成现实。但是在无人驾驶时代,人们对于未来隐私的安全产生了疑问...

    AiTechYun

扫码关注云+社区

领取腾讯云代金券