前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >WebSocket降级策略(二)

WebSocket降级策略(二)

原创
作者头像
4cos90
发布2022-08-07 22:52:57
3360
发布2022-08-07 22:52:57
举报
文章被收录于专栏:随想随想

messageCache定义修改

代码语言:javascript
复制
// type messageCache struct {
// 	receiver string
// 	message  string
// 	time     time.Time
// }

type cacheValue struct {
	key   string
	value interface{}
	time  time.Time
}

把messageCache修改为更松散的key-value的结构,并且不再限定value的类型。

Set 方法修改

代码语言:javascript
复制
func (s *cache) Set(key string, kvalue interface{}) error {
	s.clearOutTimeCache()
	value := cacheValue{
		key:   key,
		value: kvalue,
		time:  time.Now(),
	}
	s.Cache = append(s.Cache, value)
	return nil
}

把time生成移到包内部,不再由外部传入,以免出现外部传入时未按时间顺序牌序,可能导致缓存清理时保留过多的数据。

集群同步策略

这边采用redis来记录一下最后更新的集群ip。外面包一层服务简单处理一下。

代码语言:javascript
复制
package cacheServer

import (
	"github.com/go-redis/redis"
)

type redisManager struct {
	Name   string
	client *redis.Client
}

type RedisServer interface {
	Get(key string) (string, error)
	Set(key string, value string) error
}

func NewClient(name string, addr string, password string, db int) RedisServer {
	client := redis.NewClient(&redis.Options{
		Addr:     addr,
		Password: password,
		DB:       db,
	})
	return &redisManager{
		Name:   name,
		client: client,
	}
}

func (s *redisManager) Get(key string) (string, error) {
	return s.client.Get(key).Result()
}

func (s *redisManager) Set(key string, value string) error {
	return s.client.Set(key, value, 0).Err()
}

同样我们的cacheServer在Get,Set方法中要加入对redis的读写,key取缓存服务名即可。

当set的时候,在redis中写入最后更新的缓存的IP,在get的时候,根据查出的ip更新缓存。

代码语言:javascript
复制
func (s *cache) SetLastWriterToRedis() error {
	return s.RedisClient.Set(s.Name, s.LocalIp)
}
代码语言:javascript
复制
func (s *cache) GetLastWriterFromRedis() {
	IP, err := s.RedisClient.Get(s.Name)
	if err == nil {
		if IP != s.LocalIp {
			resp, err := http.Get(IP + "/syncCache")
			if err != nil {
				return
			}
			defer resp.Body.Close()
			body, _ := ioutil.ReadAll(resp.Body)
			var res []cacheValue
			json.Unmarshal([]byte(body), &res)
			s.Cache = res
		}
	}
}

相应的在New我们的CacheServer的时候需要注入redisClinent实例,这里不传入redis参数在包内建立redis连接,因为可能会存在一个服务建立多个缓存服务的情况,不用在每个缓存服务里分别去新建连接。

代码语言:javascript
复制
func NewCache(name string, timeout time.Duration, localIp string, RedisClient RedisServer) CacheServer {
	return &cache{
		Name:        name,
		Cache:       make([]cacheValue, 0),
		Timeout:     timeout,
		RedisClient: RedisClient,
		LocalIp:     localIp,
	}
}

同样我们在每次Get前先从redis get一下,在set之后也调用一下redis里的set,这里make切片的地方也做了一下修改,原来1的话会多出一条空的数据。需要注意的是get的时候也更新了缓存,所以也需要set一下redis。

代码语言:javascript
复制
func (s *cache) GetAll() ([]cacheValue, error) {
	s.GetLastWriterFromRedis()
	s.clearOutTimeCache()
	return s.Cache, nil
}

func (s *cache) Get(key string) ([]cacheValue, error) {
	s.GetLastWriterFromRedis()
	s.clearOutTimeCache()
	rlt := make([]cacheValue, 0)
	newCache := make([]cacheValue, 0)
	for i := 0; i < len(s.Cache); i++ {
		if s.Cache[i].Key == key {
			rlt = append(rlt, s.Cache[i])
		} else {
			newCache = append(newCache, s.Cache[i])
		}
	}
	s.Cache = newCache
	s.SetLastWriterToRedis()
	return rlt, nil
}

