首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >version_conflict_engine_exception与_update_by_query

version_conflict_engine_exception与_update_by_query
EN

Stack Overflow用户
提问于 2020-01-17 10:38:34
回答 1查看 1.6K关注 0票数 1

我在flink中使用了ElasticSearch update,flink的并行性是1。但是我得到了version_conflict_engine_exception,这是我在flink RichSinkFunction中的代码,如下所示:

代码语言:javascript
运行
复制
        UpdateByQueryRequestBuilder builder = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
        builder.abortOnVersionConflict(true);
        builder.source(indexName);
        builder.filter(filter);
        builder.setMaxRetries(MAX_RETRIES);
        builder.refresh(true);

        String updateTime = Instant.ofEpochMilli(ts).atZone(ZoneId.systemDefault())
                .format(ELASTIC_SEARCH_DATE_TIME_FORMATTER);

        Map<String, Object> params = Maps.newHashMap();
        params.put("fieldName", fieldName);
        params.put("updateTime", updateTime);
        params.put("model", this.transformMap(JacksonUtils.convertValue(model, new TypeReference<Map<String, Object>>() {
        })));

        builder.script(new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, UPDATE_BY_MODEL_PAINLESS_CODE, params));
        BulkByScrollResponse response = builder.get();

我可以肯定的是,只有这个应用程序才能访问Elasticsearch,flink并行性就像在单线程调用update一样吗?为什么我有version_conflict_engine_exception?怎么才能做到一次呢?

EN

回答 1

Stack Overflow用户

发布于 2020-01-17 12:28:56

我认为有两种可能性:

document.

  • Flink's elasticsearch接收器提供了至少一次的保证,这意味着在发生故障时,接收器有时会在恢复期间执行重复写操作。这可能会导致使用过时的版本号.

更新文档的尝试。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59785698

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档