排查过JobManager日志提示心跳超时,现场人员曾经多次尝试调大过超时时间。...从发过来的业务代码对应的FlinkUI界面来看,确实很简单: 综合考虑现场情况:业务作业数据量小、逻辑简单、未引入任何第三方的jar,那么问题直指Flink框架自身了。...Kafka没有业务数据,Flink作业KafkaSourceReader类中的SynchronizedSortedMap仍然存在大量数据,详见如下: 从相关源码来看,每次checkpoint触发时调用snapshotState...方法会使用offsetsToCommit存储当前批次checkpoint期间已经完成读取Kafka的分区及对应offsets。...然后,在每次checkpoint成功时调用notifyCheckpointComplete方法、commitOffsets方法将offsets信息提交到Kafka。
3.8 Kafka消费者开始位置配置 Flink Kafka Consumer允许配置如何确定Kafka分区的起始位置。...对于每个分区,时间戳大于或等于指定时间戳的记录将用作起始位置。如果分区的最新记录早于时间戳,则只会从最新记录中读取分区。在此模式下,Kafka中的已提交偏移将被忽略,不会用作起始位置。...请注意,当作业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每个Kafka分区的起始位置由存储在保存点或检查点中的偏移量确定。...启用此函数后,Flink的检查点将在检查点成功之前等待检查点时的任何动态记录被Kafka确认。这可确保检查点之前的所有记录都已写入Kafka。...如果Flink应用程序崩溃和完成重启之间的时间较长,那么Kafka的事务超时将导致数据丢失(Kafka将自动中止超过超时时间的事务)。考虑到这一点,请根据预期的停机时间适当配置事务超时。
连接失败后,尝试连接Kafka的时间间隔,默认50ms 11.reconnect.backoff.max.ms 尝试连接到Kafka,生产者客户端等待的最大时间,默认1000ms 12.max.block.ms...10.auto.offset.reset 如果分区没有初始偏移量,或者当前偏移量服务器上不存在时,将使用的偏移量设置,earliest从头开始消费,latest从最近的开始消费,none抛出异常 11...消费者客户端一次请求从Kafka拉取消息的最大数据量,默认50MB 13.fetch.max.wait.ms 从Kafka拉取消息时,在不满足fetch.min.bytes条件时,等待的最大时间,...默认30000ms 31.default.api.timeout.ms 设置消费者api超时时间,默认60000ms 32.interceptor.classes 自定义拦截器 33.exclude.internal.topics...费到 HW (High Watermark)处的位置 其他Kafka文章: 微服务同时接入多个Kafka
如果分区没有初始偏移量,或者当前偏移量服务器上不存在时,将使用的偏移量设置,earliest从头开始消费,latest从最近的开始消费,none抛出异常 11.fetch.min.bytes 消费者客户端一次请求从...Kafka拉取消息的最小数据量,如果Kafka返回的数据量小于该值,会一直等待,直到满足这个配置大小,默认1b 12.fetch.max.bytes 消费者客户端一次请求从Kafka拉取消息的最大数据量...,默认50MB 13.fetch.max.wait.ms 从Kafka拉取消息时,在不满足fetch.min.bytes条件时,等待的最大时间,默认500ms 14.metadata.max.age.ms...,-1将使用操作系统的设置 18.client.id 消费者客户端的id 19.reconnect.backoff.ms 连接失败后,尝试连接Kafka的时间间隔,默认50ms 20.reconnect.backoff.max.ms...,如果在这个时间内没有收到响应,客户端将重发请求,超过重试次数将抛异常,默认30000ms 31.default.api.timeout.ms 设置消费者api超时时间,默认60000ms 32.interceptor.classes
该异常在 Flink AM 向 YARN NM 申请启动 token 已超时的 Container 时抛出,通常原因是 Flink AM 从 YARN RM 收到这个 Container 很久之后(超过了...参数设置,设置retries参数,可以在Kafka的Partition发生leader切换时,Flink不重启,而是做3次尝试: kafkaProducerConfig { "bootstrap.servers...MySQL CDC源等待超时 在扫描表期间,由于没有可恢复的位置,因此无法执行checkpoints。为了不执行检查点,MySQL CDC源将保持检查点等待超时。...作业在扫描 MySQL 全量数据时,checkpoint 超时,出现作业 failover,如下图: 原因:Flink CDC 在 scan 全表数据(我们的实收表有千万级数据)需要小时级的时间(受下游聚合反压影响...尽管 Flink 可以开启 Kafka 分区自动发现机制(在 Configuration 里设置 flink.partition-discovery.interval-millis 值),但分区发现仍然需要一定时间
连接超时/失败 上下游地址、库表是每个 DDL 语句的配置参数必填项。在【语法检查】时,平台并不会检查 DDL 配置参数的正确性,这些检查通常在程序运行时检查。...实际上 Oceanus 平台已经内置了 Flink 相关的 JAR 包,用户在打包时不用将这些 JAR 打进去,只需要在 POM 里面 将scope设置为provided 即可,例如: 在正式运行之前请检查: 类名是否有拼写错误 确定是否将相关的业务代码依赖打进 JAR 包中 基础运维 作业监控 流计算 Oceanus 提供强大的作业监控能力,我们可以通过【监控】项查看作业的各项指标...需尝试增加作业的算子并行度(CU)数和优化内存占用,避免内存泄露。...总结 本文首先对出现的最基础的、用户可以自己解决的常见报错做了一些总结,这些错误常常出现在作业启动之前,所以在作业正式启动之前,用户需要自己检查好这些类型的错误,保证作业能够顺利的启动。
如果消息发送到缓存区的速度比发送到broker的速度快,那么生产者会被阻塞(根据max.block.ms配置的时间,默认为60000ms=1分钟,在0.9.0.0版本之前使用block.on.buffer.full..., 比如修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current 策略就可以帮你实现这个功能 Specified-Offset...如果 follower 提供读服务, 有可能发生 consumer 首先从 follower1 拉取消息, 然后从 follower2 拉取消息, 可能会看到第一次消费的消息在第二次消费时不见了(同步延迟...中获取 Remote Replica: 每个 replica 都保存一组 HW 和 LEO 值, 在 Leader 上还保存了其他 follower 的 LEO 值(主要是为了确定整个分区的 HW,...当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。
参数设置,设置retries参数,可以在Kafka的Partition发生leader切换时,Flink不重启,而是做3次尝试: kafkaProducerConfig {...在处理包含无限多键的数据时,要考虑到 keyed 状态保留策略(通过 TTL 定时器来在给定的时间之后清理未使用的数据)是很重要的。...如果你的 keyed 状态包含在某个 Flink 的默认窗口中,则将是安全的:即使未使用 TTL,在处理窗口的元素时也会注册一个清除计时器,该计时器将调用 clearAllState 函数,并删除与该窗口关联的状态及其元数据...检查flink程序有没有数据倾斜,可以通过 flink 的 ui 界面查看每个分区子节点处理的数据量。 13....However, migration for MapState currently isn't supported 在1.9之前的Flink版本中,如果我们使用RocksDB状态后端,并且更改了自用MapState
2.2 起始位置配置 Flink Kafka Consumer 可以配置如何确定 Kafka 分区的起始位置。...当作业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每个 Kafka 分区的起始位置由存储在保存点或检查点中的偏移量确定。...如果作业失败,Flink 会从最新检查点的状态恢复流处理程序,并从保存在检查点中的偏移量重新开始消费来自 Kafka 的记录。 因此,检查点间隔定义了程序在发生故障时最多可以回退多少。...当使用 Flink 1.3.x 之前的版本,消费者从保存点恢复时,无法在恢复的运行启用分区发现。如果要启用,恢复将失败并抛出异常。...每当我们使用事务写入 Kafka 时,请不要忘记为所有使用 Kafka 记录的应用程序设置所需的隔离等级(read_committed 或 read_uncommitted,后者为默认值)。
Flink 的 kafka consumer 集成了 checkpoint 机制以提供精确一次的处理语义 在具体的实现过程中,Flink 不依赖于 kafka 内置的消费组位移管理,而是在内部自行记录和维护...group offset 开始读,即从消费者组(group.id)提交到 kafka broker 上的位移开始读取分区数据(对于老版本而言,位移是提交到 zookeeper 上)。...在恢复时,每个 kafka 分区的起始位移都是由保存在 savepoint 或者 checkpoint 中的位移来决定的 DeserializationSchema 反序列化 如何将从 kafka 中获取的字节流转换为...n (用 Sn 表示),在 apache kafka 中,这个变量表示某个分区最后一次消费的偏移量。...只有当 operator 从最后一个流中提取到 barrier n 时,operator 才会继续发射出所有等待向后发送的数据,然后发送 snapshot n 所属的 barrier。
我们比较选型的消息系统有三个:Kafka、RocketMQ 和 Pulsar。测试之前,我们通过网上的公开数据,对三者的性能和功能进行了简单的对比,表 2 为对比结果。...利用各级缓存机制实现低延迟投递:生产者发送消息时,将消息写入 broker 缓存中;实时消费时(追尾读),首先从 broker 缓存中读取数据,避免从持久层 bookie 中读取,从而降低投递延迟。...我们将获取到的 table schema 发送并存储在指定的 Schema topic 中。...如果在异步超时重发消息时,出现消息重复,可以通过开启自动去重功能进行处理;其它情况下出现的消息发送超时,需要单独处理,我们将这些消息存储在异常 topic 中,后续通过对账程序从源库直接获取终态数据。...从目前的使用情况来看,Pulsar Flink Connector 的性能和稳定性均表现良好。 ? 图 17.
akka.ask.timeout Job Manager和Task Manager通信连接的超时时间。如果网络拥挤经常出现超时错误,可以增大该配置值。...job manager的重启尝试次数。...high-availability.storageDir: hdfs://nameservice/flink/ha/ job manager元数据在文件系统储存的位置,zookeeper仅保存了指向该目录的指针...当数据在缓存中的时间超过linger.ms时,无论缓存中数据是否达到批量大小,都会被强制发送出去。 ack 数据源是否需要kafka得到确认。...Kafka topic分区数和Flink并行度的关系 Flink kafka source的并行度需要和kafka topic的分区数一致。最大化利用kafka多分区topic的并行读取能力。
本文将以Flink DataStream API为核心,带你从0到1实现“从Kafka消费数据并输出到日志”的完整流程,掌握Flink Source的核心用法。...(Exactly-Once)消费语义:通过Kafka偏移量(Offset)管理和Flink检查点(Checkpoint)机制保证数据一致性;动态分区发现:自动感知Kafka主题的分区变化(如新增分区),...无需重启任务;灵活的消费模式:支持从指定偏移量、时间戳或最新位置开始消费。...、拉取消息;反序列化器(Deserializer):将Kafka消息的字节数组(byte[])转换为Flink可处理的数据类型(如String、POJO、Row等);偏移量管理:记录已消费的Kafka消息位置...连接超时问题问题现象:程序启动后报 org.apache.kafka.common.errors.TimeoutException解决方案:检查Kafka服务是否正常运行:ps -ef | grep kafka
我们比较选型的消息系统有三个:Kafka、RocketMQ 和 Pulsar。测试之前,我们通过网上的公开数据,对三者的性能和功能进行了简单的对比,表 2 为对比结果。...利用各级缓存机制实现低延迟投递:生产者发送消息时,将消息写入 broker 缓存中;实时消费时(追尾读),首先从 broker 缓存中读取数据,避免从持久层 bookie 中读取,从而降低投递延迟。...我们将获取到的 table schema 发送并存储在指定的 Schema topic 中。...如果在异步超时重发消息时,出现消息重复,可以通过开启自动去重功能进行处理;其它情况下出现的消息发送超时,需要单独处理,我们将这些消息存储在异常 topic 中,后续通过对账程序从源库直接获取终态数据。...从目前的使用情况来看,Pulsar Flink Connector 的性能和稳定性均表现良好。 图 17.
参数设置,设置retries参数,可以在Kafka的Partition发生leader切换时,Flink不重启,而是做3次尝试: kafkaProducerConfig {...在处理包含无限多键的数据时,要考虑到 keyed 状态保留策略(通过 TTL 定时器来在给定的时间之后清理未使用的数据)是很重要的。...检查flink程序有没有数据倾斜,可以通过 flink 的 ui 界面查看每个分区子节点处理的数据量。...>' are missing 在Flink内使用Java Lambda表达式时,由于类型擦除造成的副作用,注意调用returns()方法指定被擦除的类型。...However, migration for MapState currently isn't supported 在1.9之前的Flink版本中,如果我们使用RocksDB状态后端,并且更改了自用MapState
参数设置,设置retries参数,可以在Kafka的Partition发生leader切换时,Flink不重启,而是做3次尝试: kafkaProducerConfig {...在处理包含无限多键的数据时,要考虑到keyed状态保留策略(通过TTL定时器来在给定的时间之后清理未使用的数据)是很重要的。...如果你的keyed状态包含在某个Flink的默认窗口中,则将是安全的:即使未使用TTL,在处理窗口的元素时也会注册一个清除计时器,该计时器将调用clearAllState函数,并删除与该窗口关联的状态及其元数据...检查flink程序有没有数据倾斜,可以通过flink的ui界面查看每个分区子节点处理的数据量。...>' are missing 在Flink内使用Java Lambda表达式时,由于类型擦除造成的副作用,注意调用returns()方法指定被擦除的类型。
在Flink中,一个常见的实际用例是维护Kafka源中Kafka分区的当前偏移量。...即使Kafka源实际上总是一个分区偏移量的列表,之前返回的状态对象对于Flink来说是一个黑盒子,因此不能被重新分配。...从key到operator的映射是通过对key进行哈希分区确定地计算出来的。...虽然这种方法可以从顺序读模式中受益,但每个子任务都可能读取大量不相关的状态数据,分布式文件系统接收大量并行读请求。 另一种方法是建立一个索引,跟踪检查点中每个键的状态位置。...这是如何运作的呢?key-groups的数量必须在作业启动之前确定,并且(目前)在作业启动之后不能更改。