Apache Camel and Apache Kafka integration

Stack Overflow user
Asked 2020-07-01 00:07:41
1 answer, 274 views, 0 followers, 0 votes

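I am learning how to integrate Kafka with Apache Camel and ran into the error below. I created a file in the C:/inbox folder and want to consume its text with a Kafka consumer. I am using Camel 3.1.0. Below is the code: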

package com.javainuse;

import org.apache.camel.builder.RouteBuilder;

public class SimpleRouteBuilder extends RouteBuilder {

@Override
public void configure() throws Exception {

    String topicName = "test123";
    String kafkaServer = "kafka:localhost:9092";
    String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181";
    String serializerClass = "serializerClass=kafka.serializer.StringEncoder";
    String toKafka = "kafka:localhost:9092;kafka:test123?brokers=localhost:9092;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1";

    from("file:C:/inbox?noop=true").to(toKafka);
}
}

Here is the error I get:

SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
org.apache.camel.FailedToStartRouteException: Failed to start route route1 because of Route(route1)[From[file:C:/inbox?noop=true] -> [To[kafka:loc...
    at org.apache.camel.impl.engine.BaseRouteService.warmUp(BaseRouteService.java:133)
    at org.apache.camel.impl.engine.AbstractCamelContext.doWarmUpRoutes(AbstractCamelContext.java:3246)
    at org.apache.camel.impl.engine.AbstractCamelContext.safelyStartRouteServices(AbstractCamelContext.java:3139)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStartOrResumeRoutes(AbstractCamelContext.java:2925)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:2725)
    at org.apache.camel.impl.engine.AbstractCamelContext.lambda$doStart$2(AbstractCamelContext.java:2527)
    at org.apache.camel.impl.engine.AbstractCamelContext.doWithDefinedClassLoader(AbstractCamelContext.java:2544)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:2525)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2421)
    at com.javainuse.MainApp.main(MainApp.java:12)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException(RuntimeCamelException.java:52)
    at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:67)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:87)
    at org.apache.camel.processor.channel.DefaultChannel.doStart(DefaultChannel.java:144)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:73)
    at org.apache.camel.processor.Pipeline.doStart(Pipeline.java:154)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.support.processor.DelegateAsyncProcessor.doStart(DelegateAsyncProcessor.java:78)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.impl.engine.BaseRouteService.startChildService(BaseRouteService.java:339)
    at org.apache.camel.impl.engine.BaseRouteService.doWarmUp(BaseRouteService.java:189)
    at org.apache.camel.impl.engine.BaseRouteService.warmUp(BaseRouteService.java:131)
    ... 10 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:119)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.impl.engine.AbstractCamelContext.internalAddService(AbstractCamelContext.java:1455)
    at org.apache.camel.impl.engine.AbstractCamelContext.addService(AbstractCamelContext.java:1391)
    at org.apache.camel.processor.SendProcessor.doStart(SendProcessor.java:240)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:87)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.doStart(RedeliveryErrorHandler.java:1454)
    at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:60)
    ... 25 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: localhost:9092;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:58)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:407)
    ... 37 more

Process finished with exit code 0

1 Answer

Stack Overflow user

Answered 2020-07-01 13:27:30

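The stack trace shows that the URI of your Kafka producer endpoint is invalid (see the innermost "Caused by" at the bottom of the trace). And that is indeed the case.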
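When a trace nests several "Caused by:" sections like this one, the innermost cause is usually the line to read first. A minimal sketch of finding it programmatically, using only the JDK; `RootCauseSketch` and `rootCause` are hypothetical names introduced for illustration, and the exceptions in `main` are stand-ins, not the real Camel or Kafka types:

```java
// Hypothetical helper: follows the "Caused by:" chain to the innermost exception.
final class RootCauseSketch {

    static Throwable rootCause(Throwable error) {
        Throwable current = error;
        // Stop at the last cause; also guard against a throwable that is its own cause.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in exceptions that mimic the nesting seen in the trace.
        Exception root = new IllegalArgumentException("Invalid url in bootstrap.servers");
        Exception wrapped = new RuntimeException("Failed to start route",
                new RuntimeException("Failed to construct kafka producer", root));

        // Prints the message of the innermost exception.
        System.out.println(rootCause(wrapped).getMessage());
    }
}
```

In a real application the same loop could be applied to the exception thrown when the Camel context is started, so that the log shows the configuration error rather than only the outer FailedToStartRouteException.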

The correct form is kafka:[topicname]?[options] (check the Camel-Kafka docs).

So, looking at your URI, it should probably be:

kafka:test123?brokers=localhost:9092&groupId=group1

Your URI has the following problems:

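  • It contains the kafka:[topicname] part twice, and the first occurrence is really kafka:[brokers] (kafka:localhost:9092), which does not belong there.
  • Semicolons (;) are used instead of & to separate the options, so everything after brokers= is passed to Kafka as the bootstrap.servers value, which is exactly what the ConfigException reports.
  • The ZooKeeper options (zookeeperHost, zookeeperPort) were used by old versions of camel-kafka and are not valid here; remove them.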
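The points above can be sketched as a small URI builder. This is only an illustration under stated assumptions: `KafkaUriSketch` and `buildKafkaUri` are hypothetical names, not part of Camel, and the helper does no URL encoding. It simply assembles kafka:[topicname]?[options] with & between the options, using the topic and option values from the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Camel: assembles a kafka:[topicname]?[options] URI.
final class KafkaUriSketch {

    static String buildKafkaUri(String topic, Map<String, String> options) {
        // Options are joined with '&'; a ';' would end up inside the brokers value.
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        String uri = "kafka:" + topic;
        return options.isEmpty() ? uri : uri + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("brokers", "localhost:9092");
        options.put("groupId", "group1");

        String toKafka = buildKafkaUri("test123", options);
        System.out.println(toKafka); // kafka:test123?brokers=localhost:9092&groupId=group1

        // In SimpleRouteBuilder.configure() the route would then read:
        // from("file:C:/inbox?noop=true").to(toKafka);
    }
}
```

Writing the URI as a single string literal works just as well; the builder only makes it harder to slip in a stray separator or a second kafka: prefix.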

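By the way: the line SLF4J: Defaulting to no-operation (NOP) logger implementation at the top of the stack trace indicates that you use the SLF4J logging API but have not added any implementation to your project.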

If you use Maven, you can add the following dependency to add SLF4J, with Logback as its implementation, to your project.

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <!-- add a <version> element here unless a parent POM or BOM manages it -->
</dependency>
Original page content provided by Stack Overflow. Translation support provided by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/62661631
