Fixing a NullPointerException When Flink Writes to StarRocks in Real Time

Original article
Author: 用户6404053
Published: 2023-05-22 23:38:49
Included in the column: Catorory

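Problem

Recently, many of our jobs kept failing after running for a while, and they did not really restart either. The exception log is below. Roughly, a stream load failed, the connector tried to roll back, and then it hit a NullPointerException: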

Code language: text
2023-00-00 16:02:28,037 ERROR com.starrocks.data.load.stream.DefaultStreamLoadManager      [] - catch exception, wait rollback 
java.lang.NullPointerException: null
	at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:257) ~[flink-connector-starrocks-1.2.5_flink-1.13_2.11.jar:?]
	at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:113) ~[flink-connector-starrocks-1.2.5_flink-1.13_2.11.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_302]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_302]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_302]
	at java.lang.Thread.run(Thread.java:877) [?:1.8.0_302]
2023-00-00 16:02:28,038 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Sink: Unnamed with job vertex id 3e7208d04a005e77687e0b389f9e3814 (1/1)#10 (346a2afcaa8b762453dd8152cd1915b3) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: java.lang.NullPointerException
	at com.starrocks.data.load.stream.DefaultStreamLoadManager.AssertNotException(DefaultStreamLoadManager.java:337)
	at com.starrocks.data.load.stream.DefaultStreamLoadManager.write(DefaultStreamLoadManager.java:196)
	at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.invoke(StarRocksDynamicSinkFunctionV2.java:155)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:135)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:799)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:586)
	at java.lang.Thread.run(Thread.java:877)
	Suppressed: java.lang.RuntimeException: java.lang.NullPointerException
		at com.starrocks.data.load.stream.DefaultStreamLoadManager.AssertNotException(DefaultStreamLoadManager.java:337)
		at com.starrocks.data.load.stream.DefaultStreamLoadManager.flush(DefaultStreamLoadManager.java:267)
		at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionV2.close(StarRocksDynamicSinkFunctionV2.java:179)
		at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
		at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:865)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:844)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:757)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:663)
		... 4 more
	Caused by: java.lang.NullPointerException
		at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:257)
		at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$2(DefaultStreamLoader.java:113)
		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
		... 1 more
Caused by: [CIRCULAR REFERENCE: java.lang.NullPointerException]
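Reading the trace from the bottom up: the NPE is thrown inside a thread-pool task (FutureTask.run → DefaultStreamLoader.send), while the Flink task thread only fails later, when DefaultStreamLoadManager.AssertNotException rethrows it wrapped in a RuntimeException, first from write() and then, suppressed, from flush() during close(). The sketch below is a simplified, hypothetical model of that capture-and-rethrow pattern (class and method names are invented, not the connector's), showing why the error surfaces on a later record rather than where it occurred.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of "fail in a background load, rethrow on the next write/flush".
// Not the connector's code; all names are illustrative.
class AsyncLoadManagerSketch {
    private final ExecutorService pool = Executors.newSingleThreadExecutor();
    // First exception raised by a background load, if any.
    private final AtomicReference<Throwable> error = new AtomicReference<>();
    // Only touched from the single pool thread.
    private long sentChars;

    // Called from the task thread for each record.
    void write(String record) {
        assertNotException();                 // surface any earlier async failure first
        pool.submit(() -> {
            try {
                send(record);
            } catch (Throwable t) {
                error.compareAndSet(null, t); // remember it; the pool thread does not fail the job
            }
        });
    }

    // Called from close(): wait for pending loads, then check again.
    void flush() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        assertNotException();
    }

    private void assertNotException() {
        Throwable t = error.get();
        if (t != null) {
            throw new RuntimeException(t);    // same wrapping as in the trace
        }
    }

    // Stand-in for a stream load; a null record throws an NPE, like the one in the log.
    private void send(String record) {
        sentChars += record.trim().length();
    }

    public static void main(String[] args) {
        AsyncLoadManagerSketch manager = new AsyncLoadManagerSketch();
        manager.write("row-1");
        manager.write(null);                  // the background send fails, write() returns normally
        try {
            manager.flush();
        } catch (RuntimeException e) {
            System.out.println("flush failed: " + e);
        }
    }
}
```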

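Cause analysis

1. In my experience, an NPE like this is usually caused by bad data, but the log does not print the offending record, so there was no way to tell whether the data was to blame.

2. Note the keyword rollback: the write failed and the connector was rolling back. But we run StarRocks 2.3.x, and StarRocks only supports transactions from 2.4 onward, so what is there to roll back? Transactions are not supported at all. Then I remembered that we had recently upgraded the connector to 1.2.5, and that the sink being executed is StarRocksDynamicSinkFunctionV2.

According to the documentation, V2 is the transaction-based interface added in 1.2.4, and StarRocks versions without transaction support use the non-transactional interface, V1, by default. Something did not add up: our StarRocks version clearly does not support transactions, so why was V2 being used?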
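The reasoning above boils down to a version comparison: transaction load exists from 2.4 onward, so a 2.3.x cluster should end up on V1. A minimal sketch, assuming a dotted "major.minor..." version string; the helper is hypothetical and is not how the connector decides (the connector probes the FE over HTTP instead).

```java
// Hypothetical helper: StarRocks supports transaction load from 2.4, so compare major.minor.
// Only the first two components are parsed, so "2.3.x" is accepted.
class TransactionLoadSupport {
    static boolean supportsTransactionLoad(String starRocksVersion) {
        String[] parts = starRocksVersion.trim().split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return major > 2 || (major == 2 && minor >= 4);
    }

    public static void main(String[] args) {
        for (String version : new String[] {"2.3.x", "2.4.0", "3.0.1"}) {
            System.out.println(version + " -> transaction load: " + supportsTransactionLoad(version));
        }
    }
}
```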

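Source code analysis

1. In the 1.2.5 source of the StarRocks connector, open SinkFunctionFactory.java. The core method is createSinkFunction, which returns a StarRocksDynamicSinkFunctionBase.

2. Its first step is getSinkVersion, which reads the option to decide among V1, V2, and AUTO. We upgraded from an older connector that had no notion of V1 or V2 at all, so the job was certainly running with AUTO.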

Code language: java
/** Create sink function according to the configuration. */
public class SinkFunctionFactory {

    private static final Logger LOG = LoggerFactory.getLogger(SinkFunctionFactory.class);

    enum SinkVersion {
        // Implement exactly-once using stream load which has a
        // poor performance. All versions of StarRocks are supported
        V1,
        // Implement exactly-once using transaction load since StarRocks 2.4
        V2,
        // Select sink version automatically according to whether StarRocks
        // supports transaction load
        AUTO
    }

    public static boolean isStarRocksSupportTransactionLoad(StarRocksSinkOptions sinkOptions) {
        String host = ConnectionUtils.selectAvailableHttpHost(
                sinkOptions.getLoadUrlList(), sinkOptions.getConnectTimeout());
        if (host == null) {
            throw new RuntimeException("Can't find an available host in " + sinkOptions.getLoadUrlList());
        }

        String beginUrlStr = "http://" + StreamLoadConstants.getBeginUrl(host);
        HttpPost httpPost = new HttpPost(beginUrlStr);
        httpPost.addHeader(HttpHeaders.AUTHORIZATION,
                StreamLoadUtils.getBasicAuthHeader(sinkOptions.getUsername(), sinkOptions.getPassword()));
        httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build());
        LOG.info("Transaction load probe post {}", httpPost);

        HttpClientBuilder clientBuilder = HttpClients.custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                });

        try (CloseableHttpClient client = clientBuilder.build()) {
            CloseableHttpResponse response = client.execute(httpPost);
            String responseBody = EntityUtils.toString(response.getEntity());
            LOG.info("Transaction load probe response {}", responseBody);

            JSONObject bodyJson = JSON.parseObject(responseBody);
            String status = bodyJson.getString("status");
            String msg = bodyJson.getString("msg");

            // If StarRocks does not support transaction load, FE's NotFoundAction#executePost
            // will be called where you can know how the response json is constructed
            if ("FAILED".equals(status) && "Not implemented".equals(msg)) {
                return false;
            }
            return true;
        } catch (IOException e) {
            String errMsg = "Failed to probe transaction load for " + host;
            LOG.warn("{}", errMsg, e);
            throw new RuntimeException(errMsg, e);
        }
    }

    public static SinkVersion chooseSinkVersionAutomatically(StarRocksSinkOptions sinkOptions) {
        try {
            if (StarRocksSinkSemantic.AT_LEAST_ONCE.equals(sinkOptions.getSemantic())) {
                LOG.info("Choose sink version V2 for at-least-once.");
                return SinkVersion.V2;
            }

            boolean supportTransactionLoad = isStarRocksSupportTransactionLoad(sinkOptions);
            if (supportTransactionLoad) {
                LOG.info("StarRocks supports transaction load, and choose sink version V2");
                return SinkVersion.V2;
            } else {
                LOG.info("StarRocks does not support transaction load, and choose sink version V1");
                return SinkVersion.V1;
            }
        } catch (Exception e) {
            LOG.warn("Can't decide whether StarRocks supports transaction load, and sink version V2 " +
                    "will be used by default. If your StarRocks does not support transaction load, please " +
                    "configure '{}' to 'V1' manually", StarRocksSinkOptions.SINK_VERSION.key());
            return SinkVersion.V2;
        }
    }

    public static SinkVersion getSinkVersion(StarRocksSinkOptions sinkOptions) {
        String sinkTypeOption = sinkOptions.getSinkVersion().trim().toUpperCase();
        SinkVersion sinkVersion;
        if (SinkVersion.V1.name().equals(sinkTypeOption)) {
            sinkVersion = SinkVersion.V1;
        } else if (SinkVersion.V2.name().equals(sinkTypeOption)) {
            sinkVersion = SinkVersion.V2;
        } else if (SinkVersion.AUTO.name().equals(sinkTypeOption)) {
            sinkVersion = chooseSinkVersionAutomatically(sinkOptions);
        } else {
            throw new UnsupportedOperationException("Unsupported sink type " + sinkTypeOption);
        }
        LOG.info("Choose sink version {}", sinkVersion.name());
        return sinkVersion;
    }

    public static <T> StarRocksDynamicSinkFunctionBase<T> createSinkFunction(
            StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<T> rowTransformer) {
        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
        switch (sinkVersion) {
            case V1:
                return new StarRocksDynamicSinkFunction<>(sinkOptions, schema, rowTransformer);
            case V2:
                return new StarRocksDynamicSinkFunctionV2<>(sinkOptions, schema, rowTransformer);
            default:
                throw new UnsupportedOperationException("Unsupported sink type " + sinkVersion.name());
        }
    }

    public static <T> StarRocksDynamicSinkFunctionBase<T> createSinkFunction(StarRocksSinkOptions sinkOptions) {
        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
        switch (sinkVersion) {
            case V1:
                return new StarRocksDynamicSinkFunction<>(sinkOptions);
            case V2:
                return new StarRocksDynamicSinkFunctionV2<>(sinkOptions);
            default:
                throw new UnsupportedOperationException("Unsupported sink type " + sinkVersion.name());
        }
    }
}

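3. AUTO calls chooseSinkVersionAutomatically, which in turn calls isStarRocksSupportTransactionLoad to check for transaction support. That method sends an HTTP request to the FE and decides from the response whether transactions are supported.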
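A minimal, Java 8 compatible sketch of the same probe, useful for reproducing it by hand. Two things are assumptions rather than facts from the source: that StreamLoadConstants.getBeginUrl(host) resolves to host + "/api/transaction/begin", and that host is the FE HTTP address (fe-host:8030 below is a placeholder). Unlike the connector, this sketch does not customize redirect handling.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch of a manual transaction-load probe.
// ASSUMPTION: the begin endpoint is /api/transaction/begin on the FE HTTP port.
class TransactionProbe {
    static String beginUrl(String feHttpHost) {
        return "http://" + feHttpHost + "/api/transaction/begin";
    }

    // HTTP Basic auth: "Basic " + base64("user:password").
    static String basicAuthHeader(String user, String password) {
        String token = user + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(token.getBytes(StandardCharsets.UTF_8));
    }

    // Sends an empty POST and returns the raw response body (error body included).
    static String probe(String feHttpHost, String user, String password) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(beginUrl(feHttpHost)).openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Authorization", basicAuthHeader(user, password));
            conn.setConnectTimeout(5_000);
            conn.setReadTimeout(5_000);
            conn.setDoOutput(true);
            conn.getOutputStream().close();   // empty request body
            int code = conn.getResponseCode();
            return readAll(code >= 400 ? conn.getErrorStream() : conn.getInputStream());
        } finally {
            conn.disconnect();
        }
    }

    private static String readAll(InputStream in) throws IOException {
        if (in == null) {
            return "";
        }
        try (InputStream input = in) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = input.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 3) {
            System.out.println("usage: TransactionProbe <fe-host:http-port> <user> <password>");
            return;
        }
        System.out.println(probe(args[0], args[1], args[2]));
    }
}
```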

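4. Our StarRocks runs in the cloud and is not reachable from our local network, which makes debugging awkward. So I reproduced the request from isStarRocksSupportTransactionLoad by hand and sent it to the FE. The response looked like this: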

Code language: json
{"msg":"Not implemented","status":"FAILED"}

Comparing this with the code: given a normal response, msg and status are enough to tell that the server does not support transactions, so the connector's detection logic is fine:

Code language: java
try (CloseableHttpClient client = clientBuilder.build()) {
    CloseableHttpResponse response = client.execute(httpPost);
    String responseBody = EntityUtils.toString(response.getEntity());
    LOG.info("Transaction load probe response {}", responseBody);

    JSONObject bodyJson = JSON.parseObject(responseBody);
    String status = bodyJson.getString("status");
    String msg = bodyJson.getString("msg");

    // If StarRocks does not support transaction load, FE's NotFoundAction#executePost
    // will be called where you can know how the response json is constructed
    if ("FAILED".equals(status) && "Not implemented".equals(msg)) {
        return false;
    }
    return true;
} catch (IOException e) {
    String errMsg = "Failed to probe transaction load for " + host;
    LOG.warn("{}", errMsg, e);
    throw new RuntimeException(errMsg, e);
}
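To check a hand-captured response offline, here is a dependency-free sketch of the same rule. It swaps the connector's fastjson parsing for a small regular expression that only reads flat string fields, which is enough for the probe response above but is not a general JSON parser; the class name is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Dependency-free stand-in for the connector's check (the connector uses fastjson).
// Only handles flat "name":"value" string fields.
class ProbeResponse {
    private static String stringField(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    // Same rule as isStarRocksSupportTransactionLoad: only FAILED + "Not implemented" means "no".
    static boolean supportsTransactionLoad(String responseBody) {
        String status = stringField(responseBody, "status");
        String msg = stringField(responseBody, "msg");
        return !("FAILED".equals(status) && "Not implemented".equals(msg));
    }

    public static void main(String[] args) {
        String body = "{\"msg\":\"Not implemented\",\"status\":\"FAILED\"}";
        System.out.println("supports transaction load: " + supportsTransactionLoad(body));
    }
}
```

The asymmetry mirrors the connector: only the exact FAILED / "Not implemented" pair means "unsupported", and any other well-formed reply is read as support.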

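5. If the check itself is correct, why did we still end up on V2? Look again: the HTTP request is wrapped in an outer try-catch in chooseSinkVersionAutomatically. If the request to the FE fails, support cannot be determined, and in that case the connector falls back to V2 by default. That is the root cause: our job took this default, ran the V2 (transactional) sink against a StarRocks that has no transactions, and failed on rollback. In the logs, this path is marked by the WARN "Can't decide whether StarRocks supports transaction load, and sink version V2 will be used by default...".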
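The fallback is easy to miss when reading the source, so here is a simplified, hypothetical model of chooseSinkVersionAutomatically with the probe injected as a Callable (class and method names are illustrative, not the connector's). It shows that a failed probe, whether an IOException or the "no available host" RuntimeException, yields V2 just like a positive answer. Per the quoted code, at-least-once semantics also return V2 before any probe is sent.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Simplified model of chooseSinkVersionAutomatically from connector 1.2.5.
// The probe is injected so each branch can be exercised without a cluster.
class SinkVersionChoice {
    enum SinkVersion { V1, V2 }

    static SinkVersion choose(boolean atLeastOnce, Callable<Boolean> probe) {
        try {
            if (atLeastOnce) {
                return SinkVersion.V2;                 // chosen before any probe is sent
            }
            return probe.call() ? SinkVersion.V2 : SinkVersion.V1;
        } catch (Exception e) {
            return SinkVersion.V2;                     // probe failed: silent fallback to V2
        }
    }

    public static void main(String[] args) {
        // FE answered "Not implemented": V1
        System.out.println(choose(false, () -> false));
        // FE unreachable: V2
        System.out.println(choose(false, () -> { throw new IOException("FE unreachable"); }));
    }
}
```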

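Solution

With the cause identified, the fix is simple: set the sink version to V1 explicitly in the sink options. After changing the code and redeploying, the problem was resolved.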
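For a Flink SQL job, pinning the version could look like the DDL below. This is a sketch: the option key sink.version is assumed to be what StarRocksSinkOptions.SINK_VERSION.key() refers to (verify it against your connector version), and the table, hosts, ports, and credentials are placeholders. The string is built in plain Java so the example runs without Flink on the classpath.

```java
// Builds a Flink SQL DDL for a StarRocks sink with the sink version pinned to V1.
// ASSUMPTION: the option key is 'sink.version'; all names and addresses are placeholders.
class StarRocksSinkDdl {
    static String ddl(String table) {
        return "CREATE TABLE " + table + " (\n"
                + "  id BIGINT,\n"
                + "  name STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'starrocks',\n"
                + "  'jdbc-url' = 'jdbc:mysql://fe-host:9030',\n"
                + "  'load-url' = 'fe-host:8030',\n"
                + "  'database-name' = 'example_db',\n"
                + "  'table-name' = 'example_table',\n"
                + "  'username' = 'root',\n"
                + "  'password' = '',\n"
                + "  'sink.version' = 'V1'\n"
                + ")";
    }

    public static void main(String[] args) {
        // In a real job this string would be passed to TableEnvironment#executeSql.
        System.out.println(ddl("sr_sink"));
    }
}
```

Setting V1 explicitly means getSinkVersion never calls chooseSinkVersionAutomatically, so the job no longer depends on whether the FE is reachable when the sink is created.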

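Reflections

If I were a developer of the StarRocks connector, is there a better way to avoid this problem? I think there is. As it stands, the connector decides between V2 and V1 each time a sink is created, which seems unnecessary. Instead, it could check once at job startup whether transactions are supported, store the result (for example in a global variable), and reuse that conclusion for every subsequent sink. If the HTTP probe already fails at startup, the job should simply fail to start; that is better than failing after it has been running for a while.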
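The proposal can be sketched as a resolver that probes once, caches the answer, and throws instead of guessing. The names are hypothetical and the probe is injected as a Callable; where the check would run in a real connector, and how the cached value would be shared across tasks, is left open here.

```java
import java.util.concurrent.Callable;

// Sketch of the proposal: probe once at startup, cache the answer,
// and fail fast instead of silently falling back to V2. Not connector code.
final class SinkVersionResolver {
    enum SinkVersion { V1, V2 }

    private final Callable<Boolean> probe;
    private volatile SinkVersion resolved;   // cached after the first successful probe

    SinkVersionResolver(Callable<Boolean> probe) {
        this.probe = probe;
    }

    SinkVersion resolve() {
        SinkVersion v = resolved;
        if (v != null) {
            return v;
        }
        synchronized (this) {
            if (resolved == null) {
                boolean supportsTransactionLoad;
                try {
                    supportsTransactionLoad = probe.call();
                } catch (Exception e) {
                    // Fail fast: no default when the FE cannot be reached.
                    throw new IllegalStateException(
                            "Cannot determine transaction load support; set the sink version explicitly", e);
                }
                resolved = supportsTransactionLoad ? SinkVersion.V2 : SinkVersion.V1;
            }
            return resolved;
        }
    }

    public static void main(String[] args) {
        // Pretend the FE answered "Not implemented".
        SinkVersionResolver resolver = new SinkVersionResolver(() -> false);
        System.out.println(resolver.resolve());
    }
}
```

Double-checked locking on a volatile field keeps the probe to a single call even if several threads resolve at once, and a probe failure surfaces as a startup error rather than a V2 sink that breaks later.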

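Original content statement: this article was published on the Tencent Cloud Developer Community with the author's authorization. Reproduction without permission is prohibited.

In case of infringement, please contact cloudcommunity@tencent.com for removal.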
