专栏首页Node开发实时会话系统实现(2) --- express-ws改写会话系统

实时会话系统实现(2) --- express-ws改写会话系统

上一篇提到过实际上会话系统最简单的方式是http轮询:用户发送信息时实现一个http接口保存用户聊天信息,然后在客户端实现一个定时器,定时获取用户A与用户B的聊天信息,并且重新渲染聊天界面。我们在上一篇成功通过http轮询的方式实现绘画系统,但是我们也有提到过http轮询的缺点在于轮询中的大部分请求都是没有实际意义的,所以会极大的浪费和消耗带宽和服务器资源。所以本节课我们通过express框架支持的一个websocket库--express-ws来改写上一篇实现的会话系统。

客户端代码其实和上篇文章基本一致,只是增加了个上传视频的按钮,因为小程序没有选择文件的api,所以我们只能通过wx.chooseImage来选择图片发送,通过wx.chooseVideo来选择视频发送,实际上效果就是在上一篇的基础上加了一个视频按钮,因为上一篇没有提到图片发送和视频发送的客户端逻辑有读者私聊问到具体的逻辑,这边简单讲讲小程序客户端如何实现图片选择发送以及视频选择发送。实际上我们可以通过wx.chooseImage选择图片,这个api实际上很简单,指定最多可选择图片张数以及图片来源等,选择成功实际上会返回一个图片的临时路径tempFilePaths,然后使用form-data的方式将tempFilePaths提交到后端接口进行图片上传,图片上传成功后会返回图片的url地址,这时候再进行信息发送保存聊天记录到数据库。我们可以看一眼逻辑代码:

//发送图片
  chooseImage() {
    var that = this;
    wx.chooseImage({
      count: 1, // 默认9张图片
      sizeType: ['original', 'compressed'], // 可以指定是原图还是压缩图,默认二者都有
      sourceType: ['album', 'camera'], // 可以指定来源是相册还是相机,默认二者都有
      success: function(res) {
        // 返回选定照片的本地文件路径列表,tempFilePath可以作为img标签的src属性显示图片
        var tempFilePaths = res.tempFilePaths;
        that.setData({
          loading: true,
          increase: false
        });

        wx.uploadFile({
          url: utils.basePath + '/users/upload_avatar',
          filePath: tempFilePaths[0],
          name: 'avatar',
          headers: {
            'Content-Type': 'form-data'
          },

          success: function(res) {
            that.setData({
              loading: false
            });

            var result = JSON.parse(res.data);
            if (result.status == 200) {
              //图片上传成功,将聊天记录保存数据库
              var chatInfo = that.data.chatInfo;
              chatInfo.chat_content = result.payload.avatar_path;
              chatInfo.chat_type = 1;
              chatInfo = JSON.stringify(chatInfo);
              websocket.send(chatInfo);

              //接受服务器消息
              wx.onSocketMessage(function(res) {
                var data = JSON.parse(res.data).data;
                data[0].flagtime = true;

                for (var i = 1; i < data.length; i++) {
                  var currenttime = new Date(data[i].created_date).getTime();
                  var begintime = new Date(data[i - 1].created_date).getTime();

                  if (currenttime - begintime > 1000 * 60) {
                    data[i].flagtime = true;
                  } else {
                    data[i].flagtime = false;
                  }
                }
                that.setData({
                  newslist: data
                });

                //将聊天界面定位到最新的聊天记录
                that.bottom();
              });
            } else {
              $Toast({
                content: result.err,
                type: 'error'
              });
            }
          }
        });
      }
    });
  }

视频发送实际上和图片发送几乎一致,就是将wx.chooseImage换成wx.chooseVideo就可以,但是视频上传这里面有几个坑需要逃避一下:

  • wx.chooseVideo有个属性compressed参数可以设置视频是否需要压缩,默认是true,视频会经过压缩上传,经过实测发现视频经过压缩清晰度极低,所以可以携带compressed参数关闭视频压缩。
  • 视频大小实际上和微信是保持一致的,无法发送超过24M的视频,但是我测试的时候发现超过1M的服务器一直报413状态码提示视频过大,实际上就是我们后端没有设置body最大的长度,比如我是Nginx对上传的域名pic.niyueling.cn增加了client_max_body_size实行,设置为25M,就可以躲过413状态这个坑。

