前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring环境下使用Netty写Socket和Http详解

Spring环境下使用Netty写Socket和Http详解

作者头像
品茗IT
发布2019-09-12 09:49:42
1.8K0
发布2019-09-12 09:49:42
举报
文章被收录于专栏:品茗IT品茗IT

Spring环境下使用Netty写Socket和Http详解

官方主页

Spring

Netty

一、概述

Netty是目前最流行的由JBOSS提供的一个Java开源框架NIO框架,Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

相比JDK原生NIO,Netty提供了相对十分简单易用的API,非常适合网络编程。Netty是完全基于NIO实现的,所以Netty是异步的。

Mina同样也是一款优秀的NIO框架,而且跟Netty是出自同一个人之手,但是Netty要晚一点,优点更多一些,想了解更多可以直接搜索mina和netty比较。

使用Netty,我们可以作为Socket服务器,也可以用来做Http服务器,这里,我们将这两种方式都详细介绍一下。

Git地址:

Gitee

品茗IT:提供在线快速构建Spring项目工具一站式Springboot项目生成

二、依赖Jar包

我们假定你已经建好了Spring环境。这里只引入Netty的jar包。

代码语言:javascript
复制
<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.17.Final</version>
</dependency>

完整依赖:

代码语言:javascript
复制
<?xml version="1.0"?>
<project
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>cn.pomit</groupId>
		<artifactId>SpringWork</artifactId>
		<version>0.0.1-SNAPSHOT</version>
	</parent>
	<artifactId>NettyNew</artifactId>
	<packaging>jar</packaging>
	<name>NettyNew</name>
	<url>http://maven.apache.org</url>
	<dependencies>
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.17.Final</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
		</dependency>
	</dependencies>
	<build>
		<finalName>NettyNew</finalName>
	</build>
</project>

父pom管理了所有依赖jar包的版本,地址:

https://www.pomit.cn/spring/SpringWork/pom.xml

三、Socket服务器和Http服务器的使用时的些许差异

可以说,没有差异,只是ChannelHandler不同而已。

因此,这里先说明一些公共的使用,然后再针对socket和http做区分。

四、Netty服务器配置

我们可以编写一个公共的服务器配置模板NettyServiceTemplate,这个模板既可以被tcp使用,也可以被http使用。

NettyServiceTemplate:

代码语言:javascript
复制
package cn.pomit.springwork.nettynew.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

public abstract class NettyServiceTemplate {
	static private EventLoopGroup bossGroup = new NioEventLoopGroup();
	static private EventLoopGroup workerGroup = new NioEventLoopGroup();

	abstract protected ChannelHandler[] createHandlers();

	abstract public int getPort();

	abstract public String getName();

	@PostConstruct
	public void start() throws Exception {
		ServerBootstrap b = new ServerBootstrap();
		b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					public void initChannel(SocketChannel ch) throws Exception {
						ChannelHandler[] handlers = createHandlers();
						for (ChannelHandler handler : handlers) {
							ch.pipeline().addLast(handler);
						}
					}
				}).option(ChannelOption.SO_BACKLOG, 128).option(ChannelOption.SO_REUSEADDR, true)
				.childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_REUSEADDR, true);

		ChannelFuture cf = b.bind(getPort()).await();
		// cf.channel().closeFuture().await();
		if (!cf.isSuccess()) {
			System.out.println("无法绑定端口:" + getPort());
			throw new Exception("无法绑定端口:" + getPort());
		}

		System.out.println("服务[{" + getName() + "}]启动完毕,监听端口[{" + getPort() + "}]");
	}

	@PreDestroy
	public void stop() {
		bossGroup.shutdownGracefully().syncUninterruptibly();
		workerGroup.shutdownGracefully().syncUninterruptibly();
		System.out.println("服务[{" + getName() + "}]关闭。");
	}
}

代码贴出来之后,我们还是要讲下里面的一些重点:

bossGroup和workerGroup就是为了建立线程池,这个自行百度。

ChannelHandler使用抽象方法由子类去实现,这样就可以根据不同的ChannelHandler实现不同的功能。

以上注释了

代码语言:javascript
复制
cf.channel().closeFuture().await();

