我必须将mqtt入站适配器的消息转换为Flux。按照Java: MQTT MessageProducerSupport to Flux中Artem Bilan的建议,我使用了toReactivePublisher()。但是,我得到一个错误:
ERROR - Unhandled exception for GenericMessage ... nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=10, headers={mqtt_receivedRetained=false, mqtt_id=51, mqtt_duplicate=false, ..
我的Mqtt客户端工厂和适配器取自spring集成参考https://docs.spring.io/spring-integration/reference/html/mqtt.html#mqtt
@Slf4j
@SpringBootApplication
public class SpringMqttClient {
public static void main(String[] args) throws InterruptedException, MqttException {
new SpringApplicationBuilder(SpringMqttClient.class).run(args);
@Bean
public String topic() {
return "my/test/topic";
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://localhost:1883" });
options.setUserName("SpringClient");
options.setPassword("SpringClient".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("SpringClient", mqttClientFactory(),
topic());
adapter.setCompletionTimeout(10000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public Flux<Message<byte[]>> mqttInFlow(MessageProducerSupport adapter) {
return Flux.from(
IntegrationFlows.from(adapter).transform(p -> p + ", received from MQTT").log().toReactivePublisher());
}
只要我与Spring Integration Reference Guide中的处理程序()交换Bean Flux (),它就可以正常工作;但是我需要一个mqttInFlow,而不只是一个字符串。mqttInFlow() Bean出了什么问题?如何使用toReactivePublisher()方法创建通量?
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
log.info("SpringClient got message: " + message.getPayload());
发布于 2021-10-29 17:24:38
Flux.from(IntegrationFlows.from())
。框架将看不到用于解析和正确处理的IntegrationFlow
。@Bean
必须返回IntegrtionFlow
或toReactivePublisher()
结果。无需额外包装Flux
-您失去了与某个ApllicationContext
.DirectChannel
的连接,如果您使用该@ServiceActivator
并从IntegrationFlow
指向相同的mqttInbound
,您最终将获得该MessageProducerSupport
的两个轮询订阅者:MqttPahoMessageDrivenChannelAdapter
如果您进行被动的、按需的消费,则不能让该up自动启动。setAutoStartup(false)
必须存在。Publsiher<Message<byte[]>>
bean时,只有在那里您才能将其包装到Flux
中,以便进行可能的处理。因此,您的解决方案可能如下所示:
@Bean
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("SpringClient", mqttClientFactory(),
topic());
adapter.setCompletionTimeout(10000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setAutoStartup(false);
return adapter;
}
@Bean
public Publisher<Message<byte[]>> mqttInFlow(MessageProducerSupport adapter) {
return IntegrationFlows.from(adapter)
.transform(p -> p + ", received from MQTT")
.log()
.toReactivePublisher();
}
然后,您必须在要使用Flux
的目标位置执行以下操作
@Autowired
MqttPahoMessageDrivenChannelAdapter adapter;
@Autowired
Publisher<Message<byte[]>> mqttInFlow;
...
Flux<Message<byte[]>> flux =
Flux.from(mqttInFlow)
.doOnSubscribe(s -> adapter.start())
.doOnTerminate(mqttInbound::stop);
在流应用程序项目中查看一些类似的解决方案:https://github.com/spring-cloud/stream-applications/blob/main/functions/supplier/mqtt-supplier/src/main/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierConfiguration.java
https://stackoverflow.com/questions/69769650
复制相似问题