我有一个Flink作业,其中我正在读取文件夹中的文件,并将其转储到数据库中。每天都会有新文件进入该文件夹。
我已经启用了检查点,以便如果由于任何原因Flink作业停止并且我需要重新启动,Flink作业不应该读取已经读取的文件。
我在代码中添加了以下代码行,但当我重新启动作业时,Flink作业再次读取所有文件。
env.setStateBackend(new FsStateBackend("file:///C://Users//folder"));
env.enableCheckpointing(10L);发布于 2019-01-23 19:09:00
检查点是一种在应用程序执行期间从故障中恢复的机制,而不是恢复显式取消的应用程序。
如果您有一个正在运行的应用程序,并且执行失败(无论是什么原因),Flink将尝试通过重新启动它并从上一个检查点初始化操作符的状态来恢复该应用程序。如果恢复失败(例如,因为没有足够的处理槽可用),则认为作业失败。
如果您手动取消应用程序并重新启动它,Flink将不会成为初始化运算符状态的检查点。事实上,当您取消应用程序时,Flink将(默认情况下)删除所有检查点。
您正在寻找的概念是保存点。保存点与检查点非常相似,但是由用户手动触发,并且不会在应用程序显式取消时自动删除。在启动应用程序时,您可以从保存点启动它,这意味着操作员状态是从保存点初始化的。
还有不同的重启策略可用于配置Flink尝试重启失败应用程序的频率和间隔。
发布于 2019-01-24 05:22:46
@fabian-hueske涵盖了你的“计划”重启的所有方面
您应该计划取消具有保存点的作业
flink cancel --withSavepoint ${SAVEPOINT_DIR} ${JOBID}使用上一步中的保存点重新启动新作业。
flink run -s ${SAVE_POINT} -p ${PARALLELISM} -d ${JOB_JAR} ${JOB_ARGS}https://stackoverflow.com/questions/54324827
复制相似问题