首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >取消用WebFlux创建的SSE流的正确方法

取消用WebFlux创建的SSE流的正确方法
EN

Stack Overflow用户
提问于 2018-03-20 11:19:21
回答 1查看 3.6K关注 0票数 4

首先,这里提供了完整运行的代码示例(客户端和服务器):https://github.com/kklepacz/webflux-cancel-subscription

案例说明:

我有生成数据的遗留系统(我在github示例中使用SampleEmitter来模拟它)。我可以为它提供侦听器,当新数据到达时会通知它。我希望将这些侦听器调用“转换”为数据流。这个流需要是无限的(只要系统正常工作,就会有新的值)。准确地说,它也需要是热源(遵循Reactor项目的术语),所以每当有人订阅时,他只会收到当前的值。

在这里,翻译是怎样的:

代码语言:javascript
运行
复制
class ReactiveRepoImpl implements ReactiveRepo {
private static final Logger log = LoggerFactory.getLogger(ReactiveRepoImpl.class);
private final UnicastProcessor<MyObject> hotProcessor = UnicastProcessor.create();
private final FluxSink<MyObject> fluxSink = hotProcessor.sink(FluxSink.OverflowStrategy.LATEST);
private final Flux<MyObject> hotFlux = hotProcessor.publish().autoConnect();

@Override
public void save(MyObject elem) {
    fluxSink.next(elem);
}

@Override
public Flux<MyObject> findAll() {
    return hotFlux;
}
}

在这一点上,我可以像这样通过RouterFunctions公开它:

代码语言:javascript
运行
复制
@Bean
public RouterFunction routerFunction(ReactiveRepo repo) {
    return RouterFunctions.route(GET("/objects"), serverRequest -> {
        log.info("Subscribing for GET /objects");
        return ok()
                .contentType(MediaType.TEXT_EVENT_STREAM)
                .body(repo.findAll(), MyObject.class);
    });
}

如您所见,我正在这里生成服务器附带事件。

现在让我们使用它:

代码语言:javascript
运行
复制
 WebClient.create("http://localhost:8080")
                .get()
                .uri("/objects")
                .retrieve()
                .bodyToFlux(MyObject.class)
                .subscribe(n -> log.info("Next: {}", n.getId()),
                        e -> log.error("Error: {}", e),
                        () -> log.info("Completed"));

问题是:当我要杀死客户端或调用Disposable.dispose()时,我在服务器应用程序中收到以下错误:

代码语言:javascript
运行
复制
2018-03-20 12:00:50.926 ERROR 11944 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter    : Unhandled failure: An established connection was aborted by the software in your host machine, response already set (status=200)
2018-03-20 12:00:50.941 ERROR 11944 --- [ctor-http-nio-2] o.s.h.s.r.ReactorHttpHandlerAdapter      : Handling completed with error

java.io.IOException: An established connection was aborted by the software in your host machine
at sun.nio.ch.SocketDispatcher.writev0(Native Method) ~[na:1.8.0_144]
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55) ~[na:1.8.0_144]
at sun.nio.ch.IOUtil.write(IOUtil.java:148) ~[na:1.8.0_144]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504) ~[na:1.8.0_144]
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:418) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1376) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:533) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at reactor.ipc.netty.channel.ChannelOperationsHandler$PublisherSender.onComplete(ChannelOperationsHandler.java:535) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:122) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:245) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:184) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:59) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:6873) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:461) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:191) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.access$1500(AbstractChannelHandlerContext.java:38) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1129) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_144]

2018-03-20 12:00:50.957 ERROR 11944 --- [ctor-http-nio-2] r.ipc.netty.channel.ChannelOperations    : [HttpServer] Error processing connection. Requesting close the channel

java.io.IOException: An established connection was aborted by the software in your host machine
at sun.nio.ch.SocketDispatcher.writev0(Native Method) ~[na:1.8.0_144]
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55) ~[na:1.8.0_144]
at sun.nio.ch.IOUtil.write(IOUtil.java:148) ~[na:1.8.0_144]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504) ~[na:1.8.0_144]
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:418) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1376) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:533) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at reactor.ipc.netty.channel.ChannelOperationsHandler$PublisherSender.onComplete(ChannelOperationsHandler.java:535) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:122) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:245) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:184) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:59) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:6873) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:461) [reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:191) [reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.access$1500(AbstractChannelHandlerContext.java:38) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1129) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-common-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-common-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) [netty-common-4.1.22.Final.jar:4.1.22.Final]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]

我怎么才能避免呢?怎样才能正确地创建这样的热源流并正确地处理多个客户端断开/流取消和连接/订阅?

给你一个完整的图片为什么我需要这样的功能-我有一个带标签的web应用程序。在一个选项卡上,我显示了从服务器接收到的所有数据。当我切换选项卡,我不希望我的流继续,我想取消订阅和再次订阅时,我将回到这个选项卡。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-04-09 19:47:31

TLDR;

dispose()是从流中取消订阅的正确方法。

完整答案:

我在网上找不到答案,所以我问了源- Spring团队。下面是我创建的一个JIRA:https://jira.spring.io/browse/SPR-16688

因此,根据上面的注释,上面的堆栈跟踪是由于在使用UndertowTomcatNetty时没有涵盖特定于Windows环境的异常。它们不应该发生在Linux/Unix或使用Jetty时。根据JIRA的说法,Spring将在下一个版本中微调这个版本--可能是5.0.6和/或5.1 RC1

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49382861

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档