首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Spring Cloud Bus事件( RemoteApplicationEvent )未发布到Kafka

Spring Cloud Bus事件( RemoteApplicationEvent )未发布到Kafka
EN

Stack Overflow用户
提问于 2019-07-24 17:32:41
回答 1查看 519关注 0票数 1

我正在使用spring cloud bus将事件发布到kafka,以便其他实例可以监听相同的事件。事件正在被触发,但未发布到kafka。我正在使用spring cloud bus和spring cloud stream。

版本: Spring Boot : 2.0,Spring cloud Bus : 2.0.0,Spring Cloud Stream : 2.0.1

application.yml:

代码语言:javascript
运行
复制
server:
  port: 7711
spring:
  application:
    index: ${random.uuid}
  cloud:
    bus:
      enabled: true
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        input:
          destination: EMPLOYEE-TOPIC-DEMO-R1-P1
          group: ali

pom.xml

代码语言:javascript
运行
复制
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

发布事件:

代码语言:javascript
运行
复制
@Autowired
private ApplicationContext context;

@StreamListener(ConsumerStream.INPUT)
public void messageConsumer(@Payload String jsonValue, @Headers MessageHeaders header) {

    try {
        log.info("Enter in Consumer->messageConsumer()");
        final String myUniqueId = context.getId();
        context.publishEvent(new MessagingEventBus(this,myUniqueId,header));
    } catch (Exception e) {
        log.error("Exception caught while processing the request :", e);
    }
}

事件类:

代码语言:javascript
运行
复制
@Slf4j
public class MessagingEventBus extends RemoteApplicationEvent {


    private MessageHeaders header;

    // Must supply a default constructor and getters/setters for deserialization
    public MessagingEventBus() {
    }

    public MessagingEventBus(Object source, String originService, MessageHeaders header) {
        // source is the object that is publishing the event
        // originService is the unique context ID of the publisher
        super(source, originService);
        this.header = header;
    }


}

事件监听器:

代码语言:javascript
运行
复制
@Component
@Slf4j
public class MessagingEventBusListener implements ApplicationListener<MessagingEventBus> {

    @Override
    public void onApplicationEvent(MessagingEventBus messagingEventBus) {
       log.info("Messaging Event Bus Listener called");
    }
}
EN

回答 1

Stack Overflow用户

发布于 2020-03-06 20:29:25

这是总线发送事件的过程。

//create event

  • applicationContext#publishEvent //publish local event

  • BusAutoConfiguration#acceptLocal //bus accept local RemoteApplicationEvent() //
  1. new RemoteApplicationEvent()//创建bus//发布本地bus //bus接受本地事件//哪两步bus//bus会判断该事件是自发的?如果是自身,则发送到出站通道(5 step)
  2. cloudBusOutboundChannel#send

问题是4步判断失败,你可以为事件originService使用BusProperties#getId,它是有效的

代码语言:javascript
运行
复制
@Autowired
BusProperties busProperties;

public void fire(){
    new RemoteApplicationEvent(this, busProperties.getId().......
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57179696

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档