接下来我们一样看下代码:

//发送视频
  chooseVideo() {
    var that = this;
    wx.chooseVideo({
      sourceType: ['album', 'camera'],
      maxDuration: 60,
      compressed: false,
      camera: 'back',
      success: function(res) {
        var tempFilePaths = res.tempFilePath;

        that.setData({
          loading: true,
          increase: false
        });

        wx.uploadFile({
          url: utils.basePath + '/users/upload_video',
          filePath: tempFilePaths,
          name: 'mp4_url',
          headers: {
            'Content-Type': 'form-data'
          },

          success: function(res) {
            if (res.statusCode == 413) {
              that.setData({
                loading: false
              });
              $Toast({
                content: '视频过大,请重新上传',
                type: 'error'
              });
            } else {
              that.setData({
                loading: false
              });

              var result = JSON.parse(res.data);
              if (result.status == 200) {
                //上传视频操作
                var chatInfo = that.data.chatInfo;
                chatInfo.chat_content = result.payload;
                chatInfo.chat_type = 2;
                chatInfo = JSON.stringify(chatInfo);

                websocket.send(chatInfo);

                //接受服务器消息
                wx.onSocketMessage(function(res) {
                  var data = JSON.parse(res.data).data;
                  data[0].flagtime = true;

                  for (var i = 1; i < data.length; i++) {
                    var currenttime = new Date(data[i].created_date).getTime();
                    var begintime = new Date(data[i - 1].created_date).getTime();

                    if (currenttime - begintime > 1000 * 60) {
                      data[i].flagtime = true;
                    } else {
                      data[i].flagtime = false;
                    }
                  }
                  that.setData({
                    newslist: data
                  });

                  that.bottom();
                });
              } else {
                $Toast({
                  content: result.err,
                  type: 'error'
                });
              }
            }
          }
        });
      }
    });
  }

接下来就开始正式讲讲websocket在小程序的使用了,其实websocket在小程序封装的很完美,可以让没接触过websocket开发的快速上手。我们在utils下创建一个websocket.js,在里面封装websocket的基本操作。实际上在会话系统我们目前仅仅需要websocket连接,发送消息,接受消息三个方法,所以我们在websocket.js中定义这三个方法,然后使用module.exports导出,使得在任何界面都可以调用这几个方法,我们看下代码:

const util = require('./util.js');

//发起websocket连接
function connect(user, func) {
  wx.connectSocket({
    url: util.wssPath + '/chat/v1/message?friendphone=' + user.friendInfo.account + '&userphone=' + user.userInfo.account + '&app_sid=' + user.userInfo.app_sid,
    header: { 
      'content-type': 'application/json' 
    },
    success: function (res) {
      console.log(res)
    },

    fail: function (res) {
      console.log(res);
    }
  });

  wx.onSocketOpen(function (res) {
    //接受服务器消息
    wx.onSocketMessage(func);//func回调可以拿到服务器返回的数据
  });

  wx.onSocketError(function (res) {
    wx.showToast({
      title: res.errMsg,
      icon: "none",
      duration: 1000
    });
  });
}

//发送消息
function send(msg) {
  wx.sendSocketMessage({
    data: msg
  });
}

module.exports = {
  connect: connect,
  send: send
}

然后在会话界面的onLoad方法连接websocket,连接成功接口会返回历史聊天记录,可以渲染出聊天界面。我们可以看下onLoad的关键代码:

