前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go WebSocket + Redis 实现轻量级的订阅和实时消息推送

Go WebSocket + Redis 实现轻量级的订阅和实时消息推送

作者头像
杨永贞
发布2021-01-26 16:18:58
4K0
发布2021-01-26 16:18:58
举报
文章被收录于专栏:独行猫a的沉淀积累总结

上一篇介绍了Golang中封装WebSocket功能,让WebSocket更简单好用和稳定。

这里借助Redis自身的订阅和发布机制和WebSocket结合,实现轻量级的订阅发布和消息推送。本来消息订阅和推送打算用mqtt实现,但是这样还得有一个MqttBroker代理服务器,或采用网上开源的实现,或使用go语言自己实现个mqtt的broker。这都不够轻量级,这里介绍下借助redis的轻量级的实现。

大致框图如下:

涉及实时性和性能相关的服务可以直接在OnMessage里让客户端同后台业务直接交互实现。

关于提高性能的思考,首先看如果是高并发的情况下,瓶颈可能会在哪。

内部的两个redis客户端,一个负责发布,订阅,一个负责接收。当消息量大的情况下未必受用。那么首先负责发布的客户端,可考虑用redis的连接池实现。

消息的发布和订阅,固定为两个事件,一个是OnPublish,一个是OnSubcribe。并定义相关的报文结构如下:

收到的Publish事件,发布消息到Redis:

代码语言:javascript
复制
// 接收到发布消息事件
	c.On("Publish", func(msg string) {
		// 将消息打印到控制台
		fmt.Printf("%s received publish: %s\n", c.Context().ClientIP(), msg)
		pubMsg := websocket.PushMsg{ID: c.ID()}
		err := json.Unmarshal([]byte(msg), &pubMsg)
		if err != nil {
			log.Printf("解析json串错误,err=", err)
			return
		}
		if pubMsg.Type != "pub" {
			log.Println("pub msg type error")
			return
		}
		//发布消息到Redis
		websocket.Publish(pubMsg.Topic, pubMsg.Payload)
	})

收到的订阅事件,发布消息到Redis:

代码语言:javascript
复制
// 接收到订阅的事件
	c.On("Subscribe", func(msg string) {
		// 将消息打印到控制台,c .Context()是iris的http上下文。
		fmt.Printf("%s received subscribe: %s\n", c.Context().ClientIP(), msg)
		subMsg := websocket.SubMsg{ID: c.ID()}
		err := json.Unmarshal([]byte(msg), &subMsg)
		if err != nil {
			log.Printf("解析json串错误,err=", err)
			return
		}
		if pubMsg.Type != "pub" {
			log.Println("pub msg type error")
			return
		}
		//订阅到Redis
		sub.Subscribe(subMsg.Topic, subMsg.ID)
	})

开启一个Redis客户端,负责收到的消息:

代码语言:javascript
复制
func (c *Subscriber) Init(ws *Server) {

	conn := RedisClient.Get()

	c.client = redis.PubSubConn{conn}
	c.Ws = ws
	go func() {
		for {
			log.Println("redis wait...")
			switch res := c.client.Receive().(type) {
			case redis.Message:
				fmt.Printf("receive:%#v\n", res)
				topic := res.Channel
				message := string(res.Data)
				fnSubReceived(c.cbMap, topic, message)
			case redis.Subscription:
				fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
			case error:
				log.Println("error handle", res)
				if IsConnError(res) {
					conn, err := RedisClient.Dial()
					if err != nil {
						log.Printf("err=%s\n", err)
					}
					c.client = redis.PubSubConn{conn}
				}
				continue
			}
		}
	}()

}

附完整实现:

代码语言:javascript
复制
package websocket

import (
	"log"
)

//订阅的消息格式定义
type SubMsg struct {
	ID    string `json:"id"`   //请求ID
	Type  string `json:"type"` //订阅时固定为sub,取消订阅时固定为unsub
	Topic string `json:"topic"`
	Param string `json:"param"`
}

//平台推送消息定义
type PushMsg struct {
	ID      string `json:"id"`
	Type    string `json:"type"` //发布,类型为pub
	Topic   string `json:"topic"`
	Payload string `json:"payload"`
	Result  string `json:"result"`
}

