DataStream中提供多个接口来供实现数据源连接器

SourceFunction 用于定义非并行数据源连接器

ParallelSourceFunction 定义可以运行多个任务实例的数据源连接器.

两者除了一个支持并行一个支持非并行之外,没有什么区别.都是提供了两个方法

Void run(SourceCotext ctx)

Void cancel()

Run方法用来执行具体的数据读入和接收工作,会将这些记录传入Flink应用中.

Run方法只会调用一次,利用其开启一个线程,来不断地读取数据并发出

而Flink会在应用被取消或者关闭的时候调用cancel()方法,为了关闭过程可以顺利完成,run线程应该在cancel方法调用后立刻终止.故可以考虑两者公用一个公共状态,比如下面代码

图片

而Flink为了支持数据一致性,往往是需要支持可重放数据,那么为了支持一个可重放的数据源机制.需要实现保存检查点的相关接口,即CheckpointedFunction接口,将所有偏移和相关元数据存入算子列表状态中.

而且需要保证在单独线程中运行的run方法不会再检查点生成时候运行,所以可以获取一个锁对象,来进行同步处理.

图片

自定义数据源还需要考虑的一点是时间戳和水位线问题

我们之前说过,在分配时间戳和生成水位线的时候,可以选择使用TimestampAssigner或者在数据源函数中完成.

如果想要在数据源中实现水位线生成和分配时间戳,则可以依靠SourceContext中提供的相关方法

def collectWithTimestamp(T record,long timestamp)

def emitWatermark(Watermark watermark)

collectWithTimestam用来发出记录和与之相关的时间戳,

emitWatermark() 用来发出传入的水位线

在数据眼中分配时间戳和生成水位线有着多种好处,可以在获取数据的时候实时分配

比如Kafka,由于往往是一个任务消费多个分区,那么如果直接将多个分区的记录混合发出来,可能导致事件时间出现乱序

那么完全可以独立生成水位线,每个分区应对一个水位线.最后以一个最小值来作为数据流的水位线来发出.

如何实现自定义数据输出函数

DataStream中提供了一个SinkFunction用于专门提供对外输出,内部仅包含一个方法

Void invoke(IN value,Context ctx)

如果只是简单的使用,那么并不困难,如果希望支持相关的失误,则需要考虑是实现WAL机制,还是通过事务性来保证.那么我们就看下两者如何实现.

如果是希望实现事务性,那么首先要确保具有幂等性,幂等性可以送过相关的外部输出操作来保证.

比如我们更新JDBC的时候,可以通过先尝试更新,然后对应的键值数据不存在的时候执行插入.

然后具体的事务性连接器,在Flink中提供了两个模板类,这两个模板都实现了checkpointListener,从而方便获取检查点完成通知,具体的数据输出算子,可以通过继承并实现这两个模板来完成.

GenericWriteAheadSink模板会定期的收集所有需要写出的记录,并写入检查点,方便在故障的时候恢复.当收到检查点完成的通知时候,就会将此次检查点周期的记录写入外部系统.

TwoPhaseCommitSinkFunction 模板利用了外部数据系统的事务功能,对于每个检查点,都开启一个新的事务,并以当前事务作为上下文进行写入数据输出系统,数据输出系统在收到检查点完成通知的时候才会提交事务.

那么我们分别介绍两个接口

GenericWriteAheadSink 结合利用了WAL机制,但是在极端情况下,可能会写出多次记录.

实现原理就是先将检查点后的记录写入到WAL中.

然后等待检查点完成通知的时候,将WAL中对应的记录发出.

提交的具体逻辑分为两步,首先将已经提交的信息持久化,然后从WAL中删除对应的记录.这里的持久化并不是发给外部系统,而是利用了一个名为CheckpointCommitter的可插拔组件来持久化的.

除此外,算子需要实现一个方法

Boolean sendValues(Inerable<IN> values,long checkpointId,long timestamp),其返回一个true或者false

负责将记录写入到外部系统中.

那么他为什么会出现多次写出的问题呢?

1.     常见于程序在运行sendValues()方法时候出现了故障,这样可能会导致恢复的时候重写

2.     所有记录写完了,但是CheckpointCommitter出现了故障,那么可能会重写一遍.

其二是TwoPhaseCommitSinkFunction

我们可以利用Flink内置的TwoPhaseCommitSinkFunction来实现端到端得一致性语义保障.但是这种数据输出算子需要依赖一个能够支持2PC的数据源.

整体的流程如下,数据输出任务在发出首个记录之前,先在外部系统中开启一个事务.然后所有在检查点提交前得数据都纳入到这个事务中,然后收到JobMnager的分隔符的时候,就会将状态准备为预提交,开启一轮新的事务.并向JobManager发送检查点完成消息,当JobManager收到确认消息的时候,就会将完成通知发给所有实现了Listener的确认消息,这时候就可以进行提交事务.将这一轮2PC的周期就完成了.

那么我们看下这个接口的详情

首先需要三个泛型来进行输出 [IN,TXN,CONTEXT]

IN指定输入记录的类型

TXN指定事务识别或者恢复的事务标识符类型

CONTEXT 指定一个可选的自定义上下文对象类型

需要实现的5个方法:

BeginTransaction() 启动一个新的事务并返回事务标识符.

Invoke() 将传入值写入到当前事务中

PreCommit() 预提交事务

Commit() 提交事务

Abort() 终止事务

使用这个接口需要注意的事项有

1.     外部数据输出系统必须支持事务

2.     如果出现了故障,可能会因为超时的原因导致事务关闭,那么所有没提交的数据都会丢失

3.     外部数据系统需要支持终止事务.

发表评论

邮箱地址不会被公开。 必填项已用*标注