websocket.connect(this.data.chatInfo, function(res) {
        if (JSON.parse(res.data).data.length == 0) {
          that.setData({
            newslist: []
          });
        } else {
          var data = JSON.parse(res.data).data;
          data[0].flagtime = true;

          for (var i = 1; i < data.length; i++) {
            var currenttime = new Date(data[i].created_date).getTime();
            var begintime = new Date(data[i - 1].created_date).getTime();

            if (currenttime - begintime > 1000 * 60) {
              data[i].flagtime = true;
            } else {
              data[i].flagtime = false;
            }
          }
          that.setData({
            newslist: data
          });

          that.bottom();
        }
      });

然后用户发送消息之后使用刚才封装好的方法send发送消息,消息发送成功服务端会返回新的聊天记录,动态渲染聊天界面。可以看下关键代码:

//封装聊天记录参数
      var chatInfo = that.data.chatInfo;
      chatInfo.chat_content = that.data.message;
      chatInfo.chat_type = 0;
      chatInfo = JSON.stringify(chatInfo);

      websocket.send(chatInfo);

      //接受服务器消息
      wx.onSocketMessage(function(res) {
        var data = JSON.parse(res.data).data;
        data[0].flagtime = true;

        for (var i = 1; i < data.length; i++) {
          var currenttime = new Date(data[i].created_date).getTime();
          var begintime = new Date(data[i - 1].created_date).getTime();

          if (currenttime - begintime > 1000 * 60) {
            data[i].flagtime = true;
          } else {
            data[i].flagtime = false;
          }
        }
        that.setData({
          newslist: data
        });

        that.bottom();
      });

到这里我们小程序端的websocket连接全部实现了。下一步需要在服务端实现wss接口。首先和https一样,小程序只支持wss,所以我们需要申请证书先在Nginx配置wss:

upstream backend_chatws {
    server 127.0.0.1:3001 weight=10;
}

server {
  listen 443 ssl;
  server_name ws.niyueling.cn;
  ssl_certificate /etc/nginx/ctr/ws_niyueling_cn.crt;
  ssl_certificate_key /etc/nginx/ctr/ws_niyueling_cn.key;
  ssl_session_cache    shared:SSL:1m;
  ssl_session_timeout  5m;
    ssl_protocols TLSv1 TLSv1.1 TLSv1.2; 
    ssl_prefer_server_ciphers on;
  server_tokens off;
  access_log /var/log/nginx/api.log  main;

    location / {
        client_max_body_size 100m;
        proxy_redirect off;
        proxy_pass http://backend_chatws;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_read_timeout 604800s;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
    }


    error_page   500 502 503 504  /50x.html;
    location = /50x.html {
        root   /usr/share/nginx/html;
    }
}

刚才已经说过了本篇文章使用express-ws库来封装websocket,接下来我们看看express-ws库的基本使用。因为我们正式开发一般后端不可能把所有接口写在同一个文件中,所以我们这边也通过分文件来实现。首先npm安装express-ws依赖,然后在app.js引入express-ws:

var express = require('express');
var app = express();
var expressWs = require('express-ws')(app);
var chat = require('./routes/chat');
 
app.use('/chat/v1', chat);

app.listen(3001);

可以看到我们在app.js引用了chat.js文件表示我们实际上websocket接口是在chat.js中实现,接下来我们在chat.js中引用express-ws,这里需要注意如果分文件实现接口必须在app.js和具体的接口js文件都引入express-ws才可以正常使用。然后接口的实现实际上和http接口实现方法类似,我们引入express-ws后实际上router就多了一个ws方法,就是用来书写websocket接口,然后接口中实际上是存在两部分逻辑,第一次调用就等于websocket连接事件,这时候我们要查询好友的聊天记录返回,当用户发送消息时,会触发message事件,这时候先保存用户聊天记录再查询最新的聊天记录并返回。我们可以看下代码:

