首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JAVA服务器推送功能设计,消息方法总结

JAVA服务器推送功能设计,消息方法总结

作者头像
IT架构圈
发布2021-01-04 10:22:43
2K0
发布2021-01-04 10:22:43
举报
文章被收录于专栏:IT架构圈IT架构圈

结合实际的场景来把netty这个框架运行起来,一起去梳理这个过程,里面用到了nio和Reactor,nio实现了对应的API,但是它没有对多线程进行结合,大牛才设计出来reactor这个模式,来实现高性能的nio的编程,经过梳理才到了netty,reactor一定要搞懂。推送系统先别管是什么推送系统,先理解成一个客户端和服务端的一个程序,也先别管具体的业务场景,功能的属性比较弱,推送系统本身就是比较简单一个推送系统,里面也没有增删查改复杂。默认大家都理解长连接和短连接,网络请求的基本概念。

(一)设计和思路
  • ① 介绍

客户端可能通过自定义的协议,或者是app应用,需要跟推送服务器建立一个连接,推和拉的区别是推是服务器主动像客户端发起请求,往往这个技术很难实现的,主动推数据需要建立一条网络通道,服务器才可以完成推送,不说它也不知道是哪个客户端,一定是客户端主动和推送服务器建立了连接socket,一般的情况是通过拉的模式来完成推送,涉及到一些socket的技术点。归根到底就是数据交互,TCP连接的方式,客户端和服务端时间的交互。一个客户端跟推送服务器连接,10个,100个,1000个,百万个连接怎么办?不管程序如何优化始终是需要有上限的。有上限肯定是多台,推送系统是多台。网络请求如何形成集群呢?

  1. 推送API对外开放的,个推,极光,飞鸽,移动互联网应用开发,对外提供一套API,统一的。用户注册发短信,与农户注册发通知,订单中心,聊天系统。业务系统都是统一的。
  2. 消息不可能直接达到推送系统,中间需要存在一个消息队列,消息队列进行存储,用到一个中间件(消息中间件或者数据库)
  3. 推送系统和消息队列产生信息的交互,在将对应的消息推送给指定的client。如果是个网络连接就意味着,当前这个用户只会给一个系统建立连接,目前推送系统只有5台。推送系统是集群的。
  4. 所以clients要跟推送系统中间添加一个负载均衡,这种中心化的nginx不是需要考虑的,正常的系统是中间添加一个push-server-dispatch 对用户接口(分派接口的),根据请求返回消息推送服务器的地址。
  • ② 客户端和推送系统之前的push-server-dispatch

push-server-dispatch 就类似网络的DNS的服务器,咱们平常登录网页的时候填写的是域名,通过DNS告诉我们域名所在的IP,直接访问对应的IP地址。

  1. 推送服务注册更新在push-server-dispatch
  2. userId发送请求给push-server-dispatch
  3. 推送系统返回一个服务地址
  4. client 跟推送服务建立连接(中间不会存在nginx来进行通信的,如果中间添加了负载均衡,还需要在通过负载均衡确定某一个推送服务,量大的话都无法使用,只能进行1VS1的)肯定有老铁说zookeeper,这里先不说zookeeper的事情。
  • ③ 简单的数据包的扭转

SocketServer

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketServer {
    public static void main(String[] args) throws IOException, Exception {
        // server
        ServerSocket serverSocket = new ServerSocket(9999);

        // 获取新连接
        while (true) {
            final Socket accept = serverSocket.accept();
            // accept.getOutputStream().write("推送实例");
            InputStream inputStream = accept.getInputStream();
            while (true) {
                byte[] request = new byte[220];
                int read = inputStream.read(request);
                if (read == -1) {
                    break;
                }
                // 得到请求内容,解析,得到发送对象和发送内容
                String content = new String(request);
                if(content.getBytes().length > 220) {
                   // TODO
                } else if(content.getBytes().length < 220) {

                }
                // 每次读取到的数据,不能够保证是一条完整的信息
                System.out.println(content);
            }
        }
    }
}

