流和流 流与批

我们上节说了Structured Streaming建立在Spark SQL之上,可以使用Spark SQL的各种各样的算子,那么我们今天聚焦于其中的Join算子

关注Join的主要原因是在流处理中,因为存在着数据流失,时间窗口等问题,所以关联形式有所改变

首先可以说,在流计算中,数据关联的基本形式还是Inner Join,Left Join,Right Join,Semi Join,Anti Join 按照实现方式还可以是Nested Loop Join,Sort Merge Join,Hash Join

数据分发状态可以是Shuffle Join和Broadcast Join

但是从数据关联的角度来说,可以分为流批关联,双流关联

流批关联,其实就是一个数据流和另一个离线批数据关联的

而双流关联,就是参与关联的两个表都是来自不同的数据流

图片

我们接下来就针对两者的不同使用和特性进行讲解

我们首先介绍流批关联

流批关联,我们可以这么理解,假设我们有一个数据流,比如用户的行为数据,这时候我们想要将用户元数据信息和数据流进行一个关联,用户的元数据信息就是常见的批数据获取方式的

这里我们直接进行演示,我们假设用户的信息存储在文件系统中

同样数据流的数据来源也设置为文件系统,我们使用相关的读取数据流的函数,设置监听某一目录,这样当有新的文件到来的时候,就会进入Spark流处理中,除此之外我们将用户的元数据放入到另一个目录中,方便查看

对于用户的元数据信息,包含以下几个字段

图片

而用户的行为数据,则包含以下几个字段

图片

其中event是互动类型,还有视频编号,和事件时间

然后我们分别创建两个DataFrame

val staticDF: DataFrame = spark.read

.format(“csv”)

.option(“header”, true)

.load(s”${rootPath}/userProfile/userProfile.csv”)

我们首先创建出了一个离线数据集

val actionSchema = new StructType()

.add(“userId”, “integer”)

.add(“videoId”, “integer”)

.add(“event”, “string”)

.add(“eventTime”, “timestamp”)

var streamingDF: DataFrame = spark.readStream

// 指定文件格式

.format(“csv”)

.option(“header”, true)

// 指定监听目录

.option(“path”, s”${rootPath}/interactions”)

// 指定数据Schema

.schema(actionSchema)

.load

首先我们声明了一个schema供使用

然后创建了数据流,其中指定监听的目录

最后便是相关的join语句

streamingDF = streamingDF

.withWatermark(“eventTime”, “30 minutes”)

// 按照时间窗口、userId与互动类型event做分组

.groupBy(window(col(“eventTime”), “1 hours”), col(“userId”), col(“event”))

// 记录不同时间窗口,用户不同类型互动的计数

.count

/**

流批关联,对应流程图中的步骤5

可以看到,与普通的两个DataFrame之间的关联,看上去没有任何差别

*/

val jointDF: DataFrame = streamingDF.join(staticDF, streamingDF(“userId”) === staticDF(“userId”)

在流批关联之中,用法和DataFrame中的数据关联没有任何区别,如果流处理采用的Batch mode,那么每一个子批次都会触发一次扫描,这时候,我们只需要将离线数据集转变为广播数据集就可以了

这时候我们使用Console进行输出

jointDF.writeStream

// 指定Sink为终端(Console)

.format(“console”)

// 指定输出选项

.option(“truncate”, false) 6 // 指定输出模式

.outputMode(“update”)

// 启动流处理应用

.start()

// 等待中断指令

.awaitTermination()

除此之外就是双流关联,就比如用户创建了一条动态,这就有一条动态创建流,除此外其他用户可能对动态进行点赞,评论,这时候会有另一条流代表着动态互动流

我们这时候希望两条流可以进行合并,这就是双流合并

不过在流数据中,不能无限制的关联两条流,也就好比我们创建了一条动态,而这个动态的相关互动事件可能好几天后还有新事件,所以我们需要在关联条件中设置事件时间,以避免状态数据缓存过多的问题

我们直接贴上相关代码来查看

import org.apache.spark.sql.DataFrame

import org.apache.spark.sql.types.StructType

// 保存staging、interactions、userProfile等文件夹的根目录

val rootPath: String = _

// 定义视频流Schema

val postSchema = new StructType().add(“id”, “integer”).add(“name”, “string”).add(

// 监听videoPosting目录,以实时数据流的方式,加载新加入的文件

val postStream: DataFrame = spark.readStream.format(“csv”).option(“header”, true)

// 定义Watermark,设置Late data容忍度

val postStreamWithWatermark = postStream.withWatermark(“postTime”, “5 minutes”)

// 定义互动流Schema

val actionSchema = new StructType().add(“userId”, “integer”).add(“videoId”, “inte

// 监听interactions目录,以实时数据流的方式,加载新加入的文件

val actionStream: DataFrame = spark.readStream.format(“csv”).option(“header”, tru

// 定义Watermark,设置Late data容忍度

val actionStreamWithWatermark = actionStream.withWatermark(“eventTime”, “1 hours”

// 双流关联

val jointDF: DataFrame = actionStreamWithWatermark

.join(postStreamWithWatermark,

expr(“””

// 设置Join Keys

videoId = id AND

// 约束Event time

eventTime >= postTime AND

eventTime <= postTime + interval 1 hour “””))

我们首先创建了两个数据流,分别设置了水位线,最后设置了关联代码

我们不仅设置了关联主键,还对两个表的事件时间进行了约束,声明只关心一小时内的数据

那么我们总结一下

本章我们说了两个不同的关联模式,分别是流批关联和双流关联

流批关联和普通的DataFrame关联类似,直接用join完成

而在双流关联中,我们一般设置了Watermark和关联条件的时间适用范围,双保险来避免出现系统性能问题

发表评论

邮箱地址不会被公开。 必填项已用*标注