上一篇文章我们讲到了,join页面前端和后端建立了websocket连接,在建立连接的同时我们需要做其他几件事,首先启动一个检测心跳的服务,然后需要启动一个管理消息广播的服务。为了理解这个文章的主要内容,我先画了个图,方面大家了解接下来要干什么。
从上面的图,需要仔细读才能了解需要干什么,我们从连接websocket开始讲,连接websocket,会把消息传送到全局变量subscribe,然后启动一个监听数据返回给前端的服务,然后当前进程则来接受前端返回的数据。下面我们看下代码:
func (this *WsController) WebSocket() {
username := this.GetString("username")
room := this.GetString("room")
if (len(username)) == 0 {
this.Redirect("/", 302)
return
}
if (len(room)) == 0 {
this.Redirect("/", 302)
return
}
func() {
//如果是同一个房间,名字一样的话,第二次进入会把第一次覆盖
clients.Mu.Lock()
defer clients.Mu.Unlock()
for client := range clients.Client {
if client.room == room && client.username == username {
delete(clients.Client, client)
client.conn.Close()
clients.RoomNum[client.room] = clients.RoomNum[client.room] - 1
msg := client.username + " 掉线了"
client.BroadcastInfo(1, msg)
}
}
//限制房间最大人数,得从新进房
if clients.RoomNum[room] >= 2 {
this.Redirect("/", 302)
return
}
}()
fmt.Println("clients.RoomNum[room]", clients.RoomNum[room])
//创建新的连接
conn, err := websocket.Upgrade(this.Ctx.ResponseWriter, this.Ctx.Request, nil, 1024, 1024)
if err != nil {
log.Println(err)
return
}
//这个是进入房间之后,前端和后端建立好连接,生成一个连接对象,我们这里叫订阅对象
subTmp := &Subscriber{username: username, room: room, conn: conn, messages: make(chan []byte, 5)}
subscribe <- subTmp
go subTmp.writePump()
subTmp.readPump(true)
}
后端会启动两个协程,一个来做全局广播管理以及全局在线人员管理,另外一个协程来监听心跳。代码如下:
心跳检测代码:
func checkClient() {
tick := time.NewTicker(1 * time.Second)
for {
select {
case <-tick.C:
func() {
clients.Mu.Lock()
defer clients.Mu.Unlock()
for client := range clients.Client {
err := client.conn.WriteMessage(websocket.TextMessage, []byte("heartbeat"))
fmt.Println("len(clients)", len(clients.Client))
if err != nil {
client.conn.Close()
delete(clients.Client, client)
clients.RoomNum[client.room] = clients.RoomNum[client.room] - 1
msg := client.username + " 掉线了"
client.BroadcastInfo(1, msg)
}
}
}()
fmt.Println("len(clients)", len(clients.Client))
}
}
}
全局广播管理和全局在线人员管理代码:
//主要是管理广播和进房的全局在线人员管理
func manager() {
clients = WebSocketClient{Client: map[*Subscriber]bool{}, RoomNum: map[string]int{}}
broadcast = make(chan []byte, 10)
subscribe = make(chan *Subscriber)
for {
select {
case tmpClient := <-broadcast:
fmt.Println("broadcast length:", len(broadcast), clients)
func() {
clients.Mu.Lock()
defer clients.Mu.Unlock()
for client := range clients.Client {
clientInfo := make(map[string]string)
json.Unmarshal([]byte(tmpClient), &clientInfo)
if clientInfo["room"] == client.room {
select {
case client.messages <- tmpClient:
default:
fmt.Println("clients", len(clients.Client))
close(client.messages)
delete(clients.Client, client)
clients.RoomNum[client.room] = clients.RoomNum[client.room] - 1
msg := client.username + " 掉线了"
client.BroadcastInfo(1, msg)
}
}
}
}()
case itemClient := <-subscribe:
func() {
clients.Mu.Lock()
defer clients.Mu.Unlock()
clients.Client[itemClient] = true
clients.RoomNum[itemClient.room] = clients.RoomNum[itemClient.room] + 1
}()
}
}
}
好了,核心代码其实和流程都在上面贴出来了,最后我分享一下这个源码。地址在github上。
github地址:
https://github.com/zengzhihai110/my_chat