首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >消费者在kafka生产者/消费者重新启动后不会接收消息

消费者在kafka生产者/消费者重新启动后不会接收消息
EN

Stack Overflow用户
提问于 2017-09-12 12:59:34
回答 1查看 1.4K关注 0票数 0

我们有一个生产者,一个消费者和一个分区。使用者/生产者都是spring引导应用程序。消费者应用程序运行在我的本地机器上,而生产者和卡夫卡&动物园管理员在一台远程机器上。

在开发过程中,我重新部署了我的生产者应用程序,并做了一些更改。但在那之后,我的消费者没有收到任何信息。我试着重新启动消费者,但没有运气。问题可以是什么和/或如何解决?

消费者Config:

代码语言:javascript
复制
spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        input:
          destination: sales
          content-type: application/json
      kafka:
        binder:
          brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
          zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
          defaultZkPort: 2181
          defaultBrokerPort: 9092
server:
  port: 0

生产者Config

代码语言:javascript
复制
cloud:
stream:
  defaultBinder: kafka
  bindings:
    output:
      destination: sales
      content-type: application/json
  kafka:
    binder:
      brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
      zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
      defaultZkPort: 2181
      defaultBrokerPort: 9092

EDIT2

5分钟后,该用户应用程序除以下情况外将死亡:

代码语言:javascript
复制
2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2017-09-12 18:14:47,255  WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-09-12 18:14:47,256  INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel
2017-09-12 18:14:47,258  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-09-14 11:49:17

好吧,看起来已经有一个错误报告了,spring-cloud-stream-binder-kafka声明resetOffset属性没有效果。因此,在使用者上总是以latest的形式请求带有偏移量的消息。

正如在git问题上提到的,唯一的解决办法是通过kafka消费者CLI工具修复这个问题。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46177081

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档