router.ws('/message', function (ws, req) {
  var par = paramAll(req);

  if (!par.friendphone || !par.userphone || !par.app_sid) {
    return ws.send(JSON.stringify({ code: 0, msg: '参数不全!' }));
  }

  //查询用户历史记录
  chatDao.getOnlineChat(par, function (err, data) {
    if (err) {
      return ws.send(JSON.stringify({
        code: 0,
        msg: err
      }));
    }

    return ws.send(JSON.stringify({
      code: 1,
      data: data
    }));
  });

  ws.on('message', function (msg) {
    par.msg = JSON.parse(msg);
    //将记录添加到数据库,并返回最新记录列表
    chatDao.saveOnlineChat(par.msg, function (err, data) {
      if (err) {
        return ws.send(JSON.stringify({
          code: 0,
          msg: err
        }));
      }

      return ws.send(JSON.stringify({
        code: 1,
        data: data
      }));
    });
  });
});

数据库操作逻辑实际上也分别对应两部分,websocket连接时会返回两个好友间的历史聊天记录:

async.waterfall([
            function (callback) {
                connection.beginTransaction(function (err) {
                    return callback(err);
                });
            },
            //通过friendphone查询好友信息
            function (callback) {
                var sql = 'select username, avatar from users where account = ? and app_sid = ?';
                var value = [data.friendphone, data.app_sid];

                connection.query(sql, value, function (err, result) {
                    if (err) {
                        return callback(err);
                    }

                    if (!result[0]) {
                        return callback('用户不存在!');
                    }
                    data.friendname = result[0].username;
                    data.friendavatar = result[0].avatar;

                    return callback(null, 200);
                });
            },
            //通过userphone查询好友信息
            function (info, callback) {
                var sql = 'select username, avatar from users where account = ? and app_sid = ?';
                var value = [data.userphone, data.app_sid];

                connection.query(sql, value, function (err, result) {
                    if (err) {
                        return callback(err);
                    }

                    if (!result[0]) {
                        return callback('用户不存在!');
                    }
                    data.username = result[0].username;
                    data.useravatar = result[0].avatar;

                    return callback(null, 200);
                });
            },
            function (release_info, callback) {
                var sql = 'select id, friendphone, friendname, friendavatar, app_sid, DATE_FORMAT(created_date, "%Y-%m-%d %H:%i:%s") as created_date, userphone, username, useravatar, content, chat_type from online_chat ' +
                    'where (friendphone = ? and userphone = ?) or (friendphone = ? and userphone = ?)';
                var value = [data.friendphone, data.userphone, data.userphone, data.friendphone];

                connection.query(sql, value, function (err, result) {
                    if (err) {
                        return callback(err);
                    }

                    var del_info = result && result.length > 0 ? result : null;

                    if (!del_info) {
                        return callback(null, true, []);
                    }

                    return callback(null, true, del_info);
                });
            }
        ], function (DbErr, isSuccess, uidOrInfo) {
            if (DbErr || !isSuccess) {
                connection.rollback(function () {
                    connection.release();
                });

                return cb(DbErr);
            }

            connection.commit(function (e) {
                if (e) {
                    connection.rollback(function () {
                        connection.release();
                    });

                    return cb(e);
                }

                connection.release();
                cb(null, uidOrInfo);
            });
        });

当客户端用户发送消息就会触发message事件,这时候保存用户聊天信息并返回最新的聊天记录:

