前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >把酒言欢话聊天,基于Vue3.0+Tornado6.1+Redis发布订阅(pubsub)模式打造异步非阻塞(aioredis)实时(websocket)通信聊天系统

把酒言欢话聊天,基于Vue3.0+Tornado6.1+Redis发布订阅(pubsub)模式打造异步非阻塞(aioredis)实时(websocket)通信聊天系统

作者头像
用户9127725
发布2022-08-08 14:16:01
1.8K0
发布2022-08-08 14:16:01
举报
文章被收录于专栏:刘悦的技术博客

    “表达欲”是人类成长史上的强大“源动力”,恩格斯早就直截了当地指出,处在蒙昧时代即低级阶段的人类,“以果实、坚果、根作为食物;音节清晰的语言的产生是这一时期的主要成就”。而在网络时代人们的表达欲往往更容易被满足,因为有聊天软件的存在。通常意义上,聊天大抵都基于两种形式:群聊和单聊。群聊或者群组聊天我们可以理解为聊天室,可以有人数上限,而单聊则可以认为是上限为2个人的特殊聊天室。

    为了开发高质量的聊天系统,开发者应该具备客户机和服务器如何通信的基本知识。在聊天系统中,客户端可以是移动应用程序(C端)或web应用程序(B端)。客户端之间不直接通信。相反,每个客户端都连接到一个聊天服务,该服务支撑双方通信的功能。所以该服务在业务上必须支持的最基本功能:

    1.能够实时接收来自其他客户端的信息。

    2.能够将每条信息实时推送给收件人。

    当客户端打算启动聊天时,它会使用一个或多个网络协议连接聊天服务。对于聊天服务,网络协议的选择至关重要,这里,我们选择Tornado框架内置Websocket协议的接口,简单而又方便,安装tornado6.1

代码语言:javascript
复制
pip3 install tornado==6.1

    随后编写程序启动文件main.py:

代码语言:javascript
复制
import tornado.httpserver
import tornado.websocket

import tornado.ioloop

import tornado.web

import redis

import threading

import asyncio

# 用户列表
users = []

# websocket协议
class WB(tornado.websocket.WebSocketHandler):


	# 跨域支持
	def check_origin(self,origin):

		return True

	# 开启链接
	def open(self):

                users.append(self)


	# 接收消息
	def on_message(self,message):

		self.write_message(message['data'])

	# 断开
	def on_close(self):

		users.remove(self)
# 建立torando实例

app = tornado.web.Application(

	[

	(r'/wb/',WB)

	],debug=True

)

if __name__ == '__main__':


	# 声明服务器
	http_server_1 = tornado.httpserver.HTTPServer(app)

	# 监听端口
	http_server_1.listen(8000)

	# 开启事件循环
	tornado.ioloop.IOLoop.instance().start()

    如此,就在短时间搭建起了一套websocket协议服务,每一次有客户端发起websocket连接请求,我们都会将它添加到用户列表中,等待用户的推送或者接收信息的动作。

    下面我们需要通过某种形式将消息的发送方和接收方联系起来,以达到“聊天”的目的,这里选择Redis的发布订阅模式(pubsub),以一个demo来实例说明,server.py

代码语言:javascript
复制
import redis

r = redis.Redis()
r.publish("test",'hello')

    随后编写 client.py:

代码语言:javascript
复制
import redis
r = redis.Redis()
ps = r.pubsub()
ps.subscribe('test')  
for item in ps.listen(): 
    if item['type'] == 'message':
        print(item['data'])

    可以这么理解:订阅者(listener)负责订阅频道(channel);发送者(publisher)负责向频道(channel)发送二进制的字符串消息,然后频道收到消息时,推送给订阅者。

    频道不仅可以联系发布者和订阅者,同时,也可以利用频道进行“消息隔离”,即不同频道的消息只会给订阅该频道的用户进行推送:

    根据发布者订阅者逻辑,改写main.py:

代码语言:javascript
复制
import tornado.httpserver
import tornado.websocket

import tornado.ioloop

import tornado.web

import redis

import threading

import asyncio

# 用户列表
users = []

# 频道列表
channels = ["channel_1","channel_2"]


# websocket协议
class WB(tornado.websocket.WebSocketHandler):


	# 跨域支持
	def check_origin(self,origin):

		return True

	# 开启链接
	def open(self):


		users.append(self)


	# 接收消息
	def on_message(self,message):

		self.write_message(message['data'])

	# 断开
	def on_close(self):

		users.remove(self)






