首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >springboot2 +webflux + websocket

springboot2 +webflux + websocket
EN

Stack Overflow用户
提问于 2019-01-18 13:46:27
回答 1查看 437关注 0票数 0

我在JDK 11上使用带有Webflux的Spring boot 2。我编写了以下配置类:

代码语言:javascript
运行
复制
@Configuration
public class WebSocketConfiguration {

    @Autowired
    @Bean
    public HandlerMapping webSocketMapping(final MyWebSocketHandler server) {
        final Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/echo", server);

        final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
        mapping.setUrlMap(map);
        return mapping;
    }
    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

和下面的WebSocketHandler方法:

代码语言:javascript
运行
复制
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    return webSocketSession.send(webSocketSession.receive().
            map(msg -> webSocketSession
                    .textMessage("response:jack ->" + msg.getPayloadAsText())));
}

现在,我可以接收我发送的任何内容,例如:

客户端发送:4545

客户端接收:响应:jack ->4545

我想知道在客户端没有给我发消息的情况下,我如何向客户端推送消息,我随时需要推送消息!

如何随时发送自定义消息,而不是使用相同的输入消息进行响应?

EN

回答 1

Stack Overflow用户

发布于 2019-08-09 18:47:37

你可以在我的博客文章http://kojotdev.com/2019/08/spring-webflux-websocket-with-vue-js/中了解到这一点。

您需要将WebSocketHandler更改为:

代码语言:javascript
运行
复制
private final GreetingsPublisher greetingsPublisher;
private final Flux<String> publisher;

public ReactiveWebSocketHandler(GreetingsPublisher greetingsPublisher) {
    this.greetingsPublisher = greetingsPublisher;
    this.publisher = Flux.create(greetingsPublisher).share();
}

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    final Flux<WebSocketMessage> message = publisher
            .map(greetings -> webSocketSession.textMessage(greetings));

    return webSocketSession.send(message);
}

并添加GreetingPublisher

代码语言:javascript
运行
复制
@Component
public class GreetingsPublisher implements Consumer<FluxSink<String>> {
    private static final Logger log = LoggerFactory.getLogger(GreetingsPublisher.class);

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Executor executor = Executors.newSingleThreadExecutor();

    public boolean push(String greeting) {
        return queue.offer(greeting);
    }

    @Override
    public void accept(FluxSink<String> sink) {
        this.executor.execute(() -> {
            while (true) {
                Try.of(() -> {
                    final String greeting = queue.take();
                    return sink.next(greeting);
                })
                        .onFailure(ex -> log.error("Could not take greeting from queue", ex));

            }
        });
    }
}

它是一个bean,因此无论您在何处注入它并调用push方法,它都将使用WebSocket发送消息。例如:

代码语言:javascript
运行
复制
@Controller
public class GreetingsController {

    private final GreetingsPublisher greetingsPublisher;

    public GreetingsController(GreetingsPublisher greetingsPublisher) {
        this.greetingsPublisher = greetingsPublisher;
    }

    @Bean
    RouterFunction<ServerResponse> pushMessage() {
        return route(GET("/push"),
                request -> {
                    greetingsPublisher.push("Send a new message with WebSocket");
                    return ServerResponse.ok().body(fromObject("websocket message sent"));
                });
    }
}

首先连接WebSocket,打开本地主机:8080/push上的浏览器。消息应该被发送。

请注意,这似乎是Spring Boot 2.1.7的一个bug,我在我的博客文章中提到了它。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54248336

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档