Flink中异步的访问外部系统
在Flink中处理数据的时候,有时候需要和外部存储系统进行交互,比如在MapFunction中,处理数据的时候去查询外部的数据库,来获取某些特定消息
那么为了减少MapFunction的处理时间,提供了AsyncFunction来降低IO调用带来的延迟,能够提供多个查询并进行异步的处理.它可以通过配置选择对记录进行保序,也可以为了降低延迟而不进行报序.
而且其是内部提供的接口,故支持使用事件时间,能够保证记录不会被水位线超过,而且和检查点机制进行了集成,支持恢复时候重新发送请求
接口的内容如下:
函数中的IN 和 OUT定义了输入和输出的数据类型,对于每个输入记录都会使用两个参数来调用
AsyncInvoke() 方法,具体的使用方式如下
AsyncDataStream提供了两个方法来创建DataStream
orderedwait()和unorderedwait(),都提供了不同的重载方式
分别对应了顺序的调用和不顺序的调用
我们接下来实现一个AsyncFunction来通过JDBC来调用数据库
AsyncInvoke()方法将查询封装JDBC查询封装在Future对象中,然后调用onCompleete
讲结果传递给resultFuture
需要注意,AsyncFunction会对记录进行串行的调用,所以asyncInvoke()方法需要在异步请求后立刻返回,所以应避免使用阻塞式API.