Flink中异步的访问外部系统

在Flink中处理数据的时候,有时候需要和外部存储系统进行交互,比如在MapFunction中,处理数据的时候去查询外部的数据库,来获取某些特定消息

那么为了减少MapFunction的处理时间,提供了AsyncFunction来降低IO调用带来的延迟,能够提供多个查询并进行异步的处理.它可以通过配置选择对记录进行保序,也可以为了降低延迟而不进行报序.

而且其是内部提供的接口,故支持使用事件时间,能够保证记录不会被水位线超过,而且和检查点机制进行了集成,支持恢复时候重新发送请求

接口的内容如下:

图片

函数中的IN 和 OUT定义了输入和输出的数据类型,对于每个输入记录都会使用两个参数来调用

AsyncInvoke() 方法,具体的使用方式如下

图片

AsyncDataStream提供了两个方法来创建DataStream

orderedwait()和unorderedwait(),都提供了不同的重载方式

分别对应了顺序的调用和不顺序的调用

我们接下来实现一个AsyncFunction来通过JDBC来调用数据库

AsyncInvoke()方法将查询封装JDBC查询封装在Future对象中,然后调用onCompleete

讲结果传递给resultFuture

需要注意,AsyncFunction会对记录进行串行的调用,所以asyncInvoke()方法需要在异步请求后立刻返回,所以应避免使用阻塞式API.

发表评论

邮箱地址不会被公开。 必填项已用*标注