SocketClient


import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;

public class SocketClient {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("localhost", 9999);
        OutputStream outputStream = socket.getOutputStream();

        // 消息长度固定为 220字节,包含有
        // 1. 目标用户ID长度为10, 10 000 000 000 ~ 19 999 999 999
        // 2. 消息内容字符串长度最多70。按一个汉字3字节,内容的最大长度为210字节
        byte[] request = new byte[220];
        byte[] userId = "10000000000".getBytes();
        byte[] content = "加油!!学习netty推送服务!学习netty推送服务!学习netty推送服务!".getBytes();
        System.arraycopy(userId, 0, request, 0, 10);
        System.arraycopy(content, 0, request, 10, content.length);

        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    outputStream.write(request);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            }).start();
        }

        countDownLatch.await();
        Thread.sleep(2000L); // 两秒后退出
        socket.close();
    }
}
  • ④ 连接的方式 1.短连接

请求、响应之后,关闭已经建立的TCP连接,下次请求再建立一个连接(浏览器)。里面有keeplive保持连接的特性。

2.长连接

请求、响应之后,不关闭TCP连接,多次请求,复用同一个连接。存在一个问题,数据链都是在一个通道里面,你的也好,我的也好,都在一个通道,请求过来响应过去,不管请求和响应都是数据包在流转,数据包流转。

为了避免频繁创建连接/释放连接带来的性能损耗,以及消息获取的实时性,采用长连接的形式。

交互中存在的问题

发送一条消息(12345),在发送一条消息(66666),犹豫网络卡顿了,或者是发送卡顿了,或者一些不明原因,接收到的消息是123 66666 45 ,服务端接收到的消息是1条,而不是发送了2次的2条消息,发送2条消息,理论上是2条数据,但是在实际的传输过程中,变成了1条数据。

  • ⑤ 发送流程

当发送方发送数据的时候,操作系统底层,并不是直接通过网线就直接出去了,操作系统有个发送的缓冲区,接收方有个也有个缓冲区,接收方从缓存中读取数据。这里面就会涉及到一个粘包和拆包的问题。

1.粘包《发送方》

操作系统接收到发送缓冲区,可能会判断目前的数据太小了,等一下发,等第二个进入缓冲区的时候的才完成下一步的发送,这里面有个算法的【Nagle】,算法就是为了做这一件事,就是为了提高网络效率,而去做的事情,举个例子:一个人去坐车可能车不开,非要达到多少人了车才开。这是为了提高网络的性能。这就是沾在一起的情况。

2.拆包《发送方》

一下发送了,5,6条数据过来,数据量太大了,太多了一下发不完,发不了。5条数据拿出来一半来发,把这一半发过去,剩下一半,每个链接有自己专属的缓冲区,不会存在冲突。

3.粘包《接收方》

接收到进入接收缓冲区,没有立刻处理accapt了,马上下个也来了,缓存冲里面的数据有堆积,读的时候发现缓冲区里面的数据怎么这么多了。一个一个读,发现好几个沾在一起。

4.拆包《接收方》

最后读数据发现读出来了很多空的。因为数据被拆了。

上边这块就是网络编程肯定会出现这种情况,怎么样解决呢。消息发送是有协议的,比如Http协议,可以通过协议本身发现数据是否完整,比方说,数据被拆分了,被沾在一起了,这些都是可以通过数据的内容发现出来的。每次都判断接收到的是否满足数组的长度,

(二)netty如何解决上边的粘包拆包的问题
  • ① 示例展示

XNettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;

