Structured Streaming中的处理模式

我们之前拿Word Count为例子,看了Structured Streaming中的三要素,就是Source,处理算子以及Sink

这一章,我们主要讲述流处理中的处理模式

不过在此之前,我们需要明白Structured Streaming中的Trigger机制,也就是如何,何时的去触发数据在引擎中的计算

总结一下,其支持4种Trigger,如下所示

图片

我们可以从Trigger中看出,流处理中支持的计算模型主要有两种,分别是Batch mode和Continuous mode,对应的是流处理如何处理到来的数据

首先是Batch mode

对于Batch mode,将Spark中连续的数据流,切分为离散的数据流,也就是将大份数据集分为了小份数据集

然后每一份Mirco-batch都会触发一个Spark Job,这个Job就是一个普通的任务,不过会交给Spark SQL和Spark Core进行优化执行

在这种模型下,不同的Trigger,本质上就是如何划分不同的Micro-batch

比如我们想要以五秒作为间隔,进行切割,就可以使用Trigger.ProcessingTime(“5 seconds”),表示每隔5秒切割一个Micro-batch

而Continuous mode

Continuous mode并不切割数据流,而是以事件/消息为粒度,以连续的方式进行处理,可以是一个单词,一个画面帧,其处理的方式是使用一个常驻的任务,Long running job 来处理数据流中的每一条消息

图片

在这种模式下,Continuous mode的优势体现在了容错机制的方向上

虽然吞吐量比不上Batch mode,但是在容错机制上,Continuous mode支持的更广

在流处理的概念中,对于数据一致性分为了三个水平

At most once: 最多一次

At least once: 最少一次

Exactly once: 正好一次

对于Bacth mode,可以支持到At least once,这一点的实现是依赖于Checkpoint机制,在实际处理数据流的Micro-batch之前,Checkpoint会将所有元数据信息存到指定的路径中

这样当出现处理故障,就可以读取这些元数据从而重复执行Micro-batch

对于这样的一个Checkpoint目录设置,我们只需要设置writeStream API的option配置checkpointLocation

比如

df.writeStream

// 指定Sink为终端(Console)

.format(“console”)

// 指定输出选项

.option(“truncate”, false)

// 指定Checkpoint存储地址

.option(“checkpointLocation”, “path/to/HDFS”)

// 指定输出模式

.outputMode(“complete”)

//.outputMode(“update”)

// 启动流处理应用

.start()

// 等待中断指令

.awaitTermination()

这样我们就启动了Checkpoint机制,而在实际运行中,我们可以在Checkpoint存储目录下看到不同的子目录,比如offsets,sources,commit以及State

图片

这样在处理之前将元数据信息通过Checkpoint保存到日志文件中,称为Write Ahead Log

通过这样,我们增加了延迟,但是由于每一个子batch都会触发一个Spark任务,所以我们的吞吐量并不低

其次是Continuous mode

其利用了Epoch Marker进行容错的实现,所谓的Epoch Marker,可以理解为游标,对标着Flink中的WaterMarker,Spark会在源头不断的发出Epoch Marker,然后在Sink端接收到的时候,我们将Epoch中加载的数据写入日志,表示消费并处理过了

那么消息从Source产生之后,就会被Structured Steaming消费,等待完成之后,再将log写入到日志之中

图片

那么总结一下,我们这一章,我们学些了Structured Streaming中两种不同的计算模型,Batch mode和Continuous mode,都有着各自的特点,

Batch mode会将数据切割为一个个的Micro-batch

Continuous mode则是运行一个Long running job,来不断的消费Source的消息

而且在容错方面,两者一个在运行前保存运行元数据日志

一个在处理后利用Epoch Marker处理元数据日志

发表评论

邮箱地址不会被公开。 必填项已用*标注