func (s *cache) Set(key string, kvalue interface{}) error {
	s.GetLastWriterFromRedis()
	s.clearOutTimeCache()
	value := cacheValue{
		Key:   key,
		Value: kvalue,
		Time:  time.Now(),
	}
	s.Cache = append(s.Cache, value)
	s.SetLastWriterToRedis()
	return nil
}

开始测试!

缓存代码写完了那么接下来进行测试。直接把main贴上来,这边我们New两个Cache服务注册在两个端口上,模拟两个服务。

模拟消息全部发送在8080端口上,然后访问8081端口的接口,看看数据有没有正确同步。

代码语言:javascript
复制
package main

import (
	"cacheServer/cacheServer"
	"encoding/json"
	"fmt"
	"math/rand"
	"net/http"
	"strconv"
	"time"
)

func main() {
	RedisClient := cacheServer.NewClient("localhost:8080", "localhost:6379", "", 0)
	MessageCache1 := cacheServer.NewCache("MessageCache", time.Second*5, "http://localhost:8080", RedisClient)
	MessageCache2 := cacheServer.NewCache("MessageCache", time.Second*5, "http://localhost:8081", RedisClient)
	go Start(":8080", MessageCache1)
	go Start(":8081", MessageCache2)
	go MockWebSocketMessage(MessageCache1)
	select {}
}

func MockWebSocketMessage(cache cacheServer.CacheServer) {
	rand.Seed(time.Now().UnixNano())
	var i int = 0
	for {
		time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
		var receiver string
		if i%2 == 0 {
			receiver = "HYC"
		} else {
			receiver = "ZMN"
		}
		message := "send message " + strconv.Itoa(i) + " times"
		if err := cache.Set(receiver, message); err != nil {
			fmt.Printf("cache Set Error:%s \n", err)
		}
		i = i + 1
	}
}

func Start(Port string, cache cacheServer.CacheServer) error {
	mux := http.NewServeMux()
	mux.HandleFunc("/", health)
	mux.HandleFunc("/syncCache", syncCache(cache))
	mux.HandleFunc("/getCache", getCache(cache))
	svr := &http.Server{Addr: Port, Handler: mux}
	err := svr.ListenAndServe()
	return err
}

func health(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "server work")
}

func getCache(cache cacheServer.CacheServer) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		key := GetUrlArg(r, "key")
		value, err := cache.Get(key)
		if err != nil {
			fmt.Printf("cache Get Error:%s \n", err)
		}
		fmt.Fprintf(w, "Cache Key:%s,%v \n", key, value)
	}
}

func syncCache(cache cacheServer.CacheServer) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		value, err := cache.GetAll()
		if err != nil {
			fmt.Printf("cache GetAll Error:%s \n", err)
		}
		res, err := json.Marshal(value)
		if err != nil {
			fmt.Printf("cache json Error:%s \n", err)
		}
		w.Write(res)
	}
}

func GetUrlArg(r *http.Request, name string) string {
	var arg string
	values := r.URL.Query()
	arg = values.Get(name)
	return arg
}

当然我们也没有真的去启动一个redis服务。直接mock掉了redis的get set方法。因此在8081服务get之后,缓存不会同步回8080服务。

代码语言:javascript
复制
func (s *redisManager) Get(key string) (string, error) {
	return "http://localhost:8080", nil
	//return s.client.Get(key).Result()
}

func (s *redisManager) Set(key string, value string) error {
	return nil
	//return s.client.Set(key, value, 0).Err()
}
同步的真不戳
同步的真不戳

好那么缓存同步也完成了。

TODO

这回加上了缓存同步,那么问题来了,对于咱们的websocket消息缓存来说,真的需要在不同服务间同步缓存吗?可不可以不同步。当然是可以的。我们在消息进来时,就可以根据key对消息进行划分,存进不同的缓存,查找的时候也直接去对应的缓存查找就行了啊。(比如key都是数字id的话,我们有十台服务器集群,那么可以直接id % 10 这样去存缓存)。

第一个问题的提出是表明我们可以去针对业务进行优化。那么第二个问题就是我们现有的cacheService里的get方法,自动清理掉已读内容的操作,又是与业务强耦合的。

我们可以进一步的把业务逻辑从cacheServer中提取出去,然后新增一层业务层来调用cacheServer。这样我们就可以有一层通用的缓存处理逻辑。然后在业务层中去写业务逻辑。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • messageCache定义修改
  • Set 方法修改
  • 集群同步策略
  • 开始测试!
  • TODO
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档