结合实际的场景来把netty这个框架运行起来,一起去梳理这个过程,里面用到了nio和Reactor,nio实现了对应的API,但是它没有对多线程进行结合,大牛才设计出来reactor这个模式,来实现高性能的nio的编程,经过梳理才到了netty,reactor一定要搞懂。推送系统先别管是什么推送系统,先理解成一个客户端和服务端的一个程序,也先别管具体的业务场景,功能的属性比较弱,推送系统本身就是比较简单一个推送系统,里面也没有增删查改复杂。默认大家都理解长连接和短连接,网络请求的基本概念。
客户端可能通过自定义的协议,或者是app应用,需要跟推送服务器建立一个连接,推和拉的区别是推是服务器主动像客户端发起请求,往往这个技术很难实现的,主动推数据需要建立一条网络通道,服务器才可以完成推送,不说它也不知道是哪个客户端,一定是客户端主动和推送服务器建立了连接socket,一般的情况是通过拉的模式来完成推送,涉及到一些socket的技术点。归根到底就是数据交互,TCP连接的方式,客户端和服务端时间的交互。一个客户端跟推送服务器连接,10个,100个,1000个,百万个连接怎么办?不管程序如何优化始终是需要有上限的。有上限肯定是多台,推送系统是多台。网络请求如何形成集群呢?
push-server-dispatch 就类似网络的DNS的服务器,咱们平常登录网页的时候填写的是域名,通过DNS告诉我们域名所在的IP,直接访问对应的IP地址。
SocketServer
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class SocketServer {
public static void main(String[] args) throws IOException, Exception {
// server
ServerSocket serverSocket = new ServerSocket(9999);
// 获取新连接
while (true) {
final Socket accept = serverSocket.accept();
// accept.getOutputStream().write("推送实例");
InputStream inputStream = accept.getInputStream();
while (true) {
byte[] request = new byte[220];
int read = inputStream.read(request);
if (read == -1) {
break;
}
// 得到请求内容,解析,得到发送对象和发送内容
String content = new String(request);
if(content.getBytes().length > 220) {
// TODO
} else if(content.getBytes().length < 220) {
}
// 每次读取到的数据,不能够保证是一条完整的信息
System.out.println(content);
}
}
}
}
SocketClient
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
public class SocketClient {
public static void main(String[] args) throws Exception {
Socket socket = new Socket("localhost", 9999);
OutputStream outputStream = socket.getOutputStream();
// 消息长度固定为 220字节,包含有
// 1. 目标用户ID长度为10, 10 000 000 000 ~ 19 999 999 999
// 2. 消息内容字符串长度最多70。按一个汉字3字节,内容的最大长度为210字节
byte[] request = new byte[220];
byte[] userId = "10000000000".getBytes();
byte[] content = "加油!!学习netty推送服务!学习netty推送服务!学习netty推送服务!".getBytes();
System.arraycopy(userId, 0, request, 0, 10);
System.arraycopy(content, 0, request, 10, content.length);
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
outputStream.write(request);
} catch (IOException e) {
e.printStackTrace();
}
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
Thread.sleep(2000L); // 两秒后退出
socket.close();
}
}
请求、响应之后,关闭已经建立的TCP连接,下次请求再建立一个连接(浏览器)。里面有keeplive保持连接的特性。
2.长连接
请求、响应之后,不关闭TCP连接,多次请求,复用同一个连接。存在一个问题,数据链都是在一个通道里面,你的也好,我的也好,都在一个通道,请求过来响应过去,不管请求和响应都是数据包在流转,数据包流转。
为了避免频繁创建连接/释放连接带来的性能损耗,以及消息获取的实时性,采用长连接的形式。
交互中存在的问题
发送一条消息(12345),在发送一条消息(66666),犹豫网络卡顿了,或者是发送卡顿了,或者一些不明原因,接收到的消息是123 66666 45 ,服务端接收到的消息是1条,而不是发送了2次的2条消息,发送2条消息,理论上是2条数据,但是在实际的传输过程中,变成了1条数据。
当发送方发送数据的时候,操作系统底层,并不是直接通过网线就直接出去了,操作系统有个发送的缓冲区,接收方有个也有个缓冲区,接收方从缓存中读取数据。这里面就会涉及到一个粘包和拆包的问题。
1.粘包《发送方》
操作系统接收到发送缓冲区,可能会判断目前的数据太小了,等一下发,等第二个进入缓冲区的时候的才完成下一步的发送,这里面有个算法的【Nagle】,算法就是为了做这一件事,就是为了提高网络效率,而去做的事情,举个例子:一个人去坐车可能车不开,非要达到多少人了车才开。这是为了提高网络的性能。这就是沾在一起的情况。
2.拆包《发送方》
一下发送了,5,6条数据过来,数据量太大了,太多了一下发不完,发不了。5条数据拿出来一半来发,把这一半发过去,剩下一半,每个链接有自己专属的缓冲区,不会存在冲突。
3.粘包《接收方》
接收到进入接收缓冲区,没有立刻处理accapt了,马上下个也来了,缓存冲里面的数据有堆积,读的时候发现缓冲区里面的数据怎么这么多了。一个一个读,发现好几个沾在一起。
4.拆包《接收方》
最后读数据发现读出来了很多空的。因为数据被拆了。
上边这块就是网络编程肯定会出现这种情况,怎么样解决呢。消息发送是有协议的,比如Http协议,可以通过协议本身发现数据是否完整,比方说,数据被拆分了,被沾在一起了,这些都是可以通过数据的内容发现出来的。每次都判断接收到的是否满足数组的长度,
XNettyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
public class XNettyServer {
public static void main(String[] args) throws Exception {
// 1、 线程定义
// accept 处理连接的线程池
EventLoopGroup acceptGroup = new NioEventLoopGroup();
// read io 处理数据的线程池
EventLoopGroup readGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(acceptGroup, readGroup);
// 2、 选择TCP协议,NIO的实现方式
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 3、 职责链定义(请求收到后怎么处理)
ChannelPipeline pipeline = ch.pipeline();
// TODO 3.1 增加解码器
// pipeline.addLast(new XDecoder());
// TODO 3.2 打印出内容 handdler
pipeline.addLast(new XHandller());
}
});
// 4、 绑定端口
System.out.println("启动成功,端口 9999");
b.bind(9999).sync().channel().closeFuture().sync();
} finally {
acceptGroup.shutdownGracefully();
readGroup.shutdownGracefully();
}
}
}
XHandller
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 后续处理handdler
*/
@ChannelHandler.Sharable
public class XHandller extends ChannelInboundHandlerAdapter {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 输出 bytebuf
ByteBuf buf = (ByteBuf) msg;
byte[] content = new byte[buf.readableBytes()];
buf.readBytes(content);
System.out.println(Thread.currentThread()+ ": 最终打印"+new String(content));
((ByteBuf) msg).release(); // 引用计数减一
// ctx.fireChannelRead();
}
// 异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
XDecoder
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
// 编解码一定是根据协议~
public class XDecoder extends ByteToMessageDecoder {
static final int PACKET_SIZE = 220;
// 用来临时保留没有处理过的请求报文
ByteBuf tempMsg = Unpooled.buffer();
// in输入 --- 处理 --- out 输出
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println(Thread.currentThread()+"收到了一次数据包,长度是:" + in.readableBytes());
// in 请求的数据
// out 将粘在一起的报文拆分后的结果保留起来
// 1、 合并报文
ByteBuf message = null;
int tmpMsgSize = tempMsg.readableBytes();
// 如果暂存有上一次余下的请求报文,则合并
if (tmpMsgSize > 0) {
message = Unpooled.buffer();
message.writeBytes(tempMsg);
message.writeBytes(in);
System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
} else {
message = in;
}
// 2、 拆分报文
// 这个场景下,一个请求固定长度为3,可以根据长度来拆分
// i+1 i+1 i+1 i+1 i+1
// 不固定长度,需要应用层协议来约定 如何计算长度
// 在应用层中,根据单个报文的长度及特殊标记,来将报文进行拆分或合并
// dubbo rpc协议 = header(16) + body(不固定)
// header最后四个字节来标识body
// 长度 = 16 + body长度
// 0xda, 0xbb 魔数
int size = message.readableBytes();
int counter = size / PACKET_SIZE;
for (int i = 0; i < counter; i++) {
byte[] request = new byte[PACKET_SIZE];
// 每次从总的消息中读取220个字节的数据
message.readBytes(request);
// 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
out.add(Unpooled.copiedBuffer(request));
}
// 3、多余的报文存起来
// 第一个报文:i+ 暂存
// 第二个报文:1 与第一次
size = message.readableBytes();
if (size != 0) {
System.out.println("多余的数据长度:" + size);
// 剩下来的数据放到tempMsg暂存
tempMsg.clear();
tempMsg.writeBytes(message.readBytes(size));
}
}
}
自研过于复杂,采用合适的开源协议(XMPP/MQTT/WebSocket)
PS:下节一起完成下代码编写,websocket完成的推送代码实现。