前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot整合Netty并使用Protobuf进行数据传输 (附工程)

SpringBoot整合Netty并使用Protobuf进行数据传输 (附工程)

作者头像
二哥聊运营工具
发布2021-12-17 16:56:07
6320
发布2021-12-17 16:56:07
举报
文章被收录于专栏:程序员泥瓦匠

前言

本篇文章主要介绍的是SpringBoot整合Netty以及使用Protobuf进行数据传输的相关内容。Protobuf会简单的介绍下用法,至于Netty在之前的文章中已经简单的介绍过了,这里就不再过多细说了。

Protobuf

介绍

protocolbuffer(以下简称PB)是google 的一种数据交换的格式,它独立于语言,独立于平台。google 提供了多种语言的实现:java、c#、c++、go 和python,每一种实现都包含了相应语言的编译器以及库文件。由于它是一种二进制的格式,比使用 xml进行数据交换快许多。可以把它用于分布式应用之间的数据通信或者异构环境下的数据交换。作为一种效率和兼容性都很优秀的二进制数据传输格式,可以用于诸如网络传输、配置文件、数据存储等诸多领域。

官方地址: https://github.com/google/protobuf

使用

这里的使用就只介绍Java相关的使用。 首先我们需要建立一个proto文件,在该文件定义我们需要传输的文件。 例如我们需要定义一个用户的信息,包含的字段主要有编号、名称、年龄。 那么该protobuf文件的格式如下: :这里使用的是proto3,相关的注释我已写了,这里便不再过多讲述了。需要注意一点的是proto文件和生成的Java文件名称不能一致!

代码语言:javascript
复制
syntax = "proto3";
// 生成的包名
option java_package="com.pancm.protobuf";
//生成的java名
option java_outer_classname = "UserInfo";

message UserMsg {
      
     // ID
     int32 id = 1;
      
    // 姓名
     string name = 2;
      
    // 年龄
      int32 age = 3;
	  
	 // 状态
     int32 state = 4;
}

创建好该文件之后,我们把该文件和protoc.exe(生成Java文件的软件)放到E盘目录下的protobuf文件夹下,然后再到该目录的dos界面下输入:protoc.exe --java_out=文件绝对路径名称。 例如:

代码语言:javascript
复制
protoc.exe --java_out=E:\protobuf User.proto

输入完之后,回车即可在同级目录看到已经生成好的Java文件,然后将该文件放到项目中该文件指定的路径下即可。

注:生成protobuf的文件软件和测试的protobuf文件我也整合到该项目中了,可以直接获取的。

Java文件生成好之后,我们再来看怎么使用。 这里我就直接贴代码了,并且将注释写在代码中,应该更容易理解些吧。。。 代码示例:

代码语言:javascript
复制
	 // 按照定义的数据结构,创建一个对象
    	UserInfo.UserMsg.Builder userInfo = UserInfo.UserMsg.newBuilder();
    	userInfo.setId(1);
    	userInfo.setName("xuwujing");
    	userInfo.setAge(18);
    	UserInfo.UserMsg userMsg = userInfo.build();
        // 将数据写到输出流
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        userMsg.writeTo(output);
        // 将数据序列化后发送
        byte[] byteArray = output.toByteArray();
        // 接收到流并读取
        ByteArrayInputStream input = new ByteArrayInputStream(byteArray);
        // 反序列化
        UserInfo.UserMsg userInfo2 = UserInfo.UserMsg.parseFrom(input);
        System.out.println("id:" + userInfo2.getId());
        System.out.println("name:" + userInfo2.getName());
        System.out.println("age:" + userInfo2.getAge());

注:这里说明一点,因为protobuf是通过二进制进行传输,所以需要注意下相应的编码。还有使用protobuf也需要注意一下一次传输的最大字节长度。

输出结果:

代码语言:javascript
复制
id:1
name:xuwujing
age:18

SpringBoot整合Netty

说明:如果想直接获取工程那么可以直接跳到底部,通过链接下载工程代码。

开发准备

环境要求 JDK::1.8 Netty::4.0或以上(不包括5) Protobuf:3.0或以上

如果对Netty不熟的话,可以看看我之前写的一些文章。大神请无视~。~ 地址:https://blog.csdn.net/column/details/17640.html