这部分代码是阻塞了当前主线程一直等待结束,后面的代码就不能执行了。

两次await的不同:两次await虽然都是针对ChannelFuture的,但是两次的ChannelFuture不一样,打断点看了下,分别是:

代码语言:javascript
复制
AbstractBootstrap$PendingRegistrationPromise@177655b7(success)
AbstractChannel$CloseFuture@3c4f9fbb(incomplete)

原理就不细说了,我也不知道,毕竟不是专精这方面的。

五、启动Server

这里先说server的启动,下面会写具体server的实现。

5.1 直接启动

因为后面要写不同的Server,所以这里先把启动Server写出来,免得眼晕。

我们可以直接脱离Spring环境启动,如果不放心也可以放到线程池中启动:

TestApp:

代码语言:javascript
复制
package cn.pomit.springwork.nettynew;

import cn.pomit.springwork.nettynew.server.http.JsonHttpServer;
import cn.pomit.springwork.nettynew.server.tcp.StringTcpServer;

public class TestApp {
	
	public static void main(String args[]) throws ClassNotFoundException, InstantiationException, IllegalAccessException{		
		StringTcpServer stringTcpServerTest = new StringTcpServer(8088);
		JsonHttpServer jsonHttpServer = new JsonHttpServer(8880);
		try {
			stringTcpServerTest.start();
			jsonHttpServer.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
5.2 Spring环境下启动

Spring环境下,将每个Server加上@Service注解,因为父类有注解@PostConstruct,所以可以调用start方法启动server。

@Service注解的的service可以通过@Autowired注入需要使用的bean。

注意: 理论上ChannelHandler也可以作为bean的,但是netty要求ChannelHandler是每个线程一份的,就算指定bean的scope是原型也无效。当然可以使用@Sharable注解解决,@Sharable注解的handler可以被复用。但没特殊需要,尽量避免,使用了@Sharable,又会牵扯到线程安全问题,我们的bean是可以通过引用传入来使用的。

下面的xml只是将server作为bean做配置,并没使用bean注入等。

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
	xmlns:jms="http://www.springframework.org/schema/jms" xmlns:jaxws="http://cxf.apache.org/jaxws"
	xsi:schemaLocation="
                    http://www.springframework.org/schema/beans
                    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                    http://www.springframework.org/schema/tx 
                    http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
                    http://www.springframework.org/schema/aop 
                    http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                    http://www.springframework.org/schema/context      
                    http://www.springframework.org/schema/context/spring-context-4.0.xsd
                    http://www.springframework.org/schema/cache 
                    http://www.springframework.org/schema/cache/spring-cache-4.0.xsd
                    http://www.springframework.org/schema/jms 
                    http://www.springframework.org/schema/jms/spring-jms-4.0.xsd   
                    http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd ">

	<context:annotation-config />
	<context:component-scan base-package="com.cff.springwork">
	</context:component-scan>
	
	<bean id="annotationPropertyConfigurerNettyNew"
		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="order" value="1" />
		<property name="ignoreUnresolvablePlaceholders" value="true" />
		<property name="locations">
			<list>
				<value>classpath:nettynew.properties</value>
			</list>
		</property>
	</bean>
	<bean id="stringTcpServer" class="cn.pomit.springwork.nettynew.server.tcp.StringTcpServer" init-method="start" destroy-method="stop">
		<property name="port" value="${nettynew.tcp.port}" />
		<property name="name" value="${nettynew.tcp.name}" />
	</bean>
	<bean id="jsonHttpServer" class="cn.pomit.springwork.nettynew.server.http.JsonHttpServer" init-method="start" destroy-method="stop">
		<property name="port" value="${nettynew.http.port}" />
		<property name="name" value="${nettynew.http.name}" />
	</bean>
</beans>

配置文件nettynew.properties:

代码语言:javascript
复制
nettynew.tcp.port=6666
nettynew.tcp.name=TcpServer
nettynew.http.port=6666
nettynew.http.name=HttpServer

六、Socket(Tcp)的Server配置

6.1 Socket(Tcp)的Server

如果我们单纯的使用tcp传递数据,比如String数据,我们可以这样定义一个Server:

StringTcpServer:

代码语言:javascript
复制
package cn.pomit.springwork.nettynew.server.tcp;

import cn.pomit.springwork.nettynew.handler.tcp.StringTcpServerHandler;
import cn.pomit.springwork.nettynew.server.NettyServiceTemplate;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class StringTcpServer extends NettyServiceTemplate {
	private int port = 8088;
	private String name = "String Server";

	// SpringBeanService springBeanService;

	public StringTcpServer(int port) {
		this.port = port;
	}

	// 可以启动server时将spring的bean传递进来。
	// public StringTcpServerTest(int port, SpringBeanService springBeanService)
	// {
	// this.port = port;
	// this.springBeanService = springBeanService;
	// }

	@Override
	protected ChannelHandler[] createHandlers() {
		return new ChannelHandler[] { 
				new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()),
				new StringDecoder(), 
				new StringEncoder(),

				// 如果想使用spring的bean,可以将springBeanService传递给StringTcpServerHandler,如new
				// StringTcpServerHandler(springBeanService)
				new StringTcpServerHandler() };
	}

	@Override
	public int getPort() {
		return port;
	}

	@Override
	public String getName() {
		return name;
	}

	public void setPort(int port) {
		this.port = port;
	}

	public void setName(String name) {
		this.name = name;
	}

}

这里,DelimiterBasedFrameDecoder定义了 以("\n")为结尾分割的 解码器。

然后是String解码与编码器

然后是自定义的一个处理器Handler。

6.2 Socket(Tcp)的ChannelHandler

StringTcpServerHandler:

代码语言:javascript
复制
package cn.pomit.springwork.nettynew.handler.tcp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class StringTcpServerHandler extends SimpleChannelInboundHandler<String> {
	String charset = "UTF-8";

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
		System.out.println("内容:" + msg);
		ctx.writeAndFlush("返回内容:" + msg);
	}
}

七、HTTP 的Server配置

7.1 HTTP 的Server

如果我们想使用http传输json数据,我们可以这样玩:

JsonHttpServer:

代码语言:javascript
复制
package cn.pomit.springwork.nettynew.server.http;

import org.springframework.stereotype.Service;

import cn.pomit.springwork.nettynew.coder.http.HttpMsgRequestDecoder;
import cn.pomit.springwork.nettynew.coder.http.HttpMsgResponseEncoder;
import cn.pomit.springwork.nettynew.handler.http.JsonHttpServerHandler;
import cn.pomit.springwork.nettynew.server.NettyServiceTemplate;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

public class JsonHttpServer extends NettyServiceTemplate {
	int port = 8888;
	String name = "Json Server";
	private String charset = "UTF-8";
	private int timeout = 60;
	
