首先是调整检查点的相关配置,比如我们可以调整检查点的生成间隔,
首先是检查点生成的间隔配置
这一点可以用StreamExecutionEnvironment中获取进行配置
StreamExecutionEnvironment.enableCheckpoint(10000)
其次是从StreamExecutionEnvironment中获取CheckpointConfig来进行额外配置
Env.getCheckpointConfig
通过这个Conifg对象,可以设置不同的内部状态语义
Config.setCheckpointingMode(ChekcpointingMode.AT_LEAST_ONCE)
取决于自身的特征,状态大小,状态后端以及配置,生成一次检查点可能需要几分钟
而如果配置的时间小于生成检查点时间,遇到了冲突,则会进行延后检查点
为了避免这种问题,Flink提供了检查点之间的最小暂停时间,如果配置了30秒的最小暂停时间,那么在检查点生成完成后30秒不会生成新的检查点.
或者就是遇到了检查点生成时间长,但是资源消耗不大,这样可以配置检查点的最大并发数
或者认为检查点在失败的时候,会因为失败导致重启,可以禁用这个行为,在检查点出错后继续进行
配置应用停止后,检查点的操作
默认应用手动停止后,检查点就会被清理,但是可以启用一个外化检查点的功能,在应用停止后保留检查点,也是通过配置项来进行启用
Config.enableExternalizedCheckpints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
可选的配置项有
.RETAIN_ON_CANCELLATION 完全失败和显式取消时保留检查点
.DELETE_ON_CANCELLATION 应用失败才会保留检查点,应用被显式取消,检查点会删除
配置状态后端
我们之前说过可以显式执行状态后端
默认的状态后端是MemoryStateBackend,将所有状态存在内存中,这样是易失且容易受到JVM约束的
对于配置状态后端
通过setStateBackend来配置
比如我们创建一个MemoryStateBackend
或者创建一个FsStateBackend,这需要传入一个检查点存储位置,并提供是否开启异步检查点
RocksDBStateBackend也是需要一个检查点存储位置的参数
RocksDBStateBackend生成检查点的过程总是异步的.
故障恢复时的配置
对于一个拥有检查点的应用发生故障的时候,会经历一系列的重启步骤,包括启动任务,恢复状态,继续处理
刚刚重启的应用会处于一个进度追赶的状态.这个状态会导致新来的数据一直在积累
为了追的上数据流得进度,数据处理的速度要高于数据新到来的速度,不然不可能达到正常处理新来数据的状态.
这就意味着,在正常处理的期间,不应该将资源设置的过小,过于精准,而是需要预留一些,方便在故障恢复后的追赶.
而在恢复过程中,除了资源因素,还有两个值得讨论的,重启策略和本地回复
对于重启策略,Flink提供了三种重启策略
Fixed-delay 会以固定间隔来将应用重启固定次数
failure-delay 设定一个故障率,为某个时间间隔内的最大故障次数
no-master 不会重启,一直失败
默认的配置为每10秒进行一次重启,上限约为无限.
配置方式如下
Env = StreamExecutionEnvironment.getExectionEnvironment
Env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,Time.of(30,TimeUnit.SECONDS)))
其次是本地恢复机制
因为大多数的状态后端,都会将检查点存到远程文件系统中,不但可以确保状存储的持久化,也方便其他节点取得这个状态,但在恢复期间,由于存储在了远程节点,所以可能读取速度比较慢.
所以Flink给出了一种新特性,本地回复,就是在写入检查点的时候,同时写入到本地节点
这样在重启的时候,就尽量选择之前的机器,并且尽可能的读取本地存储
本地恢复机制的出现,虽然带来了一定的开销,但是增加的恢复的速度
那么开启使用本地恢复,只需要在flink-conf.yaml文件中进行配置即可,可选的配置项有
state.backend.local-recovery: 用于启用和禁用本地恢复
taskmanager.state.local.root-dirs 这个参数用于指定本地存储的本地路径.
最后是日志相关的配置
默认情况下Flink使用SLF4J作为日志抽象,Log4J作为日志框架.
使用起来可以参考下面的代码
如果需要修改Log4J的记录器属性,修改conf目录下的log4j.properties文件
比如下面修改日志级别
Log4j.rootLogger=WARN
如果想要在启动的时候覆盖某些配置
可以通过 -Dlog4j.configuration=paramter的方式传递给JVM