我使用axon和spring-boot实现了CQRS+ES应用程序。我使用单独的查询模型和命令模型应用程序。我使用rabbitmq从命令模式发布事件。它工作正常。但是跟踪处理器实现在我的应用程序中是不起作用的。
这是我的查询模型
@SpringBootApplication
public class SeatQueryPart1Application {
public static void main(String[] args) {
SpringApplication.run(SeatQueryPart1Application.class, args);
}
@Bean
public SpringAMQPMessageSource statisticsQueue(Serializer serializer) {
return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)) {
@RabbitListener(exclusive = false, bindings = @QueueBinding(value = @Queue, exchange = @Exchange(value = "ExchangeTypesTests.FanoutExchange", type = ExchangeTypes.FANOUT), key = "orderRoutingKey"))
@Override
public void onMessage(Message arg0, Channel arg1) throws Exception {
super.onMessage(arg0, arg1);
}
};
}
@Autowired
public void conf(EventHandlingConfiguration configuration) {
configuration.registerTrackingProcessor("statistics");
}
}
这是一个事件处理程序类
@ProcessingGroup("statistics")
@Component
public class EventLoggingHandler {
private SeatReservationRepository seatReservationRepo;
public EventLoggingHandler(final SeatReservationRepository e) {
this.seatReservationRepo = e;
}
@EventHandler
protected void on(SeatResurvationCreateEvent event) {
Timestamp timestamp = new Timestamp(System.currentTimeMillis());
Seat seat=new Seat(event.getId(), event.getSeatId(), event.getDate(),timestamp ,true);
seatReservationRepo.save(seat);
}
}
这是yml配置
axon:
eventhandling:
processors:
statistics.source: statisticsQueue
我怎么才能做得对呢?(有没有人能推荐教程或代码示例)
发布于 2018-04-03 15:58:51
SpringAMQPMessageSource是一个SubscribableMessageSource。这意味着不能使用跟踪事件处理器来处理消息。它只与Subscribable事件处理器兼容。
删除configuration.registerTrackingProcessor("statistics");
并将其保留为默认值(订阅)应该可以做到这一点。
https://stackoverflow.com/questions/49593283
复制相似问题