	public JsonHttpServer(int port) {
		this.port = port;
	}

	@Override
	protected ChannelHandler[] createHandlers() {
		return new ChannelHandler[] { 
				new HttpResponseEncoder(), 
				new HttpRequestDecoder(),
				new HttpObjectAggregator(1048576), 
				new HttpMsgResponseEncoder(charset, timeout),
				new HttpMsgRequestDecoder(charset), 
				new JsonHttpServerHandler() };
	}

	@Override
	public int getPort() {
		return port;
	}

	@Override
	public String getName() {
		return name;
	}

}

其中,HttpResponseEncoder和HttpRequestDecoder是编码解码器。

HttpObjectAggregator将请求合并,如果没有,同一个http请求我们可能需要处理两次。

HttpMsgResponseEncoder和HttpMsgRequestDecoder是自定义的通用http处理方式。JsonHttpServerHandler是自定义的json处理器。

我们可以自定义一个http实体HttpResponseMsg,方便处理http响应等:

7.2 HTTP 的ChannelHandler

JsonHttpServerHandler:

代码语言:javascript
复制
package cn.pomit.springwork.nettynew.handler.http;

import cn.pomit.springwork.nettynew.model.http.HttpResponseMsg;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class JsonHttpServerHandler extends SimpleChannelInboundHandler<String> {
	String charset = "UTF-8";

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
		System.out.println("post内容:" + msg);
		HttpResponseMsg hrm = new HttpResponseMsg();
    	hrm.setResType(HttpResponseMsg.ResType.JSON.getValue());
    	hrm.setResCode(HttpResponseMsg.ResCode.OK.getValue());
    	hrm.setMessage(msg);
    	ctx.writeAndFlush(hrm);
	}

}
7.3 HTTP 的实体

