// type messageCache struct {
// receiver string
// message string
// time time.Time
// }
type cacheValue struct {
key string
value interface{}
time time.Time
}
把messageCache修改为更松散的key-value的结构,并且不再限定value的类型。
func (s *cache) Set(key string, kvalue interface{}) error {
s.clearOutTimeCache()
value := cacheValue{
key: key,
value: kvalue,
time: time.Now(),
}
s.Cache = append(s.Cache, value)
return nil
}
把time生成移到包内部,不再由外部传入,以免出现外部传入时未按时间顺序牌序,可能导致缓存清理时保留过多的数据。
这边采用redis来记录一下最后更新的集群ip。外面包一层服务简单处理一下。
package cacheServer
import (
"github.com/go-redis/redis"
)
type redisManager struct {
Name string
client *redis.Client
}
type RedisServer interface {
Get(key string) (string, error)
Set(key string, value string) error
}
func NewClient(name string, addr string, password string, db int) RedisServer {
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: db,
})
return &redisManager{
Name: name,
client: client,
}
}
func (s *redisManager) Get(key string) (string, error) {
return s.client.Get(key).Result()
}
func (s *redisManager) Set(key string, value string) error {
return s.client.Set(key, value, 0).Err()
}
同样我们的cacheServer在Get,Set方法中要加入对redis的读写,key取缓存服务名即可。
当set的时候,在redis中写入最后更新的缓存的IP,在get的时候,根据查出的ip更新缓存。
func (s *cache) SetLastWriterToRedis() error {
return s.RedisClient.Set(s.Name, s.LocalIp)
}
func (s *cache) GetLastWriterFromRedis() {
IP, err := s.RedisClient.Get(s.Name)
if err == nil {
if IP != s.LocalIp {
resp, err := http.Get(IP + "/syncCache")
if err != nil {
return
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
var res []cacheValue
json.Unmarshal([]byte(body), &res)
s.Cache = res
}
}
}
相应的在New我们的CacheServer的时候需要注入redisClinent实例,这里不传入redis参数在包内建立redis连接,因为可能会存在一个服务建立多个缓存服务的情况,不用在每个缓存服务里分别去新建连接。
func NewCache(name string, timeout time.Duration, localIp string, RedisClient RedisServer) CacheServer {
return &cache{
Name: name,
Cache: make([]cacheValue, 0),
Timeout: timeout,
RedisClient: RedisClient,
LocalIp: localIp,
}
}
同样我们在每次Get前先从redis get一下,在set之后也调用一下redis里的set,这里make切片的地方也做了一下修改,原来1的话会多出一条空的数据。需要注意的是get的时候也更新了缓存,所以也需要set一下redis。
func (s *cache) GetAll() ([]cacheValue, error) {
s.GetLastWriterFromRedis()
s.clearOutTimeCache()
return s.Cache, nil
}
func (s *cache) Get(key string) ([]cacheValue, error) {
s.GetLastWriterFromRedis()
s.clearOutTimeCache()
rlt := make([]cacheValue, 0)
newCache := make([]cacheValue, 0)
for i := 0; i < len(s.Cache); i++ {
if s.Cache[i].Key == key {
rlt = append(rlt, s.Cache[i])
} else {
newCache = append(newCache, s.Cache[i])
}
}
s.Cache = newCache
s.SetLastWriterToRedis()
return rlt, nil
}
func (s *cache) Set(key string, kvalue interface{}) error {
s.GetLastWriterFromRedis()
s.clearOutTimeCache()
value := cacheValue{
Key: key,
Value: kvalue,
Time: time.Now(),
}
s.Cache = append(s.Cache, value)
s.SetLastWriterToRedis()
return nil
}
缓存代码写完了那么接下来进行测试。直接把main贴上来,这边我们New两个Cache服务注册在两个端口上,模拟两个服务。
模拟消息全部发送在8080端口上,然后访问8081端口的接口,看看数据有没有正确同步。
package main
import (
"cacheServer/cacheServer"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"strconv"
"time"
)
func main() {
RedisClient := cacheServer.NewClient("localhost:8080", "localhost:6379", "", 0)
MessageCache1 := cacheServer.NewCache("MessageCache", time.Second*5, "http://localhost:8080", RedisClient)
MessageCache2 := cacheServer.NewCache("MessageCache", time.Second*5, "http://localhost:8081", RedisClient)
go Start(":8080", MessageCache1)
go Start(":8081", MessageCache2)
go MockWebSocketMessage(MessageCache1)
select {}
}
func MockWebSocketMessage(cache cacheServer.CacheServer) {
rand.Seed(time.Now().UnixNano())
var i int = 0
for {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
var receiver string
if i%2 == 0 {
receiver = "HYC"
} else {
receiver = "ZMN"
}
message := "send message " + strconv.Itoa(i) + " times"
if err := cache.Set(receiver, message); err != nil {
fmt.Printf("cache Set Error:%s \n", err)
}
i = i + 1
}
}
func Start(Port string, cache cacheServer.CacheServer) error {
mux := http.NewServeMux()
mux.HandleFunc("/", health)
mux.HandleFunc("/syncCache", syncCache(cache))
mux.HandleFunc("/getCache", getCache(cache))
svr := &http.Server{Addr: Port, Handler: mux}
err := svr.ListenAndServe()
return err
}
func health(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "server work")
}
func getCache(cache cacheServer.CacheServer) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
key := GetUrlArg(r, "key")
value, err := cache.Get(key)
if err != nil {
fmt.Printf("cache Get Error:%s \n", err)
}
fmt.Fprintf(w, "Cache Key:%s,%v \n", key, value)
}
}
func syncCache(cache cacheServer.CacheServer) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
value, err := cache.GetAll()
if err != nil {
fmt.Printf("cache GetAll Error:%s \n", err)
}
res, err := json.Marshal(value)
if err != nil {
fmt.Printf("cache json Error:%s \n", err)
}
w.Write(res)
}
}
func GetUrlArg(r *http.Request, name string) string {
var arg string
values := r.URL.Query()
arg = values.Get(name)
return arg
}
当然我们也没有真的去启动一个redis服务。直接mock掉了redis的get set方法。因此在8081服务get之后,缓存不会同步回8080服务。
func (s *redisManager) Get(key string) (string, error) {
return "http://localhost:8080", nil
//return s.client.Get(key).Result()
}
func (s *redisManager) Set(key string, value string) error {
return nil
//return s.client.Set(key, value, 0).Err()
}
好那么缓存同步也完成了。
这回加上了缓存同步,那么问题来了,对于咱们的websocket消息缓存来说,真的需要在不同服务间同步缓存吗?可不可以不同步。当然是可以的。我们在消息进来时,就可以根据key对消息进行划分,存进不同的缓存,查找的时候也直接去对应的缓存查找就行了啊。(比如key都是数字id的话,我们有十台服务器集群,那么可以直接id % 10 这样去存缓存)。
第一个问题的提出是表明我们可以去针对业务进行优化。那么第二个问题就是我们现有的cacheService里的get方法,自动清理掉已读内容的操作,又是与业务强耦合的。
我们可以进一步的把业务逻辑从cacheServer中提取出去,然后新增一层业务层来调用cacheServer。这样我们就可以有一层通用的缓存处理逻辑。然后在业务层中去写业务逻辑。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。