上一篇介绍了Golang中封装WebSocket功能,让WebSocket更简单好用和稳定。
这里借助Redis自身的订阅和发布机制和WebSocket结合,实现轻量级的订阅发布和消息推送。本来消息订阅和推送打算用mqtt实现,但是这样还得有一个MqttBroker代理服务器,或采用网上开源的实现,或使用go语言自己实现个mqtt的broker。这都不够轻量级,这里介绍下借助redis的轻量级的实现。
大致框图如下:
涉及实时性和性能相关的服务可以直接在OnMessage里让客户端同后台业务直接交互实现。
关于提高性能的思考,首先看如果是高并发的情况下,瓶颈可能会在哪。
内部的两个redis客户端,一个负责发布,订阅,一个负责接收。当消息量大的情况下未必受用。那么首先负责发布的客户端,可考虑用redis的连接池实现。
消息的发布和订阅,固定为两个事件,一个是OnPublish,一个是OnSubcribe。并定义相关的报文结构如下:
收到的Publish事件,发布消息到Redis:
// 接收到发布消息事件
c.On("Publish", func(msg string) {
// 将消息打印到控制台
fmt.Printf("%s received publish: %s\n", c.Context().ClientIP(), msg)
pubMsg := websocket.PushMsg{ID: c.ID()}
err := json.Unmarshal([]byte(msg), &pubMsg)
if err != nil {
log.Printf("解析json串错误,err=", err)
return
}
if pubMsg.Type != "pub" {
log.Println("pub msg type error")
return
}
//发布消息到Redis
websocket.Publish(pubMsg.Topic, pubMsg.Payload)
})
收到的订阅事件,发布消息到Redis:
// 接收到订阅的事件
c.On("Subscribe", func(msg string) {
// 将消息打印到控制台,c .Context()是iris的http上下文。
fmt.Printf("%s received subscribe: %s\n", c.Context().ClientIP(), msg)
subMsg := websocket.SubMsg{ID: c.ID()}
err := json.Unmarshal([]byte(msg), &subMsg)
if err != nil {
log.Printf("解析json串错误,err=", err)
return
}
if pubMsg.Type != "pub" {
log.Println("pub msg type error")
return
}
//订阅到Redis
sub.Subscribe(subMsg.Topic, subMsg.ID)
})
开启一个Redis客户端,负责收到的消息:
func (c *Subscriber) Init(ws *Server) {
conn := RedisClient.Get()
c.client = redis.PubSubConn{conn}
c.Ws = ws
go func() {
for {
log.Println("redis wait...")
switch res := c.client.Receive().(type) {
case redis.Message:
fmt.Printf("receive:%#v\n", res)
topic := res.Channel
message := string(res.Data)
fnSubReceived(c.cbMap, topic, message)
case redis.Subscription:
fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
case error:
log.Println("error handle", res)
if IsConnError(res) {
conn, err := RedisClient.Dial()
if err != nil {
log.Printf("err=%s\n", err)
}
c.client = redis.PubSubConn{conn}
}
continue
}
}
}()
}
附完整实现:
package websocket
import (
"log"
)
//订阅的消息格式定义
type SubMsg struct {
ID string `json:"id"` //请求ID
Type string `json:"type"` //订阅时固定为sub,取消订阅时固定为unsub
Topic string `json:"topic"`
Param string `json:"param"`
}
//平台推送消息定义
type PushMsg struct {
ID string `json:"id"`
Type string `json:"type"` //发布,类型为pub
Topic string `json:"topic"`
Payload string `json:"payload"`
Result string `json:"result"`
}
func Publish(topic string, msg string) (interface{}, error) {
resp, err := Redo("Publish", topic, msg)
if err != nil {
log.Println(err)
}
return resp, err
}
package websocket
import (
"errors"
"fmt"
"io"
"log"
"strings"
"sync"
"time"
//"unsafe"
"github.com/gomodule/redigo/redis"
)
var (
// RD redis全局client
RedisClient *redis.Pool
)
// InitRedis 初始设置
func InitRedis(host string, auth string, db int) error {
// 连接Redis
RedisClient = &redis.Pool{
MaxIdle: 3,
MaxActive: 4000,
IdleTimeout: 180 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
if nil != err {
return nil, err
}
return c, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
}
rd := RedisClient.Get()
defer rd.Close()
c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
defer c.Close()
if err != nil {
fmt.Println("Connect to redis error", err)
return err
}
fmt.Println("Connect to redis ok")
return nil
}
func IsConnError(err error) bool {
var needNewConn bool
if err == nil {
return false
}
if err == io.EOF {
needNewConn = true
}
if strings.Contains(err.Error(), "use of closed network connection") {
needNewConn = true
}
if strings.Contains(err.Error(), "connect: connection refused") {
needNewConn = true
}
if strings.Contains(err.Error(), "connection closed") {
needNewConn = true
}
return needNewConn
}
// 在pool加入TestOnBorrow方法来去除扫描坏连接
func Redo(command string, opt ...interface{}) (interface{}, error) {
if RedisClient == nil {
return "", errors.New("error,redis client is null")
}
rd := RedisClient.Get()
defer rd.Close()
var conn redis.Conn
var err error
var maxretry = 3
var needNewConn bool
resp, err := rd.Do(command, opt...)
needNewConn = IsConnError(err)
if needNewConn == false {
return resp, err
} else {
conn, err = RedisClient.Dial()
}
for index := 0; index < maxretry; index++ {
if conn == nil && index+1 > maxretry {
return resp, err
}
if conn == nil {
conn, err = RedisClient.Dial()
}
if err != nil {
continue
}
resp, err := conn.Do(command, opt...)
needNewConn = IsConnError(err)
if needNewConn == false {
return resp, err
} else {
conn, err = RedisClient.Dial()
}
}
conn.Close()
return "", errors.New("redis error")
}
type SubscribeCallback func(topicMap sync.Map, topic, msg string)
type Subscriber struct {
client redis.PubSubConn
Ws *Server //websocket
cbMap sync.Map
CallBack interface {
OnReceive(SubscribeCallback)
}
}
var fnSubReceived SubscribeCallback
func (c *Subscriber) OnReceive(cb SubscribeCallback) {
fnSubReceived = cb
}
func (c *Subscriber) Init(ws *Server) {
conn := RedisClient.Get()
c.client = redis.PubSubConn{conn}
c.Ws = ws
go func() {
for {
log.Println("redis wait...")
switch res := c.client.Receive().(type) {
case redis.Message:
fmt.Printf("receive:%#v\n", res)
topic := res.Channel
message := string(res.Data)
fnSubReceived(c.cbMap, topic, message)
case redis.Subscription:
fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
case error:
log.Println("error handle", res)
if IsConnError(res) {
conn, err := RedisClient.Dial()
if err != nil {
log.Printf("err=%s\n", err)
}
c.client = redis.PubSubConn{conn}
}
continue
}
}
}()
}
func (c *Subscriber) Close() {
err := c.client.Close()
if err != nil {
log.Println("redis close error.")
}
}
func (c *Subscriber) Subscribe(channel interface{}, clientid string) {
err := c.client.Subscribe(channel)
if err != nil {
log.Println("redis Subscribe error.", err)
}
c.cbMap.Store(clientid, channel.(string))
}
func (c *Subscriber) PSubscribe(channel interface{}, clientid string) {
err := c.client.PSubscribe(channel)
if err != nil {
log.Println("redis PSubscribe error.", err)
}
c.cbMap.Store(clientid, channel.(string))
}
package main
import (
"fmt"
"log"
"sync"
//"github.com/gin-contrib/cors"
"encoding/json"
"github.com/gin-gonic/gin"
"net/http"
"strings"
"websockTest/websocket"
)
func main() {
ws := websocket.New(websocket.Config{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
})
ws.OnConnection(handleConnection)
// 初始化连接redis
var sub websocket.Subscriber
err := websocket.InitRedis("127.0.0.1:6379", "", 0)
if err != nil {
fmt.Printf("InitRedis error: %s\n", err)
} else {
sub.Init(ws)
//redis client收到的消息分发到websocket
sub.OnReceive(func(topicMap sync.Map, topic, msg string) {
fmt.Printf("sub msg received,topic=%s,msg=%s\n", topic, msg)
topicMap.Range(func(k, v interface{}) bool {
fmt.Println("topicMap:", k, v)
if v.(string) == topic {
conn := sub.Ws.GetConnection(k.(string))
if conn != nil {
conn.Write(1, []byte(msg))
}
}
return true
})
})
}
r := gin.Default()
//允许跨域
//config := cors.DefaultConfig()
//config.AllowOrigins = []string{"http://127.0.0.1:9090"}
//r.Use(Cors())
//静态资源
r.Static("/static", "./static")
r.LoadHTMLGlob("views/*")
r.GET("/ws", ws.Handler())
r.GET("/api/v3/device", ws.Handler())
r.GET("/test", func(c *gin.Context) {
c.HTML(http.StatusOK, "test.html", gin.H{
"title": "this is a test",
})
})
r.Run(":9090")
}
func handleConnection(c websocket.Connection) {
fmt.Println("client connected,id=", c.ID())
c.Write(1, []byte("welcome client"))
// 从浏览器中读取事件
c.On("chat", func(msg string) {
// 将消息打印到控制台,c .Context()是iris的http上下文。
fmt.Printf("%s chat sent: %s\n", c.Context().ClientIP(), msg)
// 将消息写回客户端消息所有者:
// c.Emit("chat", msg)
c.To(websocket.All).Emit("chat", msg)
})
c.OnMessage(func(msg []byte) {
fmt.Println("received msg:", string(msg))
c.Write(1, []byte("hello aa"))
c.To(websocket.All).Emit("chat", msg)
})
c.OnDisconnect(func() {
fmt.Println("client Disconnect,id=", c.ID())
})
// 接收到发布消息事件
c.On("Publish", func(msg string) {
// 将消息打印到控制台,c .Context()是iris的http上下文。
fmt.Printf("%s received publish: %s\n", c.Context().ClientIP(), msg)
pubMsg := websocket.PushMsg{ID: c.ID()}
err := json.Unmarshal([]byte(msg), &pubMsg)
if err != nil {
log.Printf("解析json串错误,err=", err)
return
}
if pubMsg.Type != "pub" {
log.Println("pub msg type error")
return
}
//发布消息到Redis
websocket.Publish(pubMsg.Topic, pubMsg.Payload)
})
// 接收到订阅的事件
c.On("Subscribe", func(msg string) {
// 将消息打印到控制台,c .Context()是iris的http上下文。
fmt.Printf("%s received subscribe: %s\n", c.Context().ClientIP(), msg)
subMsg := websocket.SubMsg{ID: c.ID()}
err := json.Unmarshal([]byte(msg), &subMsg)
if err != nil {
log.Printf("解析json串错误,err=", err)
return
}
if pubMsg.Type != "pub" {
log.Println("pub msg type error")
return
}
//订阅到Redis
sub.Subscribe(subMsg.Topic, subMsg.ID)
})
}