HttpResponseMsg:

代码语言:javascript
复制
package cn.pomit.springwork.nettynew.model.http;

public class HttpResponseMsg {
	public enum ResType {  
		HTML("text/html"),
	    JSON("application/json"),
	    JS("application/javascript"),
	    PNG("image/png"),
	    JPG("image/jpg");
	    String value = null;
	    ResType(String value) {
	        this.value = value;
	    }
	    public String getValue() {
	        return value;
	    }
	}  
	
	public enum ResCode {  
		NOT_FOUND(404),
	    OK(200),
	    INTERNAL_ERROR(500);
	    int value = 200;
	    ResCode(int value) {
	        this.value = value;
	    }
	    public int getValue() {
	        return value;
	    }
	}  
	public int resCode;
	
	public String resType;
	
	public String message;

	public int getResCode() {
		return resCode;
	}

	public void setResCode(int resCode) {
		this.resCode = resCode;
	}

	public String getResType() {
		return resType;
	}

	public void setResType(String resType) {
		this.resType = resType;
	}

	public String getMessage() {
		return message;
	}

	public void setMessage(String message) {
		this.message = message;
	}
	
}
7.4 HTTP 的自定义的Encoder和Decoder

HttpMsgResponseEncoder:

代码语言:javascript
复制
package cn.pomit.springwork.nettynew.coder.http;

import java.util.List;

import cn.pomit.springwork.nettynew.model.http.HttpResponseMsg;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;

public class HttpMsgResponseEncoder extends MessageToMessageEncoder<HttpResponseMsg> {
	private String charset;
	private int timeout;

	public HttpMsgResponseEncoder(String charset, int timeout) {
		super();
		this.charset = charset;
		this.timeout = timeout;
	}

	@Override
	protected void encode(ChannelHandlerContext ctx, HttpResponseMsg message, List<Object> out) {
		try {
			DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(message.getResCode()),
					Unpooled.wrappedBuffer(message.getMessage().getBytes(charset)));
			response.headers().set(HttpHeaderNames.CONTENT_TYPE, message.getResType()+";charset=" + charset);
			response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());

			// 强制keep-alive
			response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
			response.headers().set("Keep-Alive", "timeout=" + timeout);

			out.add(response);
		} catch (Exception e) {
			e.printStackTrace();
		}

	}
}

HttpMsgRequestDecoder:

代码语言:javascript
复制
package cn.pomit.springwork.nettynew.coder.http;

import java.nio.charset.Charset;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObject;

public class HttpMsgRequestDecoder extends MessageToMessageDecoder<HttpObject>{
	private String charset;

	public HttpMsgRequestDecoder(String charset) {
		super();
		this.charset = charset;
	}

	@Override
	protected void decode(ChannelHandlerContext ctx, HttpObject in,
			List<Object> out) throws Exception {
		FullHttpRequest request = (FullHttpRequest) in;

		ByteBuf buf = request.content();
		String jsonStr = buf.toString(Charset.forName(charset));
		out.add(jsonStr);
	}
}

至此,以上tcp和http处理方式已完成。

快速构建项目

Spring组件化构建

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-05-20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spring环境下使用Netty写Socket和Http详解
    • 官方主页
      • 一、概述
        • 二、依赖Jar包
          • 三、Socket服务器和Http服务器的使用时的些许差异
            • 四、Netty服务器配置
              • 五、启动Server
                • 5.1 直接启动
                • 5.2 Spring环境下启动
              • 六、Socket(Tcp)的Server配置
                • 6.1 Socket(Tcp)的Server
                • 6.2 Socket(Tcp)的ChannelHandler
              • 七、HTTP 的Server配置
                • 7.1 HTTP 的Server
                • 7.2 HTTP 的ChannelHandler
                • 7.3 HTTP 的实体
                • 7.4 HTTP 的自定义的Encoder和Decoder
              • 快速构建项目
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档