# 基于redis监听发布者发布消息
def redis_listener(loop):

	asyncio.set_event_loop(loop)

	async def listen(): 

		r = redis.Redis(decode_responses=True)

		# 声明pubsb实例
		ps = r.pubsub()

		# 订阅聊天室频道

		ps.subscribe(["channel_1","channel_2"])


		# 监听消息
		for message in ps.listen():

			print(message)

			# 遍历链接上的用户
			for user in users:

				print(user)

				if message["type"] == "message" and message["channel"] == user.get_cookie("channel"):


					user.write_message(message["data"])

	future = asyncio.gather(listen())
	loop.run_until_complete(future)



# 接口  发布信息
class Msg(tornado.web.RequestHandler):


	# 重写父类方法
	def set_default_headers(self):

		# 设置请求头信息
		print("开始设置")
		# 域名信息
		self.set_header("Access-Control-Allow-Origin","*")
		# 请求信息
		self.set_header("Access-Control-Allow-Headers","x-requested-with")
		# 请求方式
		self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE")

	

	# 发布信息
	async def post(self):

		data = self.get_argument("data",None)

		channel = self.get_argument("channel","channel_1")

		print(data)

		# 发布
		r = redis.Redis()

		r.publish(channel,data)

		return self.write("ok")


# 建立torando实例

app = tornado.web.Application(

	[

	(r'/send/',Msg),
	(r'/wb/',WB)

	],debug=True

)

if __name__ == '__main__':


	loop = asyncio.new_event_loop()

	# 单线程启动订阅者服务
	threading.Thread(target=redis_listener,args=(loop,)).start()


	# 声明服务器
	http_server_1 = tornado.httpserver.HTTPServer(app)

	# 监听端口
	http_server_1.listen(8000)

	# 开启事件循环
	tornado.ioloop.IOLoop.instance().start()

    这里假设默认有两个频道,逻辑是这样的:由前端控制websocket链接用户选择将消息发布到那个频道上,同时每个用户通过前端cookie的设置具备频道属性,当具备频道属性的用户对该频道发布了一条消息之后,所有其他具备该频道属性的用户通过redis进行订阅后主动推送刚刚发布的消息,而频道的推送只匹配订阅该频道的用户,达到消息隔离的目的。

    需要注意的一点是,通过线程启动redis订阅服务时,需要将当前的loop实例传递给协程对象,否则在订阅方法内将会获取不到websocket实例,报这个错误:

代码语言:javascript
复制
IOLoop.current() doesn't work in non-main

    这是因为Tornado底层基于事件循环ioloop,而同步框架模式的Django或者Flask则没有这个问题。

    下面编写前端代码,这里我们使用时下最流行的vue3.0框架,编写chat.vue:

代码语言:javascript
复制
<template>
  <div>


            <h1>聊天窗口</h1>


            <van-tabs v-model:active="active" @click="change_channel">

              <van-tab title="客服1号">


                <table>
              
              <tr v-for="item,index in msglist" :key="index">
                
                {{ item }}

              </tr>

            </table>
                


              </van-tab>


              <van-tab title="客服2号">
                

                <table>
              
              <tr v-for="item,index in msglist" :key="index">
                
                {{ item }}

              </tr>

            </table>


              </van-tab>

            </van-tabs>


            


            <van-field label="聊天信息" v-model="msg" />

            <van-button color="gray" @click="commit">发送</van-button>

   
  </div>
</template>

<script>