首先还是Maven的相关依赖:

代码语言:javascript
复制
<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<java.version>1.8</java.version>
		<netty.version>4.1.22.Final</netty.version>
		<protobuf.version>3.5.1</protobuf.version>
		<springboot>1.5.9.RELEASE</springboot>
		<fastjson>1.2.41</fastjson>
		<maven.compiler.source>1.8</maven.compiler.source>
   		<maven.compiler.target>1.8</maven.compiler.target>
	</properties>


	<dependencies>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
			<version>${springboot}</version>
		</dependency>


		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<version>${springboot}</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<version>${springboot}</version>
			<optional>true</optional>
		</dependency>

		
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>${netty.version}</version>
		</dependency>

		<dependency>
			<groupId>com.google.protobuf</groupId>
			<artifactId>protobuf-java</artifactId>
			<version>${protobuf.version}</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>${fastjson}</version>
		</dependency>

		
	<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.12</version>
			<scope>test</scope>
		</dependency> 
</dependencies>

添加了相应的maven依赖之后,配置文件这块暂时没有什么可以添加的,因为暂时就一个监听的端口而已。

代码编写

代码模块主要分为服务端和客户端。 主要实现的业务逻辑: 服务端启动成功之后,客户端也启动成功,这时服务端会发送一条protobuf格式的信息给客户端,然后客户端给予相应的应答。客户端与服务端连接成功之后,客户端每个一段时间会发送心跳指令给服务端,告诉服务端该客户端还存过中,如果客户端没有在指定的时间发送信息,服务端会关闭与该客户端的连接。当客户端无法连接到服务端之后,会每隔一段时间去尝试重连,只到重连成功!

服务端

首先是编写服务端的启动类,相应的注释在代码中写得很详细了,这里也不再过多讲述了。不过需要注意的是,在之前的我写的Netty文章中,是通过main方法直接启动服务端,因此是直接new一个对象的。而在和SpringBoot整合之后,我们需要将Netty交给springBoot去管理,所以这里就用了相应的注解。 代码如下:

代码语言:javascript
复制
@Service("nettyServer")
public class NettyServer {
	private static final int port = 9876; // 设置服务端端口
	private static EventLoopGroup boss = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接
	private static EventLoopGroup work = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接
	private static ServerBootstrap b = new ServerBootstrap();
	
	@Autowired
	private NettyServerFilter nettyServerFilter;
	
	
	public void run() {
		try {
			b.group(boss, work);
			b.channel(NioServerSocketChannel.class);
			b.childHandler(nettyServerFilter); // 设置过滤器
			// 服务器绑定端口监听
			ChannelFuture f = b.bind(port).sync();
			System.out.println("服务端启动成功,端口是:" + port);
			// 监听服务器关闭监听
			f.channel().closeFuture().sync();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			// 关闭EventLoopGroup,释放掉所有资源包括创建的线程
			work.shutdownGracefully();
			boss.shutdownGracefully();
		}
	}
}

服务端主类编写完毕之后,我们再来设置下相应的过滤条件。 这里需要继承Netty中ChannelInitializer类,然后重写initChannel该方法,进行添加相应的设置,如心跳超时设置,传输协议设置,以及相应的业务实现类。 代码如下:

代码语言:javascript
复制
	@Component
	 public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
	
	@Autowired
	private NettyServerHandler nettyServerHandler;
	
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline ph = ch.pipeline();
      
         //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
         ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
         // 解码和编码,应和客户端一致
         //传输的协议 Protobuf
         ph.addLast(new ProtobufVarint32FrameDecoder());
         ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
         ph.addLast(new ProtobufVarint32LengthFieldPrepender());
         ph.addLast(new ProtobufEncoder());
         
         //业务逻辑实现类
         ph.addLast("nettyServerHandler", nettyServerHandler);
       }
     }

