首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >从Apache Kafka到Apache Kafka的Apache Camel路由出现中断异常

从Apache Kafka到Apache Kafka的Apache Camel路由出现中断异常
EN

Stack Overflow用户
提问于 2020-06-10 15:30:05
回答 2查看 402关注 0票数 0

我有一个正在运行的Apache Kafka消息代理实例,我希望将其用作camel路由的起点/终点。当开始路由时-这似乎工作得很好-我得到了一个InterruptedException,我不知道如何修复它:

代码语言:javascript
运行
复制
09:06:33.840 [Camel (camel-1) thread #1 - KafkaConsumer[OLOG_INBOUND]] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=KafkaTrafoDataRoute, groupId=8563046a-15fa-48fe-858f-cfd68c7b921c] Completed connection to node -1. Fetching API versions.
09:06:33.840 [Camel (camel-1) thread #1 - KafkaConsumer[OLOG_INBOUND]] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=KafkaTrafoDataRoute, groupId=8563046a-15fa-48fe-858f-cfd68c7b921c] Initiating API versions fetch from node -1.
09:06:33.849 [Camel (camel-1) thread #1 - KafkaConsumer[OLOG_INBOUND]] WARN org.apache.camel.component.kafka.KafkaConsumer - Interrupted while consuming OLOG_INBOUND-Thread 0 from kafka topic. Caused by: [org.apache.kafka.common.errors.InterruptException - java.lang.InterruptedException]
org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:504)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:287)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:293)
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:215)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.InterruptedException: null
    ... 16 common frames omitted
09:06:33.850 [Camel (camel-1) thread #1 - KafkaConsumer[OLOG_INBOUND]] INFO org.apache.camel.component.kafka.KafkaConsumer - Unsubscribing OLOG_INBOUND-Thread 0 from topic OLOG_INBOUND
09:06:33.850 [Camel (camel-1) thread #1 - KafkaConsumer[OLOG_INBOUND]] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=KafkaTrafoDataRoute, groupId=8563046a-15fa-48fe-858f-cfd68c7b921c] Unsubscribed all topics or patterns and assigned partitions
09:06:33.850 [Camel (camel-1) thread #1 - KafkaConsumer[OLOG_INBOUND]] DEBUG org.apache.camel.component.kafka.KafkaConsumer - Closing OLOG_INBOUND-Thread 0

该路由是从类似这样的main例程中调用的(注意:我尽量不使用Spring,因为在我的例子中它会导致太多问题):

代码语言:javascript
运行
复制
public static void main(String[] args) {
    CamelContext camelContext = new DefaultCamelContext();

    try {
        camelContext.addRoutes(new MyKafkaRoute());
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    try {
        camelContext.start();
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

这是MyKafkaRoute的代码:

代码语言:javascript
运行
复制
public class MyKafkaRoute extends RouteBuilder {

    private String consumerEndpoint = "kafka:TOPIC_NAME?"    //
            + "brokers=server:port"                   //
            + "&clientId=myKafkaRoute";

    private String emitterEndpoint  = "kafka:TOPIC_NAME?"   //
            + "brokers=server:port"                   //
            + "&clientId=myKafkaRoute";

    @Override
    public void configure() throws Exception {

        from(consumerEndpoint) //
                .process(... processing ...) //
                .to(emitterEndpoint) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .to("stream:out");
    }
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-06-10 21:10:05

我的问题是我自己的错:

在上面的主程序中,我通过以下方式启动消费者

代码语言:javascript
运行
复制
camelContext.start();

但在这条语句之后,我的测试程序立即结束,导致InterruptedExecption。我增强了代码:

代码语言:javascript
运行
复制
try {
    camelContext.start();
} catch (Exception e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

try {
    Thread.sleep(5 * 60 * 1000);
} catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

try {
    camelContext.stop();
} catch (Exception e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

现在我可以看到发送给代理的消息了。

票数 0
EN

Stack Overflow用户

发布于 2020-06-10 15:49:45

看起来(至少)一个端点可能不可用。除此之外,将驼峰路径修剪为from()。... .to()。... .to()以更清楚地了解要发送给谁的内容。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62298056

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档