前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >WebSocket降级策略(一)

WebSocket降级策略(一)

原创
作者头像
4cos90
发布2022-08-04 23:47:48
7810
发布2022-08-04 23:47:48
举报
文章被收录于专栏:随想随想

问题背景

项目上前后端采用websocket通信,但是websocket连接经常会断开,虽然有重连机制,但是在重连的过程中,以及重连失败时,会影响前端数据的即时刷新。我们也不可能每次一出现问题就要求用户重启浏览器。因此需要设计一个websocket降级方案。

降级思路

前端处理:当前端websocket断开或者超过一定时间没有收到消息时,将会自动切换为轮询,主动查询服务器最近是否有发送给前端的websocket消息。当websocket重连成功并收到消息后,取消轮询。

后端处理:当后端发送websocket请求时,对发送的消息进行缓存,当前端进行查询时,返回发送给该前端的消息。同时将超过一定时长的消息(过期消息),或者前端已查询过的消息(确保已经收到了),从缓存中剔除,避免oom。

实现过程

这个方案前端的实现比较简单,不再赘述。下面着重写一下后端的实现。

首先我们要定义一个缓存服务接口,他需要实现的基本方法显然有三个,Get,Set,GetAll。

代码语言:javascript
复制
type CacheServer interface {
 Get(key string) []messageCache
 Set(receiver string, message string, time time.Time) error
 GetAll() []messageCache
}

定义服务结构,包括服务名,服务缓存,缓存超时时间。

代码语言:javascript
复制
type cache struct {
    Name    string
    Cache   []messageCache
    Timeout time.Duration
}

定义缓存结构。包含接收者,消息体,消息发送时间。

代码语言:javascript
复制
type messageCache struct {
    receiver string
    message  string
    time     time.Time
}

其实定义完接口和结构体,任务就完成了一半了。接下来只要按照接口定义填一些实现。

Set方法实现,这里没有直接传入messageCache 类型的数据,也是因为不想把包内的数据类型扩散出去,外部调用不必知道包内的数据结构。每当读写数据时,先清理掉已经超时的数据。

代码语言:javascript
复制
func (s *cache) Set(receiver string, message string, sendtime time.Time) error {
	s.clearOutTimeCache()
	value := messageCache{
		receiver: receiver,
		message:  message,
		time:     sendtime,
	}
	s.Cache = append(s.Cache, value)
	return nil
}

GetAll 方法实现,清理完超时数据后,直接返回剩余的全部缓存。

代码语言:javascript
复制
func (s *cache) GetAll() []messageCache {
	s.clearOutTimeCache()
	return s.Cache
}

Get方法实现,在GetAll的基础上,加入了key值的判断,并且在用户获取完数据后,清理掉该key值的缓存。

代码语言:javascript
复制
func (s *cache) Get(key string) []messageCache {
	s.clearOutTimeCache()
	rlt := make([]messageCache, 1)
	newCache := make([]messageCache, 1)
	for i := 0; i < len(s.Cache); i++ {
		if s.Cache[i].receiver == key {
			rlt = append(rlt, s.Cache[i])
		} else {
			newCache = append(newCache, s.Cache[i])
		}
	}
	s.Cache = newCache
	return rlt
}

clearOutTimeCache实现,把超时的缓存剔除出去。

代码语言:javascript
复制
func (s *cache) clearOutTimeCache() {
	for startindex := 0; startindex < len(s.Cache); startindex++ {
		if time.Now().Sub(s.Cache[startindex].time) < s.Timeout {
			s.Cache = s.Cache[startindex:]
			break
		} else if startindex == (len(s.Cache) - 1) {
			s.Cache = make([]messageCache, 0)
		}
	}
}

我们的包还得暴露一个新建实例的方法给外部,一共两个参数,实例名,超时时间。

代码语言:javascript
复制
func NewCache(name string, timeout time.Duration) CacheServer {
	return &cache{
		Name:    name,
		Cache:   make([]messageCache, 1),
		Timeout: timeout,
	}
}

好了这样我们的一个简单的cacheServer包就完成了。下面写一个测试代码来看一下效果。

直接把main文件贴上来了。两个goroutine,一个启动服务,一个模拟websocket消息发送。启动的服务三个接口,一个health页面,一个根据key获取缓存message,一个获取所有缓存。好运行下代码看看。

代码语言:javascript
复制
package main

import (
	"cacheServer/cacheServer"
	"fmt"
	"math/rand"
	"net/http"
	"strconv"
	"time"
)

func main() {
	MessageCache := cacheServer.NewCache("MessageCache", time.Second*5)
	go Start(":8080", MessageCache)
	go MockWebSocketMessage(MessageCache)
	select {}
}

func MockWebSocketMessage(cache cacheServer.CacheServer) {
	rand.Seed(time.Now().UnixNano())
	var i int = 0
	for {
		time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
		var receiver string
		if i%2 == 0 {
			receiver = "HYC"
		} else {
			receiver = "ZMN"
		}
		message := "send message " + strconv.Itoa(i) + " times"
		cache.Set(receiver, message, time.Now())
		i = i + 1
	}
}

func Start(Port string, cache cacheServer.CacheServer) error {
	mux := http.NewServeMux()
	mux.HandleFunc("/", health)
	mux.HandleFunc("/syncCache", syncCache(cache))
	mux.HandleFunc("/getCache", getCache(cache))
	svr := &http.Server{Addr: Port, Handler: mux}
	err := svr.ListenAndServe()
	return err
}

func health(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "server work")
}

func getCache(cache cacheServer.CacheServer) func(w http.ResponseWriter, r *http.Request) {
	fmt.Printf("getCache Success\n")
	return func(w http.ResponseWriter, r *http.Request) {
		key := GetUrlArg(r, "key")
		fmt.Fprintf(w, "Cache Key:%s,%v", key, cache.Get(key))
	}
}

func syncCache(cache cacheServer.CacheServer) func(w http.ResponseWriter, r *http.Request) {
	fmt.Printf("syncCache Success\n")
	return func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "All Cache:%v \n", cache.GetAll())
	}
}

func GetUrlArg(r *http.Request, name string) string {
	var arg string
	values := r.URL.Query()
	arg = values.Get(name)
	return arg
}

先查询全部缓存,等一会查询一次,确保缓存数据正常被清除。咱们NewCache时传入的超时时间是5秒,模拟随机发送消息是1-1000毫秒,因此预期查到的缓存数量在8-12条之间。随着刷新可以看到过期的缓存被清除。

查询全部缓存
查询全部缓存

再试试根据key查询,因为模拟了两个人在发消息,因此单个人预计在4-6条左右,并且清理掉已查询的数据,快速刷新两次应该第二次只能查到0-2条。

根据key查询
根据key查询
快速刷新两次第二次只查到两条
快速刷新两次第二次只查到两条

TODO

这样一个简单的cacheServer就完成了,测试下来数据也正确。那么还有哪些未完成的工作呢。

首先我们的服务大概率是集群部署的,在服务内部使用缓存不可避免的存在同步问题。之前可能大家会疑惑为啥通过key获取的缓存要清除掉,获取所有的缓存就不用清理掉。因为获取所有缓存的接口是准备留给服务器之间同步用的。我们不会允许用户去获取其他用户收到的消息。

其次是cacheServer里messageCache不是一个良好的定义,receiver和message与websocket消息的含义耦合太紧,可以换为更松散的定义。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题背景
  • 降级思路
  • 实现过程
  • TODO
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档