我有一个无界流的flink(v1.13.3)应用程序(使用kafka)。我的一条小溪很忙。而且繁忙值(我可以在UI上看到)随着时间的推移而增加。当我刚启动flink应用程序时:
sum by(task_name) (flink_taskmanager_job_task_busyTimeMsPerSecond{job="Flink", task_name="MyProcessFunction"})返回300-450 ms five++ hours sum by(task_name) (flink_taskmanager_job_task_busyTimeMsPer
我们的使用是,我们希望使用flink流的去复制作业,它从源读取它的数据(kafka主题),并将独特的记录写入hdfs文件接收器。Kafka主题可能有重复的数据,可以使用复合键(adserver_id,unix_timestamp of the record)来识别这些数据。
因此,我决定使用flink键状态流来实现去复制。
val messageStream: DataStream[String] = env.addSource(flinkKafkaConsumer)
messageStream
.map{
record =>
val key = record.
我在Flink上运行了许多作业,后端使用rocksDB,我的一个作业出错了,整晚都在重启,
错误消息如下:
java.lang.IllegalStateException: Could not initialize keyed state backend.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:330)
at org.apache.flink.streaming.api.operators.Abst
当flink作业集群(deployment/ pod )在kubernetes上运行时,我们删除了jobmanager和taskmanager(kubectl delete Pod XXX)。我们发现,在pod运行正常后,从PVC挂载rocksDB和检查点文件路径的pod中缺少该状态。在pod运行后,是否有恢复状态的建议?我仔细检查了代码。我发现检查点未启用。是不是作业无法恢复的根本原因?
环境设置如下
RocksDBStateBackend backend = new RocksDBStateBackend(checkPointDataUri + "/checkpoint",
当我想要更新值状态(queueState.update(Queue))时,捕捉这个异常: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
at xxx.xxx.xxx.CleanTimedOutPartialMatches.processElement(CleanTimedO
我在flink中有一个应用程序,它可以消除多个流的重复。它对一个字符串字段执行键操作,并使用值状态对其进行还原。
在RichFilterFunction中使用值状态。
public class DedupeWithState extends RichFilterFunction<Tuple2<String, Message>> {
private ValueState<Boolean> seen;
private final ValueStateDescriptor<Boolean> desc;
public Dedup
我将在processBroadcastElement()函数中执行一些状态管理。 final val actvTagsMapValue = new MapStateDescriptor[String, List[String]]("actvTagsMapValue", classOf[String], classOf[List[String]])
override def processBroadcastElement(...): Unit {
val actvTagMap = getRuntimeContext.getMapState(actvTagsMapVal
我使用的是Flink v.1.4.0。
我已经实现了一个模块,作为我正在开发的包的一部分,它的作用是对流进行重复数据删除。这个模块非常简单:
public class RemoveDuplicateFilter<T> extends RichFlatMapFunction<T, T> {
static final ValueStateDescriptor<Boolean> SEEN_DESCRIPTOR = new ValueStateDescriptor<>("seen", Boolean.class);
private Va
我的要求是基于2个事件(EVT_A和EVT_B独立于订单)生成触发器。这是期待
1. EVT_A arrived. --> No action
2. EVT_B arrived --> Should Trigger
3. EVT_B arrived --> should Trigger since A was received previously (o/p should include A and current B)
4. EVT_A arrived --> should Trigger since B was received previously (o/p