首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Java Spring MQTT订阅者toReactivePublisher()

Java Spring MQTT订阅者toReactivePublisher()
EN

Stack Overflow用户
提问于 2021-10-29 13:23:25
回答 1查看 80关注 0票数 1

我必须将mqtt入站适配器的消息转换为Flux。按照Java: MQTT MessageProducerSupport to Flux中Artem Bilan的建议,我使用了toReactivePublisher()。但是,我得到一个错误:

代码语言:javascript
运行
复制
ERROR - Unhandled exception for GenericMessage ... nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=10, headers={mqtt_receivedRetained=false, mqtt_id=51, mqtt_duplicate=false, ..

我的Mqtt客户端工厂和适配器取自spring集成参考https://docs.spring.io/spring-integration/reference/html/mqtt.html#mqtt

代码语言:javascript
运行
复制
@Slf4j
@SpringBootApplication
public class SpringMqttClient {

public static void main(String[] args) throws InterruptedException, MqttException {
    new SpringApplicationBuilder(SpringMqttClient.class).run(args);

@Bean
public String topic() {
    return "my/test/topic";
}

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[] { "tcp://localhost:1883" }); 
    options.setUserName("SpringClient");
    options.setPassword("SpringClient".toCharArray());
    factory.setConnectionOptions(options);
    return factory;
}

@Bean
public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("SpringClient", mqttClientFactory(),
            topic());
    adapter.setCompletionTimeout(10000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    adapter.setOutputChannel(mqttInputChannel());
    return adapter;
}

@Bean
public MessageChannel mqttInputChannel() {
    return new DirectChannel();
}

@Bean
public Flux<Message<byte[]>> mqttInFlow(MessageProducerSupport adapter) {
    return Flux.from(
            IntegrationFlows.from(adapter).transform(p -> p + ", received from MQTT").log().toReactivePublisher());
}

只要我与Spring Integration Reference Guide中的处理程序()交换Bean Flux (),它就可以正常工作;但是我需要一个mqttInFlow,而不只是一个字符串。mqttInFlow() Bean出了什么问题?如何使用toReactivePublisher()方法创建通量?

代码语言:javascript
运行
复制
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            log.info("SpringClient got message: " + message.getPayload());
EN

回答 1

Stack Overflow用户

发布于 2021-10-29 17:24:38

  1. 你不能做Flux.from(IntegrationFlows.from())。框架将看不到用于解析和正确处理的IntegrationFlow@Bean必须返回IntegrtionFlowtoReactivePublisher()结果。无需额外包装Flux -您失去了与某个ApllicationContext.
  2. The DirectChannel的连接,如果您使用该@ServiceActivator并从IntegrationFlow指向相同的mqttInbound,您最终将获得该MessageProducerSupport的两个轮询订阅者:MqttPahoMessageDrivenChannelAdapter如果您进行被动的、按需的消费,则不能让该up自动启动。setAutoStartup(false)必须存在。
  3. 当您获取(注入) Publsiher<Message<byte[]>> bean时,只有在那里您才能将其包装到Flux中,以便进行可能的处理。

因此,您的解决方案可能如下所示:

代码语言:javascript
运行
复制
@Bean
public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("SpringClient", mqttClientFactory(),
            topic());
    adapter.setCompletionTimeout(10000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    adapter.setAutoStartup(false);
    return adapter;
}


@Bean
public Publisher<Message<byte[]>> mqttInFlow(MessageProducerSupport adapter) {
    return IntegrationFlows.from(adapter)
              .transform(p -> p + ", received from MQTT")
              .log()
              .toReactivePublisher();
}

然后,您必须在要使用Flux的目标位置执行以下操作

代码语言:javascript
运行
复制
@Autowired
MqttPahoMessageDrivenChannelAdapter adapter;

@Autowired
Publisher<Message<byte[]>> mqttInFlow;

...

Flux<Message<byte[]>> flux = 
        Flux.from(mqttInFlow)
            .doOnSubscribe(s -> adapter.start())
            .doOnTerminate(mqttInbound::stop);

在流应用程序项目中查看一些类似的解决方案:https://github.com/spring-cloud/stream-applications/blob/main/functions/supplier/mqtt-supplier/src/main/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierConfiguration.java

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69769650

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档