我们来讲解一些使用有状态算子的额外知识.
1.Flink利用检查点机制,来进行数据的一致性保存,得益于检查点机制,Flink性能并不会因为要保存一致性数据而拖慢.
如果希望能够在算子中得到感知点是否已经创建完成,可以通过实现CheckpointListener接口,来注册Listener,在所有的算子任务都完成了状态存储之后,感知到检查点创建成功.
Listener其中有着一个notifyCheckpointComplete方法,来供JobManager在检查点完成后调用.
不过需要注意,Flink并不保证对每个完成的检查点都调用notifyCheckpointComplete()方法,所以需要考虑错过通知的可能性.
2.如何开启故障恢复,连续运行的流式应用需要具备从故障中恢复的能力,而Flink提供的故障恢复能力是通过提供检查点功能,那么Flink提供了固定间隔创建检查点的能力,通过在StreamExecutionEnvironment中启用周期性检查点机制.
Val env = StreamExecutionEnvironment.getExecutionEnvironment
Env.enableCheckpointing(10000L)
不过需要注意较短的间隔会为常规处理带来较大的开销,但是恢复的时候重新处理数据量小,恢复速度会更快.
3.确保与状态应用的可维护性
为了可以对应用进行维护,比如修复bug,添加,删除调整功能,为了方便进行重启,或者迁移状态,
需要设计算子唯一标识和最大并行度,下面我们介绍如何设置
应该为了每一个算子指定一个唯一标识,这个标识会作为元数据和算子的实际状态一起写入保存点,当应用从保存点启动的时候,会利用唯一标示来讲保存点中状态进行映射.
对于唯一标识,可以利用uid()方法为应用中的每个算子来分配标识
其次是制定一个最大并行度,我们可以通过StreamExecutionEnvironment为应用中算子设置最大并行度或者利用算子的setMaxParallelism()来为每个算子单独设置.
那么如何重启一个有状态应用呢?
如果我们有了保存点,并且确保每一个算子都有一个唯一标识.在启动的时候,会通过算子标识和状态名称从保存点中查询对应的状态来进行初始化.
那么在重启的时候,可能面对不同的场景,比如
添加有状态或者无状态的算子
从应用中移除某个状态
通过改变状态源于来修改某个算子的状态原语
如果添加某几个有状态或者无状态的算子
这一点还好,可以正常启动,只不过从保存点启动的时候,这些状态会被初始化为空
从应用中移除状态
除了向应用中添加状态,还可以删除一些状态,这些状态可以是函数中的某个状态,这样当新版本的应用从一个旧版本中启动的时候,保存点中部分状态将无法映射,如果算子中唯一标识或者状态名称发生了改变,也会出现这种问题,默认如果无法全部将保存点中的状态恢复的应用,是无法在Flink中启动的,但是可以通过禁用相关检查来启动.
如果是修改算子的状态
如果是修改状态的数据类型,比如将ValueState[Int]改为ValueState[Double]
虽然可以,但数据发生改变的时候,序列化器可能发生改变,比如使用DoubleSerializer来反序列化一个StringSerializer序列化的二进制数据就会失败,那么修改数据类型仅限某些特定情况.
而修改数据原语,比如将VlaueState[List[Stirng]]转换为ListState[String],现在还不支持.