接下来我们说一下如何Flink整体流程相关的代码

首先是最简单,自己从内存中写入数据

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

// 1.从集合中读取数据

final DataStreamSource<Integer> numberDS =

streamEnv.fromElements(1, 2, 3, 4, 5, 6);

这样就是直接获取了生成了一个DataSource

其次是文件读取数据,这个我们之前也有使用过

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

// 1.从集合中读取数据

DataStreamSource<String> fileDS = env.readTextFile(“input/sensor-data.log”);

而且Flink支持读取文件夹

在输入文件路径的时候,也是支持绝对路径和相对路径的

再其次是从Flink的好伙伴,Kafka中读取数据

图片

需要添加相关的依赖

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

读取的代码如下

Properties properties = new Properties();

properties.setProperty(“bootstrap.servers”, “linux1:9092”);

properties.setProperty(“group.id”, “consumer-group”);

properties.setProperty(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);

properties.setProperty(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);

properties.setProperty(“auto.offset.reset”, “latest”);

DataStreamSource<String> kafkaDS = env.addSource(

new FlinkKafkaConsumer<String>(

“flink-test”,

new SimpleStringSchema(),

properties)

);

仍然需要指定topic,gorupid等一系列参数,进行相关的使用

在最后,需要将数据源添加到环境中,也就是env.addSource

然后如果希望自己实现数据源,可以考虑实现SourceFuction函数

class MyDataSource implements SourceFunction<WaterSensor> {

private volatile boolean runFlg = true;

// TODO 生产数据

@Override

public void run(SourceContext<WaterSensor> ctx) throws Exception {

while ( runFlg ) {

WaterSensor ws = new WaterSensor(

“sensor_” + new Random().nextInt(5) + 1,

System.currentTimeMillis(),

new Random().nextInt(40)

);

ctx.collect(ws);

Thread.sleep(500);

}

}

// TODO 取消生产数据

@Override

public void cancel() {

runFlg = false;

}

}

在其中需要实现的函数有 run 和 cancel

然后在Source内部,利用一个volatile变量来确定数据源是否接入

其次是具体讲一下Flink中的Transformatior

主要是用于阐述数据的转换的,其实就是一种映射

最基础的就是其中的map函数,和Lambda中的map函数,就是将A数据类型转换为B数据类型

假设我们有一个简单的Integer数据集

final DataStreamSource<Integer> numberDS = streamEnv.fromElements(1, 2, 3, 4, 5, 6);

然后如果使用Lambda的方式书写mapHanshu

final SingleOutputStreamOperator<Integer> map = numberDS.map(

num -> {

return num * 2;

}

);

如果是实现Function的话

final SingleOutputStreamOperator<Integer> map = numberDS.map(

new MapFunction<Integer, Integer>() {

@Override

public Integer map(Integer in) throws Exception {

return in * 2;

}

}

);

Map函数还有着对应的富函数实现

final SingleOutputStreamOperator<String> map = numberDS.map(

new RichMapFunction<Integer, String>() {

@Override

public void open(Configuration parameters) throws Exception {

System.out.println(“open…”);

}

@Override

public String map(Integer value) throws Exception {

// getRuntimeContext方法可以获取运行时环境

final RuntimeContext runtimeContext = this.getRuntimeContext();

return runtimeContext.getTaskName() + “>>>>” + value;

}

@Override

public void close() throws Exception {

System.out.println(“close…”);

}

}

);

在富函数之中,可以通过环境上下文,获取一些额外的数据

富函数中的open表示这函数的开始,可以进行一些初始化,close表示函数的结束,进行一些收尾化操作

这是转换,除此外还可以flatmap,更为细化的进行拆分,消费一个元素来产生0到多个元素

final SingleOutputStreamOperator<Integer> flatMapDS = numberDS.flatMap(

new FlatMapFunction<List<Integer>, Integer>() {

@Override

public void flatMap(List<Integer> numbers, Collector<Integer> out) throws Exception {

// 将每一条数据取出来放置到采集器中

for (Integer num : numbers) {

out.collect(num * 2);

}

}

}

);

