首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Flink错误键组不在KeyGroupRange中

Flink错误键组不在KeyGroupRange中
EN

Stack Overflow用户
提问于 2018-03-06 21:56:21
回答 1查看 1.7K关注 0票数 5

我正在使用RocksDB作为我的状态后端运行一个Flink图。对于图中的一个联接操作符,我得到以下异常。(实际组#的运行当然各不相同)。

java.lang.IllegalArgumentException:关键组45不在KeyGroupRange{startKeyGroup=0,endKeyGroup=42}中。

我的接线员不太好,如下所示

代码语言:javascript
运行
复制
Source1 -----> Map1A ---> KeyBy--->\___ >
        \----> Map1B ---> KeyBy--->-----> Join1AB ---->
                                                \____>
Source2 ----->------------KeyBy---> -----------------> Join2,1AB ---->

在Join2.1AB运算符中引发错误,该操作符将(a) Join1AB的结果与(键控) source2连接起来。

有什么原因吗?下面我有完整的堆栈跟踪,我知道这仍然非常模糊--但是任何正确方向的指针都是非常感谢的。

代码语言:javascript
运行
复制
Caused by: java.lang.IllegalArgumentException: Key group 45 is not in KeyGroupRange{startKeyGroup=0, endKeyGroup=42}.
        at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
        at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateData(RocksDBKeyedStateBackend.java:664)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:521)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:417)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
    [CIRCULAR REFERENCE:java.lang.IllegalArgumentException: Key group 45 is not in KeyGroupRange{startKeyGroup=0, endKeyGroup=42}.]

编辑:如果我将状态后端更改为文件系统(即FsStateBackend),则得到以下堆栈跟踪。有关键组索引的东西。

代码语言:javascript
运行
复制
java.lang.IllegalArgumentException: Key group index out of range of key group range [43, 86).
    at org.apache.flink.runtime.state.heap.NestedMapsStateTable.setMapForKeyGroup(NestedMapsStateTable.java:104)
    at org.apache.flink.runtime.state.heap.NestedMapsStateTable.putAndGetOld(NestedMapsStateTable.java:218)
    at org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:207)
    at org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:145)
    at org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:72)

<snip user code stack trace>

org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:77)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:242)
    at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-03-06 23:29:20

问题是我的数据对象(POJO)有一个可变的哈希码。具体来说,哈希代码包含Enum。例如,如果我有一个车流,其中的哈希码是由汽车年和汽车类型(enum)组成的,如下所示。

代码语言:javascript
运行
复制
Car {
   private final CarType carType;
   private final int carYear

   public long hashCode() {
     int result = 17;
     result = 31 * result + carYear;
     result = 31 * result + carType.hasCode();  <---- This is mutable!
   }
}

枚举的hashCode本质上是Object.hashCode() (它依赖于内存地址)。随后,一台计算机(或进程)上的hashCode将与另一台计算机(或进程)上的不一样。这也解释了为什么我只在分布式环境中运行时才遇到这个问题,而不是在本地运行。

为了解决这个问题,我将hashCode()更改为不可变。执行String.hashCode()性能很差,所以我可能需要对其进行优化。但以下对汽车的定义将解决这个问题。

代码语言:javascript
运行
复制
Car {
   private final CarType carType;
   private final int carYear

   public long hashCode() {
     int result = 17;
     result = 31 * result + carYear;
     result = 31 * result + carType.name().hasCode();  <---- This is IMMUTABLE!
   }
}
票数 11
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49140654

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档