Flink中有状态算子和用户函数都是流处理应用中常见组成部分,很多复杂的操作都需要将中间结果存储起来,比如Flink内置的DataStream算子,大多都是有状态的.

窗口函数中ReduceFunction保存中间聚合结果,ProcessFunction需要记住设定的计时器,数据输出算子需要提供准确一次的状态.

那么我们将讲解Flink中有状态的用户自定义函数,讨论性能和健壮性,以及不同类型的状态.

那么首先,需要了解的是如何在函数中使用状态

状态,基本可以分为键值分区的状态和算子状态

对于键值分区中的状态,需要确定对应的Stream是KeyedStream,整个函数的键值的状态都会分布在函数所在算子的所有并行任务上,每个函数的并行实例都会负责,一部分键值并维护对应的状态实例.

键值分区的状态看上去像是一个分布式键值映射(distributed key-value map)

而整个键值分区的类型有很多,类型在Flink中也被称为原语 primitive,支持的原语如下

ValueState[T]保存某个类型为T的值

ListState[T] 保存某个类型为T的元素列表.可以使用add addAll 或者使用get获取列表

MapState[K,V] 保存一组映射,跟Java Map方法类似.

ReducingState[T]和ListState类似,但是具有聚合的能力

AggregatingState[I,O] 和ReducingState行为类似,但是更加通用的AggregateFunction来聚合内部的值.

上述都支持使用State.clear方法来进行清除.

那么我们给出一个在键值分区流上使用ValueState的FlatMapFunction,其会在检测温度变化超过设定阈值之后进行警报

package com.bmw.op.pdp.redshift.system.service.impl.businesskpi;

import lombok.val;

class TemperatureAlertFunction(val threshold:Double)

extends RichFlatMapFunction[SensorReading,(String,Double,Double)]{

//状态引用 对象

private var lastTempState:ValueState[Double]=

override def open(parameters:Conguration):Unit={

//创建状态描述符

val lastTempDescriptor=

new ValueStateDescriptor[Double](“lastTemp”, classOf[Double])

//获得状态引用

lastTempState=getRuntimeContext.getState[Double](lastTempDescriptor)

override def flatMap(

reading:SensorReading,

out:Collector[(String,Double,Double)]):Unit={

//从状 态 中 获 取 上 一 次 的 温度

val lastTemp=lastTempState.value()

// 检查是否需要发出警报

val tempDiff=(reading.temperature-lastTemp).a bs

if(tempDiff>threshold){

// 温度变化超过阀值

out.collect((reading.id,reading.temperature,tempDiff))

}

//更新状态

this.lastTempState.update(reading.temperature)

}

}

为了创建一个状态对象,需要利用RichFunction中RuntimeContext在Flink运行时注册一个StateDescriptor

需要注意,在创建ReducingState和AggregatingState的时候需要传入一个ReduceFunction或者AggregateFunction对象

而且往往状态处理的数据类型可以通过Class或者TypeInfomation来指定,从而分配一个合适的序列化器.

创建的时机往往也是在对应Function的open()方法中初始化,其会在任意处理方法之前调用.我们一般将状态引用对象声明为函数类的普通成员变量.

如果有状态函数正在从某个检查点恢复重启,那么如果状态后端存储了函数相关的数据或者给定名称,Flink都会将二者进行关联,如果没有包含给定的状态,那么就会初始化为空.

而且在键值分区之中,某些特定的函数具有简洁简写的实现,比如ValueState的map或者flatMap,

下面就是flatMapWithState实现的简单逻辑

图片

在flatMapWithState的过程中,会接受一个Tuple2的参数,这个元组包含两个值,第一个值是flatMap的输入记录,第二个字段是一个Option对象,保存了当前的状态

如果没有初始化,那么会得到一个None.

最后需要返回一个Tuple2对象,第一个字段是flatMap的结果,第二个字段是状态的值

上面我们说了键值状态的实现

接下来我们说下如何实现算子状态.算子状态我们前文说过,包含三种算子状态,列表状态,联合列表状态以及广播状态.

对于算子列表状态,需要实现ListCheckpointed接口,这个接口不像ValueState和ListState直接注册就可以,而是需要将算子状态实现为成员变量并通过接口提供的回调函数和状态后端进行交互

比如ListCheckpointed提供的两个方法

图片

snapshotState()会在Flink触发有状态函数生成检查点的时候调用.传入一个检查点编号和一个时间戳,并返回一个算子状态列表.

restoreState()在初始化函数的时候调用,这个状态可能发生在状态启动或者故障恢复的时候,利用这个方法,初始化本函数中的对象.

图片

图片

上面维持一个算子状态来统计超过阈值的温度值数目

上面状态函数中,使用snapshotState()和restoreState()来实现自动化逻辑, snapshotState()方法能够将算子状态分为多个部分, restoreState()会利用一个或者多个部分来组装算子状态.

但是我们之前也讲过,由于状态列表在修改并行度后需要重新分配状态对象,那么就需要每个函数有自己的状态拆分和合并逻辑.

在保存的时候,各个算子将自己的状态返回到列表中,最终组装成为完整状态,在回复的时候,那么会按照规定规则进行分配各自算子.

广播状态算子

如果需要将所有信息发送到函数所有并行实例上,并将其作为可恢复的状态进行维护,

典型的就是有一个需要广播的规则流,和一个普通事件流,函数需要接收两个输入流.

一个简单使用广播流的应用如下

图片

通过调用.broadcast()方法创建一个BroadcastStream,通过传入一个Descriptor对象,来定义广播状态

讲BroadcastStream和一个DataStream或者KeyedStream联结起来,利用connect()方法进行连接.

最后应用一个函数,可以是KeyedBroadcastProcessFunction或者BroadcastProcessFunction

一个简单的KeyedBroadcastProcessFunction实现如下

图片

图片

需要注意,在广播状态算子Function中,两条流中元素的处理是非对称的,对于

ProcessElement传入的只读上下文, processBroadcastElement传入的是可读写上下文.

除此外和其他上下文对象类似,两个上下文对象支持访问事件时间戳,当前水位线,当前处理时间或者副输出.

最后关于算子联合列表状态,可以使用CheckpointedFunction接口

提供了用于维护键值分区状态和算子状态的钩子函数(hook)

内部定义了两个方法,initializeState()和snapshotState(),工作模式类似ListCheckpointed

initializeState()方法会在创建Function的时候被调用,会传入一个FunctionInitializationContext对象,利用其可以访问OperatorStateStore以及KeyedStateStore对象,这两个存储对象能够使用Flink运行时来注册函数状态并返回状态对象

注册状态的时候,需要提供一个范围内唯一的名称

而snapshotState()方法会在生成检查点之前调用,需要获取一个FunctionSnapshotContext作为参数

从FunctionSnapshotContext之中,可以获取检查点编号以及时间戳.snapshotState()方法目的是确保检查点开始之前所有状态已经更新外部

那么我们展示如何使用CheckpointedFunction接口来创建一个函数,这个函数分别利用了键值分区和算子状态,来统计每个键值分区中每个算子内有多少传感器读数超过阈值.

图片 图片

发表评论

邮箱地址不会被公开。 必填项已用*标注