还有一个Filter函数,用于过滤数据,留下符合条件的数据

 final SingleOutputStreamOperator<Integer> filter = numberDS.filter(

new FilterFunction<Integer>() {

@Override

public boolean filter(Integer num) throws Exception {

return num % 2 != 0;

}

}

);

然后就是类似SQL 中group by的函数,keyBy

final DataStreamSource<WaterSensor> waterSensorDS =

streamEnv.fromElements(

new WaterSensor(“1001”, System.currentTimeMillis(), 30),

new WaterSensor(“1002”, System.currentTimeMillis(), 30),

new WaterSensor(“1001”, System.currentTimeMillis(), 40),

new WaterSensor(“1002”, System.currentTimeMillis(), 40)

);

 final KeyedStream<WaterSensor, String> waterSensorStringKeyedStream = waterSensorDS.keyBy(

ws -> {

return ws.getId();

}

);

以及将数据打乱,随机分布到下游的api-shuffle

DataStream<String> streamShuffle = stream.shuffle();

还有着Fork-Join框架不可缺少的Join操作,对应到Flink中,就是reduce操作

将一个分组数据流的聚合操作,合并当前的元素,返回的数据流中包含每一次聚合的结果,并不只返回最后一次聚合的结果

final SingleOutputStreamOperator<WaterSensor> reduce = waterSensorKS.reduce(

new ReduceFunction<WaterSensor>() {

@Override

public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {

value1.setVc(value1.getVc() + value2.getVc());

return value1;

}

}

//                (ws1, ws2) -> {

//                    return ws1;

//                }

);

利用两两聚合来达到最终的结果

然后就是process

和reduce的定义类似,也是两两的方式处理数据,不过可以在处理过程中获取环境相关信息

final SingleOutputStreamOperator<WaterSensor> process = waterSensorKS.process(

new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {

@Override

// processElement方法是数据进行流后进行处理操作,其中有3个参数

//  第一个参数:到流的数据

//  第二个参数:当前的流环境对象

//  第三个参数:数据采集器,用于向后面的流传递数据

public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {

//out.collect(ctx.getCurrentKey() + ” => ” + value.getVc());

out.collect(value);

}

}

);

在执行process中,泛型需要三个参数,分别表示分流数据的类型,分流后的数据,修改完成的数据类型

然后函数中相关的参数有,当前流的数据,上下文,收集器

在Flink的整个流程中,除了数据源,转换函数,还有就是输出,这一点我们之前说过,Flink中称之为Sink

下沉,其实更像是一种导出的机制

之前我们使用DataSource的print

其实也是在这个DataSource中添加了一个输出到命令行的Sink

public DataStreamSink<T> print() {

PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();

return addSink(printFunction).name(“Print to Std. Out”);

}

官方其实提供了一部分支持的输出

图片

图片

比如我们先拿一个Kafka Sink进行举例

首先创建一个本地的DataSource

final DataStreamSource<String> waterSensorDS =

streamEnv.fromElements(

“Hello”,

“Flink”,

“Kafka”

);

然后便是创建Kakfa相关的Producer,并进行相关的存储

Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “linux1:9092”);

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(

“topic-1”

new KafkaSerializationSchema<String>() {

@Override

public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {

return new ProducerRecord(“topic-1”, s.getBytes(StandardCharsets.UTF_8));

}

},

props,

FlinkKafkaProducer.Semantic.EXACTLY_ONCE

);

最后在dataSource中添加Sink

waterSensorDS.addSink(producer);

之后便可以测试Kafka生产者和Flink的集成

除此之外,如果有自定义的需求

可以通过继承相关的类来自定义Sink

class MySink extends RichSinkFunction<String> {

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

}

// invoke调用的意思,数据来了就可以进行处理

@Override

public void invoke(String value, Context context) throws Exception {

System.out.println(“value = ” + value);

}

@Override

public void close() throws Exception {

super.close();

}

}

发表评论

邮箱地址不会被公开。 必填项已用*标注