首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >NestJs @Sse -事件仅由一个客户端使用。

NestJs @Sse -事件仅由一个客户端使用。
EN

Stack Overflow用户
提问于 2022-04-06 09:29:35
回答 3查看 1.1K关注 0票数 4

我尝试了nest.js (28-SSE)提供的样例SSE应用程序,并修改了sse端点以发送计数器:

代码语言:javascript
运行
复制
  @Sse('sse')
  sse(): Observable<MessageEvent> {
    return interval(5000).pipe(
      map((_) => ({ data: { hello: `world - ${this.c++}` }} as MessageEvent)),
    );
  }

我希望侦听此SSE的每个客户端都会收到消息,但是当打开多个浏览器选项卡时,我可以看到每条消息只由一个浏览器使用,所以如果打开了三个浏览器,就会得到以下结果:

我怎样才能得到预期的行为呢?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2022-04-10 09:04:06

为了实现您期望的行为,您需要为每个连接创建一个单独的流,并按您的意愿推送数据流。

下面是一个可能的极简解决方案

代码语言:javascript
运行
复制
import { Controller, Get, MessageEvent, OnModuleDestroy, OnModuleInit, Res, Sse } from '@nestjs/common';
import { readFileSync } from 'fs';
import { join } from 'path';
import { Observable, ReplaySubject } from 'rxjs';
import { map } from 'rxjs/operators';
import { Response } from 'express';

@Controller()
export class AppController implements OnModuleInit, OnModuleDestroy {
  private stream: {
    id: string;
    subject: ReplaySubject<unknown>;
    observer: Observable<unknown>;
  }[] = [];
  private timer: NodeJS.Timeout;
  private id = 0;

  public onModuleInit(): void {
    this.timer = setInterval(() => {
      this.id += 1;
      this.stream.forEach(({ subject }) => subject.next(this.id));
    }, 1000);
  }

  public onModuleDestroy(): void {
    clearInterval(this.timer);
  }

  @Get()
  public index(): string {
    return readFileSync(join(__dirname, 'index.html'), 'utf-8').toString();
  }

  @Sse('sse')
  public sse(@Res() response: Response): Observable<MessageEvent> {
    const id = AppController.genStreamId();
    // Clean up the stream when the client disconnects
    response.on('close', () => this.removeStream(id));
    // Create a new stream
    const subject = new ReplaySubject();
    const observer = subject.asObservable();
    this.addStream(subject, observer, id);

    return observer.pipe(map((data) => ({
      id: `my-stream-id:${id}`,
      data: `Hello world ${data}`,
      event: 'my-event-name',
    }) as MessageEvent));
  }

  private addStream(subject: ReplaySubject<unknown>, observer: Observable<unknown>, id: string): void {
    this.stream.push({
      id,
      subject,
      observer,
    });
  }

  private removeStream(id: string): void {
    this.stream = this.stream.filter(stream => stream.id !== id);
  }

  private static genStreamId(): string {
    return Math.random().toString(36).substring(2, 15);
  }
}

您可以为它创建一个单独的服务,并使其更干净,并从不同的地方推送流数据,但作为一个示例,这将导致如下屏幕截图所示的结果。

票数 4
EN

Stack Overflow用户

发布于 2022-04-07 09:37:18

这种行为是正确的。每个SSE连接都是一个专用套接字,由专用服务器进程处理。这样每个客户端就可以接收到不同的数据。这不是一种广播一样的技术。

我怎样才能得到预期的行为呢?

将您想要发送到所有连接客户端的所需值的中央记录(例如,在SQL中)。

然后让每个SSE服务器进程监视或轮询中央记录,并在每次更改时发送一个事件。

票数 1
EN

Stack Overflow用户

发布于 2022-07-27 18:59:24

您只需为同一主题的每个sse连接生成一个新的可观察的

代码语言:javascript
运行
复制
 private events: Subject<MessageEvent> = new Subject();

 constuctor(){
   timer(0, 1000).pipe(takeUntil(this.destroy)).subscribe(async (index: any)=>{
            let event: MessageEvent = { 
                id: index, 
                type: 'test', 
                retry: 30000, 
                data: {index: index}
            } as MessageEvent;
            this.events.next(event);
   });
 }

 @Sse('sse')
 public sse(): Observable<MessageEvent> {
   return this.events.asObservable();
 }

注意:我跳过了控制器代码的其余部分。

致以敬意,

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71764270

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档