func Publish(topic string, msg string) (interface{}, error) {
	resp, err := Redo("Publish", topic, msg)
	if err != nil {
		log.Println(err)
	}
	return resp, err
}
代码语言:javascript
复制
package websocket

import (
	"errors"
	"fmt"
	"io"
	"log"
	"strings"
	"sync"
	"time"
	//"unsafe"

	"github.com/gomodule/redigo/redis"
)

var (
	// RD redis全局client
	RedisClient *redis.Pool
)

// InitRedis 初始设置
func InitRedis(host string, auth string, db int) error {

	// 连接Redis
	RedisClient = &redis.Pool{
		MaxIdle:     3,
		MaxActive:   4000,
		IdleTimeout: 180 * time.Second,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
			if nil != err {
				return nil, err
			}
			return c, nil
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if time.Since(t) < time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			return err
		},
	}
	rd := RedisClient.Get()
	defer rd.Close()

	c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
	defer c.Close()
	if err != nil {
		fmt.Println("Connect to redis error", err)
		return err
	}
	fmt.Println("Connect to redis ok")
	return nil

}

func IsConnError(err error) bool {
	var needNewConn bool

	if err == nil {
		return false
	}

	if err == io.EOF {
		needNewConn = true
	}
	if strings.Contains(err.Error(), "use of closed network connection") {
		needNewConn = true
	}
	if strings.Contains(err.Error(), "connect: connection refused") {
		needNewConn = true
	}
	if strings.Contains(err.Error(), "connection closed") {
		needNewConn = true
	}
	return needNewConn
}

// 在pool加入TestOnBorrow方法来去除扫描坏连接
func Redo(command string, opt ...interface{}) (interface{}, error) {
	if RedisClient == nil {
		return "", errors.New("error,redis client is null")
	}
	rd := RedisClient.Get()
	defer rd.Close()

	var conn redis.Conn
	var err error
	var maxretry = 3
	var needNewConn bool

	resp, err := rd.Do(command, opt...)
	needNewConn = IsConnError(err)
	if needNewConn == false {
		return resp, err
	} else {
		conn, err = RedisClient.Dial()
	}

	for index := 0; index < maxretry; index++ {
		if conn == nil && index+1 > maxretry {
			return resp, err
		}
		if conn == nil {
			conn, err = RedisClient.Dial()
		}
		if err != nil {
			continue
		}

		resp, err := conn.Do(command, opt...)
		needNewConn = IsConnError(err)
		if needNewConn == false {
			return resp, err
		} else {
			conn, err = RedisClient.Dial()
		}
	}

	conn.Close()
	return "", errors.New("redis error")
}

type SubscribeCallback func(topicMap sync.Map, topic, msg string)

type Subscriber struct {
	client   redis.PubSubConn
	Ws       *Server //websocket
	cbMap    sync.Map
	CallBack interface {
		OnReceive(SubscribeCallback)
	}
}

var fnSubReceived SubscribeCallback

func (c *Subscriber) OnReceive(cb SubscribeCallback) {
	fnSubReceived = cb
}

func (c *Subscriber) Init(ws *Server) {

	conn := RedisClient.Get()

	c.client = redis.PubSubConn{conn}
	c.Ws = ws
	go func() {
		for {
			log.Println("redis wait...")
			switch res := c.client.Receive().(type) {
			case redis.Message:
				fmt.Printf("receive:%#v\n", res)
				topic := res.Channel
				message := string(res.Data)
				fnSubReceived(c.cbMap, topic, message)
			case redis.Subscription:
				fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
			case error:
				log.Println("error handle", res)
				if IsConnError(res) {
					conn, err := RedisClient.Dial()
					if err != nil {
						log.Printf("err=%s\n", err)
					}
					c.client = redis.PubSubConn{conn}
				}
				continue
			}
		}
	}()

}

func (c *Subscriber) Close() {
	err := c.client.Close()
	if err != nil {
		log.Println("redis close error.")
	}
}

func (c *Subscriber) Subscribe(channel interface{}, clientid string) {
	err := c.client.Subscribe(channel)
	if err != nil {
		log.Println("redis Subscribe error.", err)
	}
	c.cbMap.Store(clientid, channel.(string))
}

