resin4.0.44+websocket 实现私信功能服务端消息推送

最近项目开发中,碰到一个新的开发需求——私信功能。

项目要求:类似微博中发送私信功能,给对方发送一条私信消息,如果对方在线就立马接受到消息提示,并显示到页面上。如果对方不在线,则下次登录以后,显示消息提示。

技术选择:websocket也是目前比较流行的接收服务器端消息的一门HTML5技术,我们服务器采用的是resin4.0+,所以综合考虑采用基于resin的websocket形式实现该功能。

软件版本:resin4.0.44、websocket、SpringMVC、redis 这里着重强调下,项目架构是SpringMVC结构,这里就不在赘述Spring相关的配置,主要介绍下resin下的websocket如何实现消息推送。 第一步: 新建websocket数据封装Bean,用来保存websocket+user对应信息。

package com.gochina.tc.websocket;

import com.caucho.websocket.WebSocketContext;

/**
 * websocket封装bean
 * @author hwy
 *
 */
public class WebSocketBean {

    private String userCode;//用户的code
    private int hashCode;//websocket的hashCode
    private WebSocketContext webSocketContext;//websocketContext

    public WebSocketBean(String userCode, int hashCode,
            WebSocketContext webSocketContext) {
        super();
        this.userCode = userCode;
        this.hashCode = hashCode;
        this.webSocketContext = webSocketContext;
    }

    public String getUserCode() {
        return userCode;
    }

    public void setUserCode(String userCode) {
        this.userCode = userCode;
    }

    public int getHashCode() {
        return hashCode;
    }

    public void setHashCode(int hashCode) {
        this.hashCode = hashCode;
    }

    public WebSocketContext getWebSocketContext() {
        return webSocketContext;
    }

    public void setWebSocketContext(WebSocketContext webSocketContext) {
        this.webSocketContext = webSocketContext;
    }

}

第二步: 新建MyWebSocketServlet,用来连接前端websocket与下边的listener建立关系的入口统一配置,将所有接受到的用户的websocket请求暂存起来。

package com.gochina.tc.websocket;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;

import com.caucho.websocket.WebSocketContext;
import com.caucho.websocket.WebSocketListener;
import com.caucho.websocket.WebSocketServletRequest;

@Controller
@RequestMapping(value = "/websocket")   
@SuppressWarnings("serial")
public class MyWebSocketServlet extends HttpServlet{

    private static List<WebSocketBean> socketList = new ArrayList<WebSocketBean>();

    @RequestMapping(value = "{userCode}")
    public void service(HttpServletRequest req,HttpServletResponse res,@PathVariable("userCode") String userCode) 
            throws IOException, ServletException{
       //当调用了下面的startWebSocket函数后,该socket就会和相应的listener建立起对应关系
        WebSocketListener listener = new MyWebSocketListener();

        WebSocketServletRequest wsReq = (WebSocketServletRequest) req;
        WebSocketContext webSocketContext = wsReq.startWebSocket(listener);

        WebSocketBean webSocketBean = new WebSocketBean(userCode, webSocketContext.hashCode(), webSocketContext);
        socketList.add(webSocketBean);
    }

    /**
     * 获取连接的websocket列表
     * @return
     */
    public static List<WebSocketBean> getSockList(){
        return socketList;
    }
}

注意:这里的@RequestMapping(value = “{userCode}”)中的userCode是前端建立连接时,传过来的用户的唯一标示,根据这个标示确定是哪位用户发起的websocket请求。 第三步: 新建MyWebSocketListener,这个是用来监听websocket整个生命周期,包含启动、关闭、失去连接等等一系列操作。WebSocketListener是resin封装过一次的,我们只需实现它,然后进行自己的业务逻辑处理即可。

package com.gochina.tc.websocket;

import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.Reader;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.caucho.websocket.WebSocketContext;
import com.caucho.websocket.WebSocketListener;
import com.gochina.tc.util.redis.RedisUtil;

/**
 * websocket消息监听
 * @author hwy
 *
 */
public class MyWebSocketListener implements WebSocketListener{

    private Logger log = LoggerFactory.getLogger(MyWebSocketListener.class);

    /**
     * 移除websocket
     * @param webSocketContext
     */
    public void remove(WebSocketContext webSocketContext){
        List<WebSocketBean> socketList = MyWebSocketServlet.getSockList();
        WebSocketBean webSocketBean = null;
        for(WebSocketBean socket:socketList){
            if(socket.getHashCode() == webSocketContext.hashCode()){
                webSocketBean = socket;
                break;
            }
        }
        if(webSocketBean != null){
            socketList.remove(webSocketBean);
        }
    }

    /**
     * 关闭连接
     */
    @Override
    public void onClose(WebSocketContext webSocketContext) throws IOException {
         remove(webSocketContext);
         log.info(webSocketContext.hashCode()+" is closed");
    }