服务相关的设置的代码写完之后,我们再来编写主要的业务代码。 使用Netty编写业务层的代码,我们需要继承ChannelInboundHandlerAdapterSimpleChannelInboundHandler类,在这里顺便说下它们两的区别吧。 继承SimpleChannelInboundHandler类之后,会在接收到数据后会自动release掉数据占用的Bytebuffer资源。并且继承该类需要指定数据格式。 而继承ChannelInboundHandlerAdapter则不会自动释放,需要手动调用ReferenceCountUtil.release()等方法进行释放。继承该类不需要指定数据格式。 所以在这里,个人推荐服务端继承ChannelInboundHandlerAdapter,手动进行释放,防止数据未处理完就自动释放了。而且服务端可能有多个客户端进行连接,并且每一个客户端请求的数据格式都不一致,这时便可以进行相应的处理。 客户端根据情况可以继承SimpleChannelInboundHandler类。好处是直接指定好传输的数据格式,就不需要再进行格式的转换了。

代码如下:

代码语言:javascript
复制
@Service("nettyServerHandler")
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	/** 空闲次数 */
	private int idle_count = 1;
	/** 发送次数 */
	private int count = 1;


	/**
	 * 建立连接时,发送一条消息
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
		UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0)
				.build();
		ctx.writeAndFlush(userMsg);
		super.channelActive(ctx);
	}

	/**
	 * 超时处理 如果5秒没有接受客户端的心跳,就触发; 如果超过两次,则直接关闭;
	 */
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
		if (obj instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) obj;
			if (IdleState.READER_IDLE.equals(event.state())) { // 如果读通道处于空闲状态,说明没有接收到心跳命令
				System.out.println("已经5秒没有接收到客户端的信息了");
				if (idle_count > 1) {
					System.out.println("关闭这个不活跃的channel");
					ctx.channel().close();
				}
				idle_count++;
			}
		} else {
			super.userEventTriggered(ctx, obj);
		}
	}

	/**
	 * 业务逻辑处理
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		System.out.println("第" + count + "次" + ",服务端接受的消息:" + msg);
		try {
			// 如果是protobuf类型的数据
		  if (msg instanceof UserMsg) {
				UserInfo.UserMsg userState = (UserInfo.UserMsg) msg;
				if (userState.getState() == 1) {
					System.out.println("客户端业务处理成功!");
				} else if(userState.getState() == 2){
					System.out.println("接受到客户端发送的心跳!");
				}else{
					System.out.println("未知命令!");
				}
			} else {
				System.out.println("未知数据!" + msg);
				return;
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			ReferenceCountUtil.release(msg);
		}
		count++;
	}

	/**
	 * 异常处理
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}

还有个服务端的启动类,之前是通过main方法直接启动, 不过这里改成了通过springBoot进行启动,差别不大。 代码如下:

代码语言:javascript
复制
@SpringBootApplication
public class NettyServerApp {

	public static void main(String[] args) {
		// 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件
		ApplicationContext context = SpringApplication.run(NettyServerApp.class, args);
		NettyServer nettyServer = context.getBean(NettyServer.class);
		nettyServer.run();
	}

}

到这里服务端相应的代码就编写完毕了。

客户端

客户端这边的代码和服务端的很多地方都类似,我就不再过多细说了,主要将一些不同的代码拿出来简单的讲述下。 首先是客户端的主类,基本和服务端的差不多,也就是多了监听的端口和一个监听器(用来监听是否和服务端断开连接,用于重连)。 主要实现的代码逻辑如下:

代码语言:javascript
复制
	public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
		ChannelFuture f = null;
		try {
			if (bootstrap != null) {
				bootstrap.group(eventLoopGroup);
				bootstrap.channel(NioSocketChannel.class);
				bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
				bootstrap.handler(nettyClientFilter);
				bootstrap.remoteAddress(host, port);
				f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
					final EventLoop eventLoop = futureListener.channel().eventLoop();
					if (!futureListener.isSuccess()) {
						System.out.println("与服务端断开连接!在10s之后准备尝试重连!");
						eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
					}
				});
				if(initFalg){
					System.out.println("Netty客户端启动成功!");
					initFalg=false;
				}
				// 阻塞
				f.channel().closeFuture().sync();
			}
		} catch (Exception e) {
			System.out.println("客户端连接失败!"+e.getMessage());
		}
	}

注:监听器这块的实现用的是JDK1.8的写法。

客户端过滤其这块基本和服务端一直。不过需要注意的是,传输协议、编码和解码应该一致,还有心跳的读写时间应该小于服务端所设置的时间。 改动的代码如下:

代码语言:javascript
复制
	ChannelPipeline ph = ch.pipeline();
        /*
         * 解码和编码,应和服务端一致
         * */
        //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
        ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));

