首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Publisher使用确认

Publisher使用确认
EN

Stack Overflow用户
提问于 2020-06-05 16:17:41
回答 1查看 954关注 0票数 0

我对使用Publisher很感兴趣,在一些生产者中确认我们在一个项目中使用。我试过做一个小的PoC,但它不起作用。据我在文档中看到的,这对于Asyncrhonous Publisher确认来说是可能的,它应该和接下来的更改一样容易:

在application.yml中添加confirmAckChannel并启用errorChannelEnabled属性。

代码语言:javascript
运行
复制
spring.cloud.stream:
  binders:
    rabbitDefault:
      defaultCandidate: false
      type: rabbit
      environment.spring.rabbitmq.host: ${spring.rabbitmq.addresses}
 ....
  bindings:    
    testOutput:
      destination: test
      binder: rabbitDefault
      content-type: application/json    
  rabbit.bindings:   
    testOutput.producer:
      confirmAckChannel: "testAck"
      errorChannelEnabled: true

然后是一个由端点触发的简单服务,其中我插入与事件相关的errorChannel头。

代码语言:javascript
运行
复制
@Service
@RequiredArgsConstructor
public class TestService {

    private final TestPublisher testPublisher;

    public void sendMessage() {

        testPublisher.send(addHeaders());
    }

    private Message<Event<TestEvent>> addHeaders() {
        return withPayload(new Event<>(TestEvent.builder().build()))
                .setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")
                .build();
    }
}

然后是RabbitMQ的发行者

代码语言:javascript
运行
复制
@Component
@RequiredArgsConstructor
public class TestPublisher {

    private final MessagingChannels messagingChannels;

    public boolean send(Message<Event<TestEvent>> message) {
        return messagingChannels.test().send(message);
    }
}

其中MessagingChannels被实现为

代码语言:javascript
运行
复制
public interface MessagingChannels {

    @Input("testAck")
    MessageChannel testAck();

    @Input("errorChannelTest")
    MessageChannel testError();


    @Output("testOutput")
    MessageChannel test();
}

之后,我实现了两个监听器,一个用于errorChannelTest输入,另一个用于testAck。

代码语言:javascript
运行
复制
@Slf4j
@Component
@RequiredArgsConstructor
class TestErrorListener {

    @StreamListener("errorChannelTest")
    void onCommandReceived(Event<Message> message) {

        log.info("Message error received: " + message);
    }
}
代码语言:javascript
运行
复制
@Slf4j
@Component
@RequiredArgsConstructor
class TestAckListener {

    @StreamListener("testAck")
    void onCommandReceived(Event<Message> message) {

        log.info("Message ACK received: " + message);
    }
}

然而,在这两个侦听器中,我没有收到RabbitMQ的任何ACK或NACK,事件被正确地发送到RabbitMQ并由交换管理,但是我没有收到来自RabbitMQ的任何响应。

我是不是遗漏了什么?我也检查过这两个属性,但它也不起作用

代码语言:javascript
运行
复制
spring:
  rabbitmq:
    publisher-confirm-type: CORRELATED
    publisher-returns: true

我用的是spring stream 3.0.1.rabbit和spring-云-启动器-流-兔子3.0.1

----EDITED------

这是根据Gary Russell的建议更新的示例工作。

Application.yml

代码语言:javascript
运行
复制
spring.cloud.stream:
  binders:
   rabbitDefault:
      defaultCandidate: false
      type: rabbit
      environment.spring.rabbitmq.host: ${spring.rabbitmq.addresses}   
  bindings:   
    testOutput:
      destination: exchange.output.test
      binder: rabbitDefault
      content-type: application/json
    testOutput.producer:
      errorChannelEnabled: true  
  rabbit.bindings:   
    testOutput.producer:
      confirmAckChannel: "testAck"
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true

TestService

代码语言:javascript
运行
复制
@Service
@RequiredArgsConstructor
public class TestService {

    private final TestPublisher testPublisher;

    public void sendMessage() {

        testPublisher.send(addHeaders());
    }

    private Message<Event<TestEvent>> addHeaders(Test test) {
        return withPayload(new Event<>(TestEvent.builder().test(test).build()))
                .build();
    }
}

TestService由下一个简单控制器中的端点触发,以检查此PoC。

代码语言:javascript
运行
复制
@RestController
@RequiredArgsConstructor
public class TestController {

    private final TestService testService;

    @PostMapping("/services/v1/test")
    public ResponseEntity<Object> test(@RequestBody Test test) {

        testService.sendMessage(test);
        return ResponseEntity.ok().build();
    }
}

然后是RabbitMQ的发行者和两个ServiceActivators

代码语言:javascript
运行
复制
@Component
@RequiredArgsConstructor
public class TestPublisher {

    private final MessagingChannels messagingChannels;

    public boolean send(Message<Event<TestEvent>> message) {

        log.info("Message for Testing Publisher confirms sent: " + message);
        return messagingChannels.test().send(message);
    }

    @ServiceActivator(inputChannel = TEST_ACK)
    public void acks(Message<?> ack) {
        log.info("Message ACK received for Test: " + ack);
    }

    @ServiceActivator(inputChannel = TEST_ERROR)
    public void errors(Message<?> error) {
        log.info("Message error for Test received: " + error);
    }
}

其中MessagingChannels被实现为

代码语言:javascript
运行
复制
public interface MessagingChannels {

    @Input("testAck")
    MessageChannel testAck();

    @Input("testOutput.errors")
    MessageChannel testError();


    @Output("testOutput")
    MessageChannel test();
}

这是应用程序的主要部分(我也检查过@EnableIntegration )。

代码语言:javascript
运行
复制
@EnableBinding(MessagingChannels.class)
@SpringBootApplication
@EnableScheduling
public class Main {

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-06-05 16:58:53

testAck不应该是一个绑定;它应该是一个@ServiceActivator

代码语言:javascript
运行
复制
.setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")

这在这个上下文中是行不通的;错误被发送到一个名为testOutput.errors的通道;同样,这需要一个@ServiceActivator,而不是绑定。

您将errorChannelEnabled放在错误的位置;它是一个常见的生产者属性,而不是特定于兔子的。

代码语言:javascript
运行
复制
@SpringBootApplication
@EnableBinding(Source.class)
public class So62219823Application {

    public static void main(String[] args) {
        SpringApplication.run(So62219823Application.class, args);
    }

    @InboundChannelAdapter(channel = "output")
    public String source() {
        return "foo";
    }

    @ServiceActivator(inputChannel = "acks")
    public void acks(Message<?> ack) {
        System.out.println("Ack: " + ack);
    }

    @ServiceActivator(inputChannel = "output.errors")
    public void errors(Message<?> error) {
        System.out.println("Error: " + error);
    }

}
代码语言:javascript
运行
复制
spring:
  cloud:
    stream:
      bindings:
        output:
          producer:
            error-channel-enabled: true
      rabbit:
        bindings:
          output:
            producer:
              confirm-ack-channel: acks
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62219823

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档