    /**
     * 断开连接
     */
    @Override
    public void onDisconnect(WebSocketContext webSocketContext) throws IOException {
         remove(webSocketContext);
         log.info(webSocketContext.hashCode()+" is disconnect");
    }

    /**
     * 接收二进制消息
     */
    @Override
    public void onReadBinary(WebSocketContext arg0, InputStream arg1)
            throws IOException {

    }

    /**
     * 接收文本消息并发送消息
     */
    @Override
    public void onReadText(WebSocketContext webSocketContext, Reader reader)
            throws IOException {
        PrintWriter out = null;
        int ch;
        String text = "";
        while ((ch = reader.read()) >= 0) {
            text = text+(char)ch;
        }
        int hashCode = webSocketContext.hashCode();

        List<WebSocketBean> socketList = MyWebSocketServlet.getSockList();
        for(WebSocketBean socket:socketList){
            try{
                if(socket.getHashCode() == hashCode){
                    int count = 0;
                    Object o = RedisUtil.get("tc_user_message_"+socket.getUserCode());//从redis中获取消息数目
                    if(o != null){
                        count = Integer.parseInt(o+"");
                    }
                    out = socket.getWebSocketContext().startTextMessage();
                    out.print(count);
                    out.close();
                    break;
                }
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                if(out != null){
                    out.close();
                }
                if(reader != null){
                    reader.close();
                }
            }
        }

    }

    /**
     * 开始连接
     */
    @Override
    public void onStart(WebSocketContext webSocketContext) throws IOException {
        webSocketContext.setTimeout(43200000);//设置连接关闭时间
        log.info(webSocketContext.hashCode()+" is start");
    }

    /**
     * 连接超时
     */
    @Override
    public void onTimeout(WebSocketContext webSocketContext) throws IOException {
         remove(webSocketContext);
         log.info(webSocketContext.hashCode()+" is timeOut");
    }

     /**
     * 给所有在线用户发送消息
     * @param text
     */
    public void sendToOnlingUsers(String text){
        List<WebSocketBean> socketList = MyWebSocketServlet.getSockList();
        PrintWriter out = null;
        for(WebSocketBean socket:socketList){
            try{
                out = socket.getWebSocketContext().startTextMessage();
                out.print(text);
                out.close();
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                if(out != null){
                    out.close();
                }
            }
        }
    }

    /**
     * 给某个用户发送消息
     * @param text
     * @param userCode
     */
    public void sendToOneUser(String text,String userCode){
        List<WebSocketBean> socketList = MyWebSocketServlet.getSockList();
        int i = 0;
        PrintWriter out = null;
        for(WebSocketBean socket:socketList){
            String code = socket.getUserCode();
            if(code.equals(userCode)){
                try{
                    out = socket.getWebSocketContext().startTextMessage();
                    out.print(text);
                    out.close();
                }catch(Exception e){
                    e.printStackTrace();
                }finally{
                    if(out != null){
                        out.close();
                    }
                }
            }
        }
    }
}

注意:这里面有几个地方需要着重注意下。 首先,在onStart方法是websocket每次建立连接时会触发,每次关闭连接和断开连接的时候,会相应触发onStop()和onDisconnect()方法,需要移除暂存的websocket对象。 其次,webSocketContext.setTimeout(43200000);//设置连接关闭时间,在onStart方法里面有个设置连接关闭时间的方法,此处有坑。。。最初我的resin版本是4.0.44版本以下的,测试发现,每次连接在200s以后就会莫名的断开,即使设置了setTimeOut()依旧200s自动关闭。而且官网实例中明确写了在onStart()中setTimetOut()即可修改连接关闭时间的。。。百思不得姐的时候。google翻看resin更新日志中无意间看到了这个 resin change log。 Resin Change Log

Resin 3.1 changes 4.0.44 - in progress