export default {
 data() {
    return {
      auditlist:[],

      //聊天记录
      msglist:[],
      msg:"",
       websock: null, //建立的连接
      lockReconnect: false, //是否真正建立连接
      timeout: 3 * 1000, //30秒一次心跳
      timeoutObj: null, //外层心跳倒计时
      serverTimeoutObj: null, //内层心跳检测
      timeoutnum: null, //断开 重连倒计时
      active:0,
      channel:"channel_1"
     
    }
  },
  methods:{


    //切换频道
    change_channel:function(){


          if(this.active === 0){


                this.channel = "channel_1";

                var name = "channel";
          var value = "channel_1";

          

          }else{


              this.channel = "channel_2";

                var name = "channel";
          var value = "channel_2";


          }


          //清空聊天记录
          this.msglist = [];


          var d = new Date();
          d.setTime(d.getTime() + (24 * 60 * 60 * 1000));
          var expires = "expires=" + d.toGMTString();
          document.cookie = name + "=" + value + "; " + expires;


          this.reconnect();


    },
     initWebSocket() {
      //初始化weosocket
      const wsuri = "ws://localhost:8000/wb/";
      this.websock = new WebSocket(wsuri);
      this.websock.onopen = this.websocketonopen;
      this.websock.onmessage = this.websocketonmessage;
      this.websock.onerror = this.websocketonerror;
      this.websock.onclose = this.websocketclose;
    },

    reconnect() {
      //重新连接
      var that = this;
      if (that.lockReconnect) {
        // 是否真正建立连接
        return;
      }
      that.lockReconnect = true;
      //没连接上会一直重连,设置延迟避免请求过多
      that.timeoutnum && clearTimeout(that.timeoutnum);
      // 如果到了这里断开重连的倒计时还有值的话就清除掉
      that.timeoutnum = setTimeout(function() {
        //然后新连接
        that.initWebSocket();
        that.lockReconnect = false;
      }, 5000);
    },

     reset() {
      //重置心跳
      var that = this;
      //清除时间(清除内外两个心跳计时)
      clearTimeout(that.timeoutObj);
      clearTimeout(that.serverTimeoutObj);
      //重启心跳
      that.start();
    },

    start() {
      //开启心跳
      var self = this;
      self.timeoutObj && clearTimeout(self.timeoutObj);
      // 如果外层心跳倒计时存在的话,清除掉
      self.serverTimeoutObj && clearTimeout(self.serverTimeoutObj);
      // 如果内层心跳检测倒计时存在的话,清除掉
      self.timeoutObj = setTimeout(function() {
        // 重新赋值重新发送 进行心跳检测
        //这里发送一个心跳,后端收到后,返回一个心跳消息,
        if (self.websock.readyState == 1) {
          //如果连接正常
          // self.websock.send("heartCheck");
        } else {
          //否则重连
          self.reconnect();
        }
        self.serverTimeoutObj = setTimeout(function() {
          // 在三秒一次的心跳检测中如果某个值3秒没响应就关掉这次连接
          //超时关闭
         // self.websock.close();
        }, self.timeout);
      }, self.timeout);
      // 3s一次
    },

    websocketonopen(e) {
      //连接建立之后执行send方法发送数据
      console.log("成功");

     // this.websock.send("123");
      // this.websocketsend(JSON.stringify(actions));
    },
    websocketonerror() {
      //连接建立失败重连
      console.log("失败");
      this.initWebSocket();
    },
    websocketonmessage(e) {

      console.log(e);
      //数据接收
      //const redata = JSON.parse(e.data);
      const redata = e.data;

      //累加
      this.msglist.push(redata);

      console.log(redata);

     
    },
    websocketsend(Data) {
      //数据发送
      this.websock.send(Data);
    },
    websocketclose(e) {
      //关闭
      this.reconnect()
      console.log("断开连接", e);
    },

    //提交表单
    commit:function(){


        //发送请求

        this.myaxios("http://localhost:8000/send/","post",{"data":this.msg,channel:this.channel}).then(data =>{

          console.log(data);

        });



    },
  

  },

  mounted(){


      //连接后端websocket服务
      this.initWebSocket();



      var d = new Date();
          d.setTime(d.getTime() + (24 * 60 * 60 * 1000));
          var expires = "expires=" + d.toGMTString();
          document.cookie = "channel" + "=" + "channel_1" + "; " + expires;

    

  }

}
</script>


<style scoped>
  @import url("../assets/style.css");

  .chatbox{

      color:black;

  }

  .mymsg{

      background-color:green;

  }


</style>

    这里前端在线客户端定期向状态服务器发送心跳事件。如果服务端在特定时间内(例如x秒)从客户端接收到心跳事件,则认为用户处于联机状态。否则,它将处于脱机状态,脱机后在阈值时间内可以进行重新连接的动作。同时利用vant框架的标签页可以同步切换频道,切换后将频道标识写入cookie,便于后端服务识别后匹配推送。

    效果是这样的:

    诚然,功能业已实现,但是如果我们处在一个高并发场景之下呢?试想一下如果一个频道有10万人同时在线,每秒有100条新消息,那么后台tornado的websocket服务推送频率是100w*10/s = 1000w/s 。

    这样的系统架构如果不做负载均衡的话,很难抗住压力,那么瓶颈在哪里呢?没错,就是数据库redis,这里我们需要异步redis库aioredis的帮助:

代码语言:javascript
复制
pip3 install aioredis

    aioredis通过协程异步操作redis读写,避免了io阻塞问题,使消息的发布和订阅操作非阻塞。

    此时,可以新建一个异步订阅服务文件main_with_aioredis.py:

