首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >SSE 服务端消息推送

SSE 服务端消息推送

作者头像
默存
发布2022-12-03 12:03:04
发布2022-12-03 12:03:04
2.3K0
举报
文章被收录于专栏:默存默存

SSE(Server-sent events)

SSE 它是基于 HTTP 协议的,一般意义上的 HTTP 协议是无法做到服务端主动向客户端推送消息的。有一种变通方法,就是服务器向客户端声明,发送的是流信息,本质上,这种通信就是以流信息的方式。

SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是 text/event-stream 类型的数据流信息,在有数据变更时从服务器流式传输到客户端。

SSE 与 WebSocket 作用相似,都可以建立服务端与浏览器之间的通信,实现服务端向客户端推送消息,两者区别:

  • SSE 是基于 HTTP 协议的,不需要特殊的协议或服务器实现即可工作,WebSocket 需单独服务器来处理协议;
  • SSE 单向通信,只能由服务端向客户端单向通信,webSocket 全双工通信,即通信的双方可以同时发送和接受信息。
  • SSE 实现简单开发成本低,无需引入其他组件,WebSocket 传输数据需做二次解析,开发门槛高一些。
  • SSE 默认支持断线重连,WebSocket 则需要自己实现。
  • SSE 只能传送文本消息,二进制数据需要经过编码后传送,WebSocket 默认支持传送二进制数据。

SSE 具有 WebSockets 在设计上缺乏的多种功能,例如:自动重新连接、事件 ID 和发送任意事件的能力。

编码

1.SseEmitterUtils

代码语言:javascript
复制
package com.demo.utils; 
 
import cn.hutool.core.map.MapUtil; 
import lombok.extern.slf4j.Slf4j; 
import org.springframework.http.MediaType; 
import org.springframework.stereotype.Component; 
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; 
 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.Map; 
import java.util.Set; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.function.Consumer; 
 
/** 
 * @ClassName:SseEmitterUtils.java
 * @ClassPath:com.demo.utils.SseEmitterUtils.java
 * @Description:SSE 服务器发送事件 
 * @Author:tanyp
 * @Date:2022/9/13 11:03
 **/ 
@Slf4j 
@Component 
public class SseEmitterUtils { 
 
    // 当前连接数 
    private static AtomicInteger count = new AtomicInteger(0); 
    // 存储 SseEmitter 信息 
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>(); 
 
    /** 
     * @MonthName:connect
     * @Description: 创建用户连接并返回 SseEmitter
     * @Author:tanyp
     * @Date:2022/9/13 11:09
     * @Param: [userId] 
     * @return:org.springframework.web.servlet.mvc.method.annotation.SseEmitter
     **/ 
    public static SseEmitter connect(String key) { 
        if (sseEmitterMap.containsKey(key)) { 
            return sseEmitterMap.get(key); 
        } 
 
        try { 
            // 设置超时时间,0表示不过期。默认30秒 
            SseEmitter sseEmitter = new SseEmitter(0L); 
            // 注册回调 
            sseEmitter.onCompletion(completionCallBack(key)); 
            sseEmitter.onError(errorCallBack(key)); 
            sseEmitter.onTimeout(timeoutCallBack(key)); 
            sseEmitterMap.put(key, sseEmitter); 
            // 数量+1 
            count.getAndIncrement(); 
            return sseEmitter; 
        } catch (Exception e) { 
            log.info("创建新的SSE连接异常,当前连接Key为:{}", key); 
        } 
        return null; 
    } 
 
    /** 
     * @MonthName:sendMessage
     * @Description: 给指定用户发送消息 
     * @Author:tanyp
     * @Date:2022/9/13 11:10 
     * @Param: [userId, message] 
     * @return:void
     **/ 
    public static void sendMessage(String key, String message) { 
        if (sseEmitterMap.containsKey(key)) { 
            try { 
                sseEmitterMap.get(key).send(message); 
            } catch (IOException e) { 
                log.error("用户[{}]推送异常:{}", key, e.getMessage()); 
                remove(key); 
            } 
        } 
    } 
 
    /** 
     * @MonthName:groupSendMessage
     * @Description: 向同组人发布消息,要求:key + groupId
     * @Author:tanyp
     * @Date:2022/9/13 11:15
     * @Param: [groupId, message] 
     * @return:void
     **/ 
    public static void groupSendMessage(String groupId, String message) { 
        if (MapUtils.isNotEmpty(sseEmitterMap)) { 
            sseEmitterMap.forEach((k, v) -> { 
                try { 
                    if (k.startsWith(groupId)) { 
                        v.send(message, MediaType.APPLICATION_JSON); 
                    } 
                } catch (IOException e) { 
                    log.error("用户[{}]推送异常:{}", k, e.getMessage()); 
                    remove(k); 
                } 
            }); 
        } 
    } 
 
    /** 
     * @MonthName:batchSendMessage
     * @Description: 广播群发消息 
     * @Author:tanyp
     * @Date:2022/9/13 11:15
     * @Param: [message] 
     * @return:void
     **/ 
    public static void batchSendMessage(String message) { 
        sseEmitterMap.forEach((k, v) -> { 
            try { 
                v.send(message, MediaType.APPLICATION_JSON); 
            } catch (IOException e) { 
                log.error("用户[{}]推送异常:{}", k, e.getMessage()); 
                remove(k); 
            } 
        }); 
    } 
 
    /** 
     * @MonthName:batchSendMessage
     * @Description: 群发消息 
     * @Author:tanyp
     * @Date:2022/9/13 11:16
     * @Param: [message, ids] 
     * @return:void
     **/ 
    public static void batchSendMessage(String message, Set<String> ids) { 
        ids.forEach(userId -> sendMessage(userId, message)); 
    } 
 