jsee: self signed cert should support Firefox and Chrome default cipher-suites(#5884) jsee: self signed cert should check expire (#5885) class-loader: excessive reread of jar certificates (#5850, rep by konfetov) log: add sanity check for log rollover (#5845, rep by Keith F.) deploy (git): use utf-8 to store path names (#5874, rep by alpor9) websocket: setTimeout was being overridden by Port keepaliveTimeout (#5841, rep by A. Durairaju) jni: on windows, skip JNI for File metadata like length (#5865, rep by Mathias Lagerwall) db: isNull issues with left join (#5853, rep by Thomas Rogan) websocket: check for socket close on startTextMessage (#5837, rep by samadams) log: when log rollover fails, log to stderr (#5855, rep by Rock) filter: allow private instantiation (#5839, rep by V. Selvaggio) rewrite: added SetRequestCharacterEncoding (#5862, rep by Yoon) health: change health check timeout to critical instead of fatal to allow for development sleep (#5867) alarm: timing issue with warnings and alarm extraction (#4854, rep by SHinomiya Nobuaki) session: orphan deletion throttling needs faster retry time (rep by Thomas Rogan) mod_caucho: slow PUT/POST uploads with Apache 2.4 (#5846, rep by Stegard) 好吧,果断将resin版本升至最新版4.0.44。 第四步: 前台js发起websocket请求。(只复制js代码)

//websocket获取未读消息数量
 if (userId != null && userId != '') {
    var webSocket ;
    if(window.WebSocket) {//google & Firefox
        webSocket = new WebSocket('ws://127.0.0.1:8080/websocket/'+userId);
     }else if('MozWebSocket' in window) {
            webSocket = new WebSocket('ws://127.0.0.1:8080/websocket/'+userId);
     }else{//不支持websocket浏览器
        getApiData(API.unReadCountApi+"?userCode="+userId, findUnReadMessageCount);
     }
    if(webSocket){
        webSocket.onerror = function(event) {
            console.log("connection error: "+event);
        };

        webSocket.onopen = function(event) {
            console.log("connection established");
            webSocket.send('1');
        };

        //接受到消息后,显示到页面
        webSocket.onmessage = function(event) {
            console.log("connection receive message: "+event.data);
            getUnReadMessageCount(event.data);
        };

        webSocket.onclose = function(event) {
            console.log("connection close");
            webSocket.close();
        };
    }
 }

这里是前端js发起websocket请求代码断,大致写法都类似,只是注意一点的是 webSocket = new WebSocket(‘ws://127.0.0.1:8080/websocket/’+userId);里面的请求地址写法,测试时把127.0.0.1换成线上域名的时候,消息接收不到,消息地址不通,无奈换成了ip地址就可以接受到了。不知是我服务器配置问题还是什么问题,希望了解的同学告诉我一下,感激不尽。 还有一点就是在不支持websocket的浏览器的时候,可以使用ajax长轮询获取服务器端消息,网上有一个socket.js对websocket支持的比较好,包括对不支持的浏览器的兼容问题,好像Spring4.0+已经支持socket.js了,有时间可以学习下,我这里是偷懒了,放弃了对不支持websocket的浏览器(IE10一下),只是在打开页面的时候请求了一次。

经过上面的四步的配置,一个基于resin4.0+websocket实现服务端消息推送的功能就实现了。

当然现在用的比较多的还是tomcat7.0+ 和websocket集成实现该功能,(弱弱吐槽下,别再resin下使用javaee7+webscoket的方式实现,走过的路就是流过的泪啊! 当初第一版后台使用的是javaee7的websocket实现方式,在本地tomcat7+以上测试,没有问题,由于过于自信,直接丢到线上resin服务器下,然后悲剧就发生了。。。启动正常,访问直接导致服务器CPU 300%,难忘的加班开始了。如果大家需要,我也可以写一篇基于javaee7+websocket简版实现服务端消息推送功能(非集成式Spring4.0+那种),最后强调不要在resin下跑。。。不要在resin下跑。。。不要在resin下跑。。。)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏菩提树下的杨过

mybatis: 利用多数据源实现分库存储

之前写过一篇mybatis 使用经验小结 提到过多数据源的处理方式,虽然简单但是姿势不太优雅,今天介绍一些更美观的办法: spring中有一个AbstractR...

2065
来自专栏数据之美

Hadoop 多表 join:map side join 范例

      在没有 pig 或者 hive 的环境下,直接在 mapreduce 中自己实现 join 是一件极其蛋疼的事情,MR中的join分为好几种,比如...

2539
来自专栏杂烩

MongoDB Java环境下的开发 原

        在项目下建立一个lib文件夹,将下载的驱动包放到lib下并build到path下:

862
来自专栏kl的专栏

Feign-声明式java Http客户端

Feign 是Netfilx开源的一个声明web服务客户端,这便得编写web服务客户端更容易,使用Feign 创建一个接口并对它进行注解,它具有可插拔的注解支持...

5185
来自专栏IT笔记

聊一聊生产环境中如何动态监听配置文件变化并重载

上一篇,我们谈到Java中的几种读取properties配置文件的方式,但是在生产环境中,最忌讳的就是重启应用了。比如某个系统的路径常量或者接口变更,需要线上及...

34611
来自专栏Java后端技术

Shiro+Redis实现tomcat集群session共享

  当我们使用了nginx做项目集群以后,就会出现一个很严重的问题亟待解决,那就是:tomcat集群之间如何实现session共享的问题,如果这个问题不解决,就...

1231
来自专栏用户2442861的专栏

Python: Enum枚举的实现

http://www.cnblogs.com/codingmylife/archive/2013/05/31/3110656.html

612
来自专栏Java成神之路

极光推送_总结_01_Java实现极光推送

633
来自专栏码匠的流水账

docker运行storm及wordcount实例

本文简单介绍下怎么使用docker运行storm以及在springboot中使用storm。

1012
来自专栏码匠的流水账

Flux OOM实例

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxSink.java

621

扫码关注云+社区