async.waterfall([
            function (callback) {
                connection.beginTransaction(function (err) {
                    return callback(err);
                });
            },
            function (callback) {
                var sql = 'insert into online_chat set ?';
                var value = {
                    friendphone: data.friendInfo.account,
                    friendname: data.friendInfo.username,
                    friendavatar: data.friendInfo.avatar,
                    app_sid: data.friendInfo.app_sid,
                    userphone: data.userInfo.account,
                    username: data.userInfo.username,
                    useravatar: data.userInfo.avatar,
                    created_date: new Date(),
                    status: 1,
                    content: data.chat_content,
                    chat_type: data.chat_type
                };

                connection.query(sql, value, function (err, result) {
                    if (err) {
                        return callback(err);
                    }

                    if (result.affectedRows == 0) {
                        return callback('聊天出现故障!');
                    }

                    return callback(null, '保存聊天记录成功!');
                });
            },
            function (release_info, callback) {
                var sql = 'select id, friendphone, friendname, friendavatar, app_sid, DATE_FORMAT(created_date, "%Y-%m-%d %H:%i:%s") as created_date, userphone, username, useravatar, content, chat_type from online_chat ' +
                    'where (friendphone = ? and userphone = ?) or (friendphone = ? and userphone = ?)';
                var value = [data.friendInfo.account, data.userInfo.account, data.userInfo.account, data.friendInfo.account];

                connection.query(sql, value, function (err, result) {
                    if (err) {
                        return callback(err);
                    }

                    var del_info = result && result.length > 0 ? result : null;

                    if (!del_info) {
                        return callback(null, true, []);
                    }

                    return callback(null, true, del_info);
                });
            }
        ], function (DbErr, isSuccess, uidOrInfo) {
            if (DbErr || !isSuccess) {
                connection.rollback(function () {
                    connection.release();
                });

                return cb(DbErr);
            }

            connection.commit(function (e) {
                if (e) {
                    connection.rollback(function () {
                        connection.release();
                    });

                    return cb(e);
                }

                connection.release();
                cb(null, uidOrInfo);
            });
        });

到这里我们使用express-ws改写会话系统就完成了,我们可以测试下:

可以发现我们使用websocket可以开启一个长连接成功实现实时会话系统,有消息送达马上接收渲染,而不用像http轮询一样不断地重复请求接口造成贷款和服务器资源的浪费。目前整个项目前后端已开源于码云,欢迎来一个star。源码地址:

https://gitee.com/mqzuimeng_admin/wx_blog.git

本文分享自微信公众号 - 程序猿周先森(zhanyue_org),作者:逆月翎

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-11-18

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Node中使用ORM框架

    在正常的开发中,大部分都会使用MVC为主要的系统架构模式。而Model一般包含了复杂的业务逻辑以及数据逻辑,因为Model中逻辑的复杂度,所以我们有必要降低系统...

    逆月翎
  • 实时会话系统实现(1) --- http轮询方式

    最近一直在开发博客小程序,最近开发好友系统和实时会话系统。其实众所周知会话系统正常的业务逻辑应该是用户A给用户B发送一个消息,用户A发送后用户B马上可以接收到并...

    逆月翎
  • NodeJS require()源码解析

    最开始谈NodeJS的时候写过一篇文章谈了它与Java各自的优缺点。NodeJS最早的定位是什么样的呢?最早开发者Ryan Dahl是想提升自...

    逆月翎
  • 微信小程序 wx.request 的封装

    自学转行到前端也已近两年,也算是简书和掘金的忠实粉丝,但是以前一直惜字如金(实在是胆子小,水平又低),现在我决定视金钱如粪土(就只是脸皮厚了,水平就那样),好了...

    极乐君
  • 【利用Python进行金融数据分析-06】统计

    import pandas as pd import datetime import numpy as np

    光点神奇
  • R语言与生信系列①(R入门与临床三线表绘制)

    首次分享课讲的是TCGA数据分析,探究某一因素与肿瘤临床数据之间的关系,并自动生成可以用于SCI发表的三线表,如下图所示:

    用户1359560
  • Python中的相关分析correlation analysis

    相关分析(correlation analysis) 研究两个或两个以上随机变量之间相互依存关系的方向和密切程度的方法。 线性相关关系主要采用皮尔逊(Pears...

    Erin
  • 2019-08-01easyMock 的使用

    用户4344670
  • 你离大厂的offer只差这份算法汇总

    定义:算法(Algorithm)是指解题方案的准确而完整的描述,是一系列解决问题的清晰指令,算法代表着用系统的方法描述解决问题的策略机制。也就是说,能够对一定...

    小闫同学啊
  • Spark详解07广播变量BroadcastBroadcast

    Broadcast 顾名思义,broadcast 就是将数据从一个节点发送到其他各个节点上去。这样的场景很多,比如 driver 上有一张表,其他节点上运行的 ...

    Albert陈凯

扫码关注云+社区

领取腾讯云代金券