客户端的业务代码逻辑。 主要实现的几点逻辑是心跳按时发送以及解析服务发送的protobuf格式的数据。 这里比服务端多个个注解, 该注解Sharable主要是为了多个handler可以被多个channel安全地共享,也就是保证线程安全。 废话就不多说了,代码如下:

代码语言:javascript
复制
	@Service("nettyClientHandler")
	@ChannelHandler.Sharable
	public class NettyClientHandler extends ChannelInboundHandlerAdapter {
	@Autowired
	private NettyClient nettyClient;
	
	/** 循环次数 */
	private int fcount = 1;
	
	/**
	 * 建立连接时
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("建立连接时:" + new Date());
		ctx.fireChannelActive();
	}

	/**
	 * 关闭连接时
	 */
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("关闭连接时:" + new Date());
		final EventLoop eventLoop = ctx.channel().eventLoop();
		nettyClient.doConnect(new Bootstrap(), eventLoop);
		super.channelInactive(ctx);
	}

	/**
	 * 心跳请求处理 每4秒发送一次心跳请求;
	 *
	 */
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
		System.out.println("循环请求的时间:" + new Date() + ",次数" + fcount);
		if (obj instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) obj;
			if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果写通道处于空闲状态,就发送心跳命令
				UserMsg.Builder userState = UserMsg.newBuilder().setState(2);
				ctx.channel().writeAndFlush(userState);
				fcount++;
			}
		}
	}

	/**
	 * 业务逻辑处理
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		// 如果不是protobuf类型的数据
		if (!(msg instanceof UserMsg)) {
			System.out.println("未知数据!" + msg);
			return;
		}
		try {

			// 得到protobuf的数据
			UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg;
			// 进行相应的业务处理。。。
			// 这里就从简了,只是打印而已
			System.out.println(
					"客户端接受到的用户信息。编号:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年龄:" + userMsg.getAge());

			// 这里返回一个已经接受到数据的状态
			UserMsg.Builder userState = UserMsg.newBuilder().setState(1);
			ctx.writeAndFlush(userState);
			System.out.println("成功发送给服务端!");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			ReferenceCountUtil.release(msg);
		}
	 }
	}

那么到这里客户端的代码也编写完毕了。

功能测试

首先启动服务端,然后再启动客户端。 我们来看看结果是否如上述所说。

服务端输出结果:

代码语言:javascript
复制
服务端启动成功,端口是:9876
连接的客户端地址:/127.0.0.1:53319
第1次,服务端接受的消息:state: 1

客户端业务处理成功!
第2次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2

接受到客户端发送的心跳!

客户端输入结果:

代码语言:javascript
复制
Netty客户端启动成功!
建立连接时:Mon Jul 16 23:31:58 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:32:02 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:32:06 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:32:10 CST 2018,次数3
循环请求的时间:Mon Jul 16 23:32:14 CST 2018,次数4

通过打印信息可以看出如上述所说。

接下来我们再来看看客户端是否能够实现重连。 先启动客户端,再启动服务端。

客户端输入结果:

代码语言:javascript
复制
Netty客户端启动成功!
与服务端断开连接!在10s之后准备尝试重连!
客户端连接失败!AbstractChannel$CloseFuture@1fbaa3ac(incomplete)
建立连接时:Mon Jul 16 23:41:33 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:41:38 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:41:42 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:41:46 CST 2018,次数3

服务端输出结果:

代码语言:javascript
复制
服务端启动成功,端口是:9876
连接的客户端地址:/127.0.0.1:53492
第1次,服务端接受的消息:state: 1

客户端业务处理成功!
第2次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2

结果也如上述所说!

其它

关于SpringBoot整合Netty使用Protobuf进行数据传输到这里就结束了。 SpringBoot整合Netty使用Protobuf进行数据传输的项目工程地址: https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf

对了,也有不使用springBoot整合的Netty项目工程地址: https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-09-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员泥瓦匠 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Protobuf
    • 介绍
      • 使用
      • SpringBoot整合Netty
        • 开发准备
          • 代码编写
            • 客户端
          • 功能测试
          • 其它
          相关产品与服务
          云服务器
          云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档