代码语言:javascript
复制
import asyncio
import aioredis
from tornado import web, websocket
from tornado.ioloop import IOLoop
import tornado.httpserver
import async_timeout

    之后主要的修改逻辑是,通过aioredis异步建立redis链接,并且异步订阅多个频道,随后通过原生协程的asyncio.create_task方法(也可以使用asyncio.ensure_future)注册订阅消费的异步任务reader:

代码语言:javascript
复制
async def setup():
    r = await aioredis.from_url("redis://localhost", decode_responses=True)
    pubsub = r.pubsub()

    print(pubsub)
    await pubsub.subscribe("channel_1","channel_2")

    #asyncio.ensure_future(reader(pubsub))
    asyncio.create_task(reader(pubsub))

    在订阅消费方法中,异步监听所订阅频道中的发布信息,同时和之前的同步方法一样,比对用户的频道属性并且进行按频道推送:

代码语言:javascript
复制
async def reader(channel: aioredis.client.PubSub):
    while True:
        try:
            async with async_timeout.timeout(1):
                message = await channel.get_message(ignore_subscribe_messages=True)
                if message is not None:
                    print(f"(Reader) Message Received: {message}")

                    for user in users:

                        if user.get_cookie("channel") == message["channel"]:

                            user.write_message(message["data"])
        
                await asyncio.sleep(0.01)
        except asyncio.TimeoutError:
            pass

    最后,利用tornado事件循环IOLoop传递中执行回调方法,将setup方法加入到事件回调中:

代码语言:javascript
复制
if __name__ == '__main__':

    # 监听端口
    application.listen(8000)

    loop = IOLoop.current()
    loop.add_callback(setup)
    loop.start()

    完整的异步消息发布、订阅、推送服务改造 main_aioredis.py:

代码语言:javascript
复制
import asyncio
import aioredis
from tornado import web, websocket
from tornado.ioloop import IOLoop
import tornado.httpserver
import async_timeout

users = []

# websocket协议
class WB(tornado.websocket.WebSocketHandler):


    # 跨域支持
    def check_origin(self,origin):

        return True

    # 开启链接
    def open(self):


        users.append(self)


    # 接收消息
    def on_message(self,message):

        self.write_message(message['data'])

    # 断开
    def on_close(self):

        users.remove(self)


class Msg(web.RequestHandler):


    # 重写父类方法
    def set_default_headers(self):

        # 设置请求头信息
        print("开始设置")
        # 域名信息
        self.set_header("Access-Control-Allow-Origin","*")
        # 请求信息
        self.set_header("Access-Control-Allow-Headers","x-requested-with")
        # 请求方式
        self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE")


    # 发布信息
    async def post(self):

        data = self.get_argument("data",None)

        channel = self.get_argument("channel","channel_1")

        print(data)

        # 发布
        r = await aioredis.from_url("redis://localhost", decode_responses=True)

        await r.publish(channel,data)

        return self.write("ok")


async def reader(channel: aioredis.client.PubSub):
    while True:
        try:
            async with async_timeout.timeout(1):
                message = await channel.get_message(ignore_subscribe_messages=True)
                if message is not None:
                    print(f"(Reader) Message Received: {message}")

                    for user in users:

                        if user.get_cookie("channel") == message["channel"]:

                            user.write_message(message["data"])
        
                await asyncio.sleep(0.01)
        except asyncio.TimeoutError:
            pass


async def setup():
    r = await aioredis.from_url("redis://localhost", decode_responses=True)
    pubsub = r.pubsub()

    print(pubsub)
    await pubsub.subscribe("channel_1","channel_2")

    #asyncio.ensure_future(reader(pubsub))
    asyncio.create_task(reader(pubsub))


application = web.Application([
    (r'/send/',Msg),
    (r'/wb/', WB),
],debug=True)    


if __name__ == '__main__':

    # 监听端口
    application.listen(8000)

    loop = IOLoop.current()
    loop.add_callback(setup)
    loop.start()

    从程序设计角度上讲,充分利用了协程的异步执行思想,更加地丝滑流畅。

    结语:实践操作来看,Redis发布订阅模式,非常契合这种实时(websocket)通信聊天系统的场景,但是发布的消息如果没有对应的频道或者消费者,消息则会被丢弃,假如我们在生产环境在消费的时候,突然断网,导致其中一个订阅者挂掉了一段时间,那么当它重新连接上的时候,中间这一段时间产生的消息也将不会存在,所以如果想要保证系统的健壮性,还需要其他服务来设计高可用的实时存储方案,不过那就是另外一个故事了,最后奉上项目地址,与众乡亲同飨:https://github.com/zcxey2911/tornado_redis_vue3_chatroom

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-12-21 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档