public class XNettyServer {
    public static void main(String[] args) throws Exception {
        // 1、 线程定义
        // accept 处理连接的线程池
        EventLoopGroup acceptGroup = new NioEventLoopGroup();
        // read io 处理数据的线程池
        EventLoopGroup readGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptGroup, readGroup);
            // 2、 选择TCP协议,NIO的实现方式
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 3、 职责链定义(请求收到后怎么处理)
                    ChannelPipeline pipeline = ch.pipeline();
                    // TODO 3.1 增加解码器
                    // pipeline.addLast(new XDecoder());
                    // TODO 3.2 打印出内容 handdler
                    pipeline.addLast(new XHandller());
                }
            });
            // 4、 绑定端口
            System.out.println("启动成功,端口 9999");
            b.bind(9999).sync().channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            readGroup.shutdownGracefully();
        }
    }
}

XHandller

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 后续处理handdler
 */
@ChannelHandler.Sharable
public class XHandller extends ChannelInboundHandlerAdapter {

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 输出 bytebuf
        ByteBuf buf = (ByteBuf) msg;
        byte[] content = new byte[buf.readableBytes()];
        buf.readBytes(content);
        System.out.println(Thread.currentThread()+ ": 最终打印"+new String(content));
        ((ByteBuf) msg).release(); // 引用计数减一
        // ctx.fireChannelRead();
    }

    // 异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

XDecoder

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

// 编解码一定是根据协议~
public class XDecoder extends ByteToMessageDecoder {
    static final int PACKET_SIZE = 220;

    // 用来临时保留没有处理过的请求报文
    ByteBuf tempMsg = Unpooled.buffer();

    // in输入   --- 处理  --- out 输出
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println(Thread.currentThread()+"收到了一次数据包,长度是:" + in.readableBytes());
        // in 请求的数据
        // out 将粘在一起的报文拆分后的结果保留起来

        // 1、 合并报文
        ByteBuf message = null;
        int tmpMsgSize = tempMsg.readableBytes();
        // 如果暂存有上一次余下的请求报文,则合并
        if (tmpMsgSize > 0) {
            message = Unpooled.buffer();
            message.writeBytes(tempMsg);
            message.writeBytes(in);
            System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
        } else {
            message = in;
        }

        // 2、 拆分报文
        // 这个场景下,一个请求固定长度为3,可以根据长度来拆分
        // i+1 i+1 i+1 i+1 i+1
        // 不固定长度,需要应用层协议来约定 如何计算长度
        // 在应用层中,根据单个报文的长度及特殊标记,来将报文进行拆分或合并
        // dubbo rpc协议 = header(16) + body(不固定)
        // header最后四个字节来标识body
        // 长度 = 16 + body长度
        // 0xda, 0xbb 魔数


        int size = message.readableBytes();
        int counter = size / PACKET_SIZE;
        for (int i = 0; i < counter; i++) {
            byte[] request = new byte[PACKET_SIZE];
            // 每次从总的消息中读取220个字节的数据
            message.readBytes(request);

            // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
            out.add(Unpooled.copiedBuffer(request));
        }

        // 3、多余的报文存起来
        // 第一个报文:i+  暂存
        // 第二个报文:1 与第一次
        size = message.readableBytes();
        if (size != 0) {
            System.out.println("多余的数据长度:" + size);
            // 剩下来的数据放到tempMsg暂存
            tempMsg.clear();
            tempMsg.writeBytes(message.readBytes(size));
        }
    }

}
 
  • ② 流程梳理
  1. netty读取数据触发,2个请求数据被合并了
  2. 数据解析(编解码),数据规范,http或者自己写的都是可以解析出来,解析过将合并变成一个一个的请求。请求拆分,根据协议,编解码都是自己来写的。decode将输入的数据进行处理,在输出到后面的环节。存在一个等待的过程长度不够,先保存下来。够了再处理。
  3. 交给Xhandler来进行处理。

自研过于复杂,采用合适的开源协议(XMPP/MQTT/WebSocket)

PS:下节一起完成下代码编写,websocket完成的推送代码实现。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程坑太多 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • (一)设计和思路
  • (二)netty如何解决上边的粘包拆包的问题
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档