from:https://juejin.im/post/5c97ae12e51d45580b681b0b
一直想写一篇关于im即时通讯分享的文章,无奈工作太忙,很难抽出时间。今天终于从公司离职了,打算好好休息几天再重新找工作,趁时间空闲,决定静下心来写一篇文章,毕竟从前辈那里学到了很多东西。工作了五年半,这三四年来一直在做社交相关的项目,有直播、即时通讯、短视频分享、社区论坛等产品,深知即时通讯技术在一个项目中的重要性,本着开源分享的精神,也趁这机会总结一下,所以写下这篇文章,文中有不对之处欢迎批评与指正。
本文将介绍:
本想花一部分时间介绍一下利用AIDL实现多进程通信,提升应用保活率,无奈这种方法在目前大部分Android新版本上已失效,而且也比较复杂,所以考虑再三,把AIDL这一部分去掉,需要了解的童鞋可以私信我。先来看看效果,由于Gif超过微信限制,请大家移步查看:
https://user-gold-cdn.xitu.io/2019/4/22/16a42c85e653b88c?imageslim
不想看文章的同学可以直接移步到Github fork源码:github地址(https://github.com/FreddyChen/NettyChat)。接下来,让我们进入正题。
这里需要简单解释一下,TCP/UDP/WebSocket的区别。这里就很好地解释了TCP/UDP的优缺点和区别(https://www.cnblogs.com/Leonardo-li/p/8206945.html),以及适用场景,简单地总结一下:
至于WebSocket,后续可能会专门写一篇文章来介绍。综上所述,决定采用TCP协议。
对于App网络传输协议,我们比较常见的、可选的,有三种,分别是json/xml/protobuf,老规矩,我们先分别来看看这三种格式的优缺点:
在一个需要大量的数据传输的场景中,如果数据量很大,那么选择protobuf可以明显的减少数据量,减少网络IO,从而减少网络传输所消耗的时间。考虑到作为一个主打社交的产品,消息数据量会非常大,同时为了节约流量,所以采用protobuf是一个不错的选择。
首先,我们来了解一下,Netty到底是个什么东西。网络上找到的介绍:Netty是由JBOSS提供的基于Java NIO的开源框架,Netty提供异步非阻塞、事件驱动、高性能、高可靠、高可定制性的网络应用程序和工具,可用于开发服务端和客户端。
Mina其实跟Netty很像,大部分API都相同,因为是同一个作者开发的。但感觉Mina没有Netty成熟,在使用Netty的过程中,出了问题很轻易地可以找到解决方案,所以,Netty是一个不错的选择。
好了,废话不多说,直接开始吧。
首先,我们新建一个Project,在Project里面再新建一个Android Library,Module名称暂且叫做im_lib,如图所示:
然后,分析一下我们的消息结构,每条消息应该会有一个消息唯一id,发送者id,接收者id,消息类型,发送时间等,经过分析,整理出一个通用的消息类型,如下:
根据上述所示,我整理了一个思维导图,方便大家参考:
这是基础部分,当然,大家也可以根据自己需要自定义比较适合自己的消息结构。
我们根据自定义的消息类型来编写proto文件。
然后执行命令(我用的mac,windows命令应该也差不多):
然后就会看到,在和proto文件同级目录下,会生成一个java类,这个就是我们需要用到的东东:
我们打开瞄一眼:
东西比较多,不用去管,这是google为我们生成的protobuf类,直接用就行,怎么用呢?直接用这个类文件,拷到我们开始指定的项目包路径下就可以啦:
加依赖后,可以看到,MessageProtobuf类文件已经没有报错了,顺便把netty的jar包也导进来一下,还有fastjson的:
建议用netty-all-x.x.xx.Final的jar包,后续熟悉了,可以用精简的jar包。
至此,准备工作已结束,下面,我们来编写java代码,实现即时通讯的功能。
为什么需要封装呢?说白了,就是为了解耦,为了方便日后切换到不同框架实现,而无需到处修改调用的地方。
举个栗子,比如Android早期比较流行的图片加载框架是Universal ImageLoader,后期因为某些原因,原作者停止了维护该项目,目前比较流行的图片加载框架是Picasso或Glide,因为图片加载功能可能调用的地方非常多,如果不作一些封装,早期使用了Universal ImageLoader的话,现在需要切换到Glide,那改动量将非常非常大,而且还很有可能会有遗漏,风险度非常高。
那么,有什么解决方案呢?
很简单,我们可以用工厂设计模式进行一些封装,工厂模式有三种:简单工厂模式、抽象工厂模式、工厂方法模式。在这里,我采用工厂方法模式进行封装,具体区别,可以参见:设计模式相关资料。
我们分析一下,ims(IM Service,下文简称ims)应该是有初始化、建立连接、重连、关闭连接、释放资源、判断长连接是否关闭、发送消息等功能,基于上述分析,我们可以进行一个接口抽象:
public interface IMSClientInterface {
/**
* 初始化
*
* @param serverUrlList 服务器地址列表
* @param listener 与应用层交互的listener
* @param callback ims连接状态回调
*/
void init(Vector<String> serverUrlList, OnEventListener listener, IMSConnectStatusCallback callback);
/**
* 重置连接,也就是重连
* 首次连接也可认为是重连
*/
void resetConnect();
/**
* 重置连接,也就是重连
* 首次连接也可认为是重连
* 重载
*
* @param isFirst 是否首次连接
*/
void resetConnect(boolean isFirst);
/**
* 关闭连接,同时释放资源
*/
void close();
/**
* 标识ims是否已关闭
*
* @return
*/
boolean isClosed();
/**
* 发送消息
*
* @param msg
*/
void sendMsg(MessageProtobuf.Msg msg);
/**
* 发送消息
* 重载
*
* @param msg
* @param isJoinTimeoutManager 是否加入发送超时管理器
*/
void sendMsg(MessageProtobuf.Msg msg, boolean isJoinTimeoutManager);
/**
* 获取重连间隔时长
*
* @return
*/
int getReconnectInterval();
/**
* 获取连接超时时长
*
* @return
*/
int getConnectTimeout();
/**
* 获取应用在前台时心跳间隔时间
*
* @return
*/
int getForegroundHeartbeatInterval();
/**
* 获取应用在后台时心跳间隔时间
*
* @return
*/
int getBackgroundHeartbeatInterval();
/**
* 设置app前后台状态
*
* @param appStatus
*/
void setAppStatus(int appStatus);
/**
* 获取由应用层构造的握手消息
*
* @return
*/
MessageProtobuf.Msg getHandshakeMsg();
/**
* 获取由应用层构造的心跳消息
*
* @return
*/
MessageProtobuf.Msg getHeartbeatMsg();
/**
* 获取应用层消息发送状态报告消息类型
*
* @return
*/
int getServerSentReportMsgType();
/**
* 获取应用层消息接收状态报告消息类型
*
* @return
*/
int getClientReceivedReportMsgType();
/**
* 获取应用层消息发送超时重发次数
*
* @return
*/
int getResendCount();
/**
* 获取应用层消息发送超时重发间隔
*
* @return
*/
int getResendInterval();
/**
* 获取消息转发器
*
* @return
*/
MsgDispatcher getMsgDispatcher();
/**
* 获取消息发送超时定时器
*
* @return
*/
MsgTimeoutTimerManager getMsgTimeoutTimerManager();
}
OnEventListener是与应用层交互的listener:
IMConnectStatusCallback是ims连接状态回调监听器:
然后写一个Netty tcp实现类:
接下来,写一个工厂方法:
封装部分到此结束,接下来,就是实现了。
其中,MsgDispatcher是消息转发器,负责将接收到的消息转发到应用层:
ExecutorServiceFactory是线程池工厂,负责调度重连及心跳线程:
resetConnect()方法作为连接的起点,首次连接以及重连逻辑,都是在resetConnect()方法进行逻辑处理,我们来瞄一眼:
可以看到,非首次进行连接,也就是连接一个周期失败后,进行重连时,会先让线程休眠一段时间,因为这个时候也许网络状况不太好,接着,判断ims是否已关闭或者是否正在进行重连操作,由于重连操作是在子线程执行,为了避免重复重连,需要进行一些并发处理。开始重连任务后,分四个步骤执行:
ResetConnectRunnable是重连任务,核心的重连逻辑都放到这里执行:
toServer()是真正连接服务器的地方:
initBootstrap()是初始化Netty Bootstrap:
注:NioEventLoopGroup线程数设置为4,可以满足QPS是一百多万的情况了,至于应用如果需要承受上千万上亿流量的,需要另外调整线程数。参考自:netty实战之百万级流量NioEventLoopGroup线程数配置
接着,我们来看看TCPChannelInitializerHanlder:
其中,ProtobufEncoder和ProtobufDecoder是添加对protobuf的支持,LoginAuthRespHandler是接收到服务端握手认证消息响应的处理handler,HeartbeatRespHandler是接收到服务端心跳消息响应的处理handler,TCPReadHandler是接收到服务端其它消息后的处理handler,先不去管,我们重点来分析下LengthFieldPrepender和LengthFieldBasedFrameDecoder,这就需要引申到TCP的拆包与粘包啦。
引用网上一张图片来解释一下在TCP出现拆包、粘包以及正常状态下的三种情况,如侵请联系我删除:
了解了TCP出现拆包/粘包的原因,那么,如何解决呢?通常来说,有以下四种解决方式:
netty针对以上四种场景,给我们封装了以下四种对应的解码器:
我们用到的就是LengthFieldBasedFrameDecoder自定义长度消息解码器,同时配合LengthFieldPrepender编码器使用,关于参数配置,建议参考netty--最通用TCP黏包解决方案:LengthFieldBasedFrameDecoder和LengthFieldPrepender这篇文章,讲解得比较细致。我们配置的是消息头长度为2个字节,所以消息包的最大长度需要小于65536个字节,netty会把消息内容长度存放消息头的字段里,接收方可以根据消息头的字段拿到此条消息总长度,当然,netty提供的LengthFieldBasedFrameDecoder已经封装好了处理逻辑,我们只需要配置lengthFieldOffset、lengthFieldLength、lengthAdjustment、initialBytesToStrip即可,这样就可以解决TCP的拆包与粘包,这也就是netty相较于原生nio的便捷性,原生nio需要自己处理拆包/粘包等问题。
接着,我们来看看LoginAuthHandler和HeartbeatRespHandler:
可以看到,当接收到服务端握手消息响应后,会从扩展字段取出status,如果status=1,则代表握手成功,这个时候就先主动向服务端发送一条心跳消息,然后利用Netty的IdleStateHandler读写超时机制,定期向服务端发送心跳消息,维持长连接,以及检测长连接是否还存在等。
这个就比较简单,收到心跳消息响应,无需任务处理,直接打印一下方便我们分析即可。
心跳包是定期发送,也可以自己定义一个周期,比如Android微信智能心跳方案,为了简单,此处规定应用在前台时,8秒发送一个心跳包,切换到后台时,30秒发送一次,根据自己的实际情况修改一下即可。心跳包用于维持长连接以及检测长连接是否断开等。
接着,我们利用Netty的读写超时机制,来实现一个心跳消息管理handler:
可以看到,利用userEventTriggered()方法回调,通过IdleState类型,可以判断读超时/写超时/读写超时,这个在添加IdleStateHandler时可以配置,下面会贴上代码。首先我们可以在READER_IDLE事件里,检测是否在规定时间内没有收到服务端心跳包响应,如果是,那就触发重连操作。在WRITER_IDEL事件可以检测客户端是否在规定时间内没有向服务端发送心跳包,如果是,那就主动发送一个心跳包。发送心跳包是在子线程中执行,我们可以利用之前写的work线程池进行线程管理。 addHeartbeatHandler()代码如下:
从图上可看到,在IdleStateHandler里,配置的读超时为心跳间隔时长的3倍,也就是3次心跳没有响应时,则认为长连接已断开,触发重连操作。写超时则为心跳间隔时长,意味着每隔heartbeatInterval会发送一个心跳包。读写超时没用到,所以配置为0。
onConnectStatusCallback(int connectStatus)为连接状态回调,以及一些公共逻辑处理:
连接成功后,立即发送一条握手消息,再次梳理一下整体流程:
看看TCPReadHandler收到消息是怎么处理的:
可以看到,在channelInactive()及exceptionCaught()方法都触发了重连,channelInactive()方法在当链路断开时会调用,exceptionCaught()方法在当出现异常时会触发,另外,还有诸如channelUnregistered()、channelReadComplete()等方法可以重写,在这里就不贴了,相信聪明的你一眼就能看出方法的作用。 我们仔细看一下channelRead()方法的逻辑,在if判断里,先判断消息类型,如果是服务端返回的消息发送状态报告类型,则判断消息是否发送成功,如果发送成功,从超时管理器中移除,这个超时管理器是干嘛的呢?下面讲到消息重发机制的时候会详细地讲。在else里,收到其他消息后,会立马给服务端返回一个消息接收状态报告,告诉服务端,这条消息我已经收到了,这个动作,对于后续需要做的离线消息会有作用。如果不需要支持离线消息功能,这一步可以省略。最后,调用消息转发器,把接收到的消息转发到应用层即可。
代码写了这么多,我们先来看看运行后的效果,先贴上缺失的消息发送代码及ims关闭代码以及一些默认配置项的代码。 发送消息:
关闭ims:
ims默认配置:
还有,应用层实现的ims client启动器:
由于代码有点多,不太方便全部贴上,如果有兴趣可以下载demo体验。