首先是调整检查点的相关配置,比如我们可以调整检查点的生成间隔,

首先是检查点生成的间隔配置

这一点可以用StreamExecutionEnvironment中获取进行配置

StreamExecutionEnvironment.enableCheckpoint(10000)

其次是从StreamExecutionEnvironment中获取CheckpointConfig来进行额外配置

Env.getCheckpointConfig

通过这个Conifg对象,可以设置不同的内部状态语义

Config.setCheckpointingMode(ChekcpointingMode.AT_LEAST_ONCE)

取决于自身的特征,状态大小,状态后端以及配置,生成一次检查点可能需要几分钟

而如果配置的时间小于生成检查点时间,遇到了冲突,则会进行延后检查点

为了避免这种问题,Flink提供了检查点之间的最小暂停时间,如果配置了30秒的最小暂停时间,那么在检查点生成完成后30秒不会生成新的检查点.

图片

或者就是遇到了检查点生成时间长,但是资源消耗不大,这样可以配置检查点的最大并发数

图片

或者认为检查点在失败的时候,会因为失败导致重启,可以禁用这个行为,在检查点出错后继续进行

图片

配置应用停止后,检查点的操作

默认应用手动停止后,检查点就会被清理,但是可以启用一个外化检查点的功能,在应用停止后保留检查点,也是通过配置项来进行启用

Config.enableExternalizedCheckpints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

可选的配置项有

.RETAIN_ON_CANCELLATION 完全失败和显式取消时保留检查点

.DELETE_ON_CANCELLATION 应用失败才会保留检查点,应用被显式取消,检查点会删除

配置状态后端

我们之前说过可以显式执行状态后端

默认的状态后端是MemoryStateBackend,将所有状态存在内存中,这样是易失且容易受到JVM约束的

对于配置状态后端

图片

通过setStateBackend来配置

比如我们创建一个MemoryStateBackend

图片

或者创建一个FsStateBackend,这需要传入一个检查点存储位置,并提供是否开启异步检查点

图片

RocksDBStateBackend也是需要一个检查点存储位置的参数

RocksDBStateBackend生成检查点的过程总是异步的.

图片

故障恢复时的配置

对于一个拥有检查点的应用发生故障的时候,会经历一系列的重启步骤,包括启动任务,恢复状态,继续处理

刚刚重启的应用会处于一个进度追赶的状态.这个状态会导致新来的数据一直在积累

为了追的上数据流得进度,数据处理的速度要高于数据新到来的速度,不然不可能达到正常处理新来数据的状态.

这就意味着,在正常处理的期间,不应该将资源设置的过小,过于精准,而是需要预留一些,方便在故障恢复后的追赶.

而在恢复过程中,除了资源因素,还有两个值得讨论的,重启策略和本地回复

对于重启策略,Flink提供了三种重启策略

Fixed-delay 会以固定间隔来将应用重启固定次数

failure-delay 设定一个故障率,为某个时间间隔内的最大故障次数

no-master 不会重启,一直失败

默认的配置为每10秒进行一次重启,上限约为无限.

配置方式如下

Env = StreamExecutionEnvironment.getExectionEnvironment

Env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,Time.of(30,TimeUnit.SECONDS)))

其次是本地恢复机制

因为大多数的状态后端,都会将检查点存到远程文件系统中,不但可以确保状存储的持久化,也方便其他节点取得这个状态,但在恢复期间,由于存储在了远程节点,所以可能读取速度比较慢.

所以Flink给出了一种新特性,本地回复,就是在写入检查点的时候,同时写入到本地节点

这样在重启的时候,就尽量选择之前的机器,并且尽可能的读取本地存储

本地恢复机制的出现,虽然带来了一定的开销,但是增加的恢复的速度

那么开启使用本地恢复,只需要在flink-conf.yaml文件中进行配置即可,可选的配置项有

state.backend.local-recovery: 用于启用和禁用本地恢复

taskmanager.state.local.root-dirs 这个参数用于指定本地存储的本地路径.

最后是日志相关的配置

默认情况下Flink使用SLF4J作为日志抽象,Log4J作为日志框架.

使用起来可以参考下面的代码

图片

如果需要修改Log4J的记录器属性,修改conf目录下的log4j.properties文件

比如下面修改日志级别

Log4j.rootLogger=WARN

如果想要在启动的时候覆盖某些配置

可以通过 -Dlog4j.configuration=paramter的方式传递给JVM

发表评论

邮箱地址不会被公开。 必填项已用*标注