首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如果拓扑中发生异常,请跳过记录

如果拓扑中发生异常,请跳过记录
EN

Stack Overflow用户
提问于 2020-10-02 09:58:32
回答 1查看 852关注 0票数 3

我们正在编写一个Kafka流拓扑,它聚合并实时显示数据。我们希望使显示尽可能健壮-理想情况下,记录记录并继续为任何异常。

根据文档,我们做了几次测试

Kafka流很好地支持处理在生产者或反序列化过程中发生的异常。提供的LogAndContinueExceptionHandler准确地给出了我们想要的行为。但是,我们的主要问题是在处理过程中出现异常(例如在.mapValues().leftJoin()中)。

我们的想法基本上是为了验证先决条件

  1. 在反序列化过程中,如果不满足,抛出一个DeserializationException (以及日志和继续)。
  2. 如果无法执行计算,则检查处理函数以返回默认值(/ by zero error等)。

然而,如果数据中有什么不可预见的东西,异常仍然可能会出现,并且拓扑也会关闭。

Kafka提供了一个UncaughtExceptionHandler,但是它是在线程死后调用的,因此不能用来防止拓扑关闭。

有什么方法可以编写跳过记录的UncaughtExceptionHandler吗?或者换一种机制,跳过处理函数中的try-catch块中的当前记录?

EN

回答 1

Stack Overflow用户

发布于 2020-10-15 13:43:28

我认为最好的解决方案是编写处理操作(例如: Mapper、Filter等),这样就不会抛出任何异常。为此,您可以使用一个包装器对象,它可以是错误的成功(例如:scala中的Either类型)。之后,您可以使用branch()方法获得两个流:一个用于成功记录,另一个用于错误。

下面的代码显示了基本思想:

代码语言:javascript
复制
    public static void main(String[] args) {
        var builder = new StreamsBuilder();
        KStream<Object, Result<Object>> stream = builder.stream("my-topic")
            .map((k, v) -> {
                try {
                    // unsafe operation, i.e that may throw an exception
                    return KeyValue.pair(k, new Success<>(v));
                } catch (Exception e) {
                    return KeyValue.pair(k, new Error<>(e));
                }
            });
        KStream<Object, Result<Object>>[] branch = stream.branch((k, v) -> !v.hasError(), (k, v) -> v.hasError());

        // Handle the success steam
        KStream<Object, Result<Object>> successStream = branch[0];

        // Handle the error steam, e.g:  log errors, write errors to a Dead Letter Queue
        KStream<Object, Result<Object>> errorStream = branch[1];
        
    }

    public interface Result<T> {
        T get() throws Exception;
        Exception exception();
        boolean hasError();
    }

    public static class Success<T> implements Result<T> {

        private final T value;

        public Success(T value) {
            this.value = value;
        }

        @Override
        public T get() throws Exception {
            return value;
        }

        @Override
        public Exception exception() {
            return null;
        }

        @Override
        public boolean hasError() {
            return false;
        }
    }

    public static class Error<T> implements Result<T> {

        private final Exception error;

        public Error(Exception error) {  this.error = error; }

        @Override
        public T get() throws Exception{
            throw error;
        }

        @Override
        public Exception exception() {
            return error;
        }

        @Override
        public boolean hasError() {
            return true;
        }
    }

此外,对于您提到的反序列化异常,项目Azkarra溪流提供了一些可以帮助您的方便的java类(例如。SafeSerdes,DeadLetterTopicExceptionHandler):GitHub

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64169657

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档