    /** 
     * @MonthName:remove
     * @Description: 移除连接 
     * @Author:tanyp
     * @Date:2022/9/13 11:17
     * @Param: [userId] 
     * @return:void
     **/ 
    public static void remove(String key) { 
        sseEmitterMap.remove(key); 
        // 数量-1 
        count.getAndDecrement(); 
        log.info("移除连接:{}", key); 
    } 
 
    /** 
     * @MonthName:getIds
     * @Description: 获取当前连接信息 
     * @Author:tanyp
     * @Date:2022/9/13 11:17
     * @Param: [] 
     * @return:java.util.List<java.lang.String> 
     **/ 
    public static List<String> getIds() { 
        return new ArrayList<>(sseEmitterMap.keySet()); 
    } 
 
    /** 
     * @MonthName:getUserCount
     * @Description: 获取当前连接数量 
     * @Author:tanyp
     * @Date:2022/9/13 11:18
     * @Param: [] 
     * @return:int
     **/ 
    public static int getCount() { 
        return count.intValue(); 
    } 
 
    private static Runnable completionCallBack(String key) { 
        return () -> { 
            log.info("结束连接:{}", key); 
            remove(key); 
        }; 
    } 
 
    private static Runnable timeoutCallBack(String key) { 
        return () -> { 
            log.info("连接超时:{}", key); 
            remove(key); 
        }; 
    } 
 
    private static Consumer<Throwable> errorCallBack(String key) { 
        return throwable -> { 
            log.info("连接异常:{}", key); 
            remove(key); 
        }; 
    } 
 
} 
 

2.服务端

代码语言:javascript
复制
package com.demo.controller; 
 
import com.demo.utils.SseEmitterUtils; 
import io.swagger.annotations.Api; 
import io.swagger.annotations.ApiOperation; 
import lombok.extern.slf4j.Slf4j; 
import org.springframework.http.MediaType; 
import org.springframework.web.bind.annotation.GetMapping; 
import org.springframework.web.bind.annotation.PathVariable; 
import org.springframework.web.bind.annotation.RequestMapping; 
import org.springframework.web.bind.annotation.RestController; 
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; 
 
import javax.servlet.http.HttpServletRequest; 
 
/** 
 * @ClassName:SSEController.java
 * @ClassPath:com.demo.controller.SSEController.java
 * @Description:SSE消息推送 
 * @Author:tanyp
 * @Date:2022/9/13 11:29
 **/ 
@Slf4j 
@RestController 
@RequestMapping("/sse") 
@Api(value = "sse", tags = "SSE消息推送") 
public class SSEController { 
 
    @ApiOperation(value = "订阅消息", notes = "订阅消息") 
    @GetMapping(path = "subscribe/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE}) 
    public SseEmitter subscribe(@PathVariable String id) { 
        return SseEmitterUtils.connect(id); 
    } 
 
    @ApiOperation(value = "发布消息", notes = "发布消息") 
    @GetMapping(path = "push") 
    public void push(String id, String content) { 
        SseEmitterUtils.sendMessage(id, content); 
    } 
 
    @ApiOperation(value = "清除连接", notes = "清除连接") 
    @GetMapping(path = "close") 
    public void close(String id, HttpServletRequest request) { 
        request.startAsync(); 
        SseEmitterUtils.remove(id); 
    } 
 
} 

3.浏览器端

代码语言:javascript
复制
<!DOCTYPE html> 
<html lang="en"> 
    <head> 
        <title>SSE</title> 
        <meta charset="UTF-8"> 
        <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script> 
        <script> 
            let source = null; 
            const id = "k000001"; 
            if (window.EventSource) { 
                // 建立连接 
                source = new EventSource('http://localhost:8000/sse/subscribe/' + id); 
                setMessageInnerHTML("连接key:" + id); 
                /** 
                 * 连接一旦建立,就会触发open事件 
                 * 另一种写法:source.onopen = function (event) {} 
                 */ 
                source.addEventListener('open', function (e) { 
                    setMessageInnerHTML("建立连接。。。"); 
                }, false); 
     
                /** 
                 * 客户端收到服务器发来的数据 
                 * 另一种写法:source.onmessage = function (event) {} 
                 */ 
                source.addEventListener('message', function (e) { 
                    setMessageInnerHTML(e.data); 
                }); 
     
                /** 
                 * 如果发生通信错误(比如连接中断),就会触发error事件 
                 * 另一种写法:source.onerror = function (event) {} 
                 */ 
                source.addEventListener('error', function (e) { 
                    if (e.readyState === EventSource.CLOSED) { 
                        setMessageInnerHTML("连接关闭"); 
                    } else { 
                        console.log(e); 
                    } 
                }, false); 
            } else { 
                setMessageInnerHTML("浏览器不支持SSE"); 
            } 
     
            // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据 
            window.onbeforeunload = function () { 
                source.close(); 
                const httpRequest = new XMLHttpRequest(); 
                httpRequest.open('GET', 'http://localhost:8000/sse/close/' + id, true); 
                httpRequest.send(); 
                console.log("close"); 
            }; 
     
            // 将消息显示在网页上 
            function setMessageInnerHTML(innerHTML) { 
                $("#contentDiv").append("<br/>" + innerHTML); 
            } 
        </script> 
    </head> 
     
    <body> 
        <div> 
            <div> 
                <div id="contentDiv" style="height:800px; width:1000px; overflow:scroll; background:#ccc;"> 
                </div> 
            </div> 
        </div> 
    </body> 
</html> 

注:SSE 是基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-09-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 全栈客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SSE(Server-sent events)
  • 编码
    • 1.SseEmitterUtils
    • 2.服务端
    • 3.浏览器端
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档