func (c *Subscriber) PSubscribe(channel interface{}, clientid string) {
	err := c.client.PSubscribe(channel)
	if err != nil {
		log.Println("redis PSubscribe error.", err)
	}

	c.cbMap.Store(clientid, channel.(string))
}
代码语言:javascript
复制
package main

import (
	"fmt"
	"log"
	"sync"
	//"github.com/gin-contrib/cors"
	"encoding/json"
	"github.com/gin-gonic/gin"
	"net/http"
	"strings"
	"websockTest/websocket"
)

func main() {
	ws := websocket.New(websocket.Config{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	})

	ws.OnConnection(handleConnection)

	// 初始化连接redis
	var sub websocket.Subscriber
	err := websocket.InitRedis("127.0.0.1:6379", "", 0)
	if err != nil {
		fmt.Printf("InitRedis error: %s\n", err)
	} else {
		sub.Init(ws)
		//redis client收到的消息分发到websocket
		sub.OnReceive(func(topicMap sync.Map, topic, msg string) {
			fmt.Printf("sub msg received,topic=%s,msg=%s\n", topic, msg)
			topicMap.Range(func(k, v interface{}) bool {
				fmt.Println("topicMap:", k, v)
				if v.(string) == topic {
					conn := sub.Ws.GetConnection(k.(string))
					if conn != nil {
						conn.Write(1, []byte(msg))
					}
				}
				return true
			})
		})

	}

	r := gin.Default()
	//允许跨域
	//config := cors.DefaultConfig()
	//config.AllowOrigins = []string{"http://127.0.0.1:9090"}
	//r.Use(Cors())
	//静态资源
	r.Static("/static", "./static")
	r.LoadHTMLGlob("views/*")
	r.GET("/ws", ws.Handler())
	r.GET("/api/v3/device", ws.Handler())
	r.GET("/test", func(c *gin.Context) {
		c.HTML(http.StatusOK, "test.html", gin.H{
			"title": "this is a test",
		})
	})
	r.Run(":9090")
}

func handleConnection(c websocket.Connection) {
	fmt.Println("client connected,id=", c.ID())
	c.Write(1, []byte("welcome client"))
	// 从浏览器中读取事件
	c.On("chat", func(msg string) {
		// 将消息打印到控制台,c .Context()是iris的http上下文。
		fmt.Printf("%s chat sent: %s\n", c.Context().ClientIP(), msg)
		// 将消息写回客户端消息所有者:
		// c.Emit("chat", msg)
		c.To(websocket.All).Emit("chat", msg)
	})

	c.OnMessage(func(msg []byte) {
		fmt.Println("received msg:", string(msg))
		c.Write(1, []byte("hello aa"))
		c.To(websocket.All).Emit("chat", msg)
	})

	c.OnDisconnect(func() {
		fmt.Println("client Disconnect,id=", c.ID())
	})

	// 接收到发布消息事件
	c.On("Publish", func(msg string) {
		// 将消息打印到控制台,c .Context()是iris的http上下文。
		fmt.Printf("%s received publish: %s\n", c.Context().ClientIP(), msg)
		pubMsg := websocket.PushMsg{ID: c.ID()}
		err := json.Unmarshal([]byte(msg), &pubMsg)
		if err != nil {
			log.Printf("解析json串错误,err=", err)
			return
		}
		if pubMsg.Type != "pub" {
			log.Println("pub msg type error")
			return
		}
		//发布消息到Redis
		websocket.Publish(pubMsg.Topic, pubMsg.Payload)
	})

	// 接收到订阅的事件
	c.On("Subscribe", func(msg string) {
		// 将消息打印到控制台,c .Context()是iris的http上下文。
		fmt.Printf("%s received subscribe: %s\n", c.Context().ClientIP(), msg)
		subMsg := websocket.SubMsg{ID: c.ID()}
		err := json.Unmarshal([]byte(msg), &subMsg)
		if err != nil {
			log.Printf("解析json串错误,err=", err)
			return
		}
		if pubMsg.Type != "pub" {
			log.Println("pub msg type error")
			return
		}
		//订阅到Redis
		sub.Subscribe(subMsg.Topic, subMsg.ID)
	})
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/01/18 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis®
腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档