Spark + Kafka

我们这一次就说下Kafka如何配合Structured Streaming

在实际生产中,Kafka和Spark这对组合还是比较常见的,这里我们就拿一个简单的实例看下如何将两者进行集成

在这里,我们进行资源利用率计算,我们搜集各个服务器上的资源利用率,将其写入Kafka,然后使用Spark的Structured Streaming进行消费,分析聚合后将其写入Kafka,或者打印到Console中

图片

在这里,我们直接先跳过介绍Kafka,直接看Kafka和Spark如何进行集成

我们假设已经存在了两个topic,分别对应着cpu和内存的检测,分别叫做cpu-monitor和mem-monitor两个topic

我们需要在Structured Streaming 中的readSteam上配置相关属性

import org.apache.spark.sql.DataFrame 2

// 依然是依赖readStream API

val dfCPU:DataFrame = spark.readStream

// format要明确指定Kafka

.format(“kafka”)

// 指定Kafka集群Broker地址,多个Broker用逗号隔开

.option(“kafka.bootstrap.servers”, “hostname1:9092,hostname2:9092,hostname3:9092”

// 订阅相关的Topic,这里以cpu-monitor为例

.option(“subscribe”, “cpu-monitor”)

.load()

在format中我们指定了输入源为kafka,然后配置kafka.bootstrap.servers的相关属性

需要配置多个Broker服务器地址,多个Broker之间需要以逗号进行分割

之后利用subscribe指定消费的topic名,明确Structured Stream要消费的Topic

之后我们就将其读取为了DataFrame,我们就可以利用Data Frame相关的处理算子进行处理,而且配合流处理的Window机制,可以以窗口为粒度进行统计

这里我们拿控制台作为输出进行下测试,看下输出格式

  import org.apache.spark.sql.streaming.{OutputMode, Trigger}

import scala.concurrent.duration._

dfCPU.writeStream

.outputMode(“Complete”)

// 以Console为Sink

.format(“console”)

// 每10秒钟,触发一次Micro-batch

.trigger(Trigger.ProcessingTime(10.seconds))

.start()

.awaitTermination()

这里我们获取到了Kafka中信息本体

图片

这样我们就可以对数据进行处理了

import org.apache.spark.sql.types.StringType 2

dfCPU

.withColumn(“clientName”, $”key”.cast(StringType))

.withColumn(“cpuUsage”, $”value”.cast(StringType))

// 按照服务器做分组

.groupBy($”clientName”)

// 求取均值

.agg(avg($”cpuUsage”).cast(StringType).alias(“avgCPUUsage”))

.writeStream

.outputMode(“Complete”)

// 以Console为Sink

.format(“console”)

// 每10秒触发一次Micro-batch

.trigger(Trigger.ProcessingTime(10.seconds))

.start()

.awaitTermination()

首先是代码中,我们将key和value设置为列String ,然后按照ClientName进行聚合

并且我们设置了每十秒触发一次,输出到控制台

或者我们修改下outputMode, 指定kafka为输出,代码如下

dfCPU

.withColumn(“key”, $”key”.cast(StringType))

.withColumn(“value”, $”value”.cast(StringType))

.groupBy($”key”)

.agg(avg($”value”).cast(StringType).alias(“value”))

.writeStream7 .outputMode(“Complete”) 8 // 指定Sink为Kafka

.format(“kafka”)

// 设置Kafka集群信息,本例中只有localhost一个Kafka Broker

.option(“kafka.bootstrap.servers”, “localhost:9092”)

// 指定待写入的Kafka Topic,需事先创建好Topic:cpu-monitor-agg-result

.option(“topic”, “cpu-monitor-agg-result”)

// 指定WAL Checkpoint目录地址

.option(“checkpointLocation”, “/tmp/checkpoint”)

.trigger(Trigger.ProcessingTime(10.seconds))

.start()

.awaitTermination()

我们通过format指定输出为Kafka,设置Kafka的集群环境 并且通过topic设置了输出topic

并且设置了WAL Checkpoint路径

这样我们就将聚合后的信息再一次的输出到了Kafka中

还需要注意两点,也就是写入写出的topic需要不一样,避免无限循环

一个是写出的Schema中必须要用到 key 和 value两个固定的字段

那么我们总结一下,我们手把手的实现了kafka和sprak的集成,完成了从Kafka读取数据,然后写回到Kafka的全过程

发表评论

邮箱地址不会被公开。 必填项已用*标注