我们来说一个Flink中的框架CEP
由于对于数据处理,可以规划为一个状态机,只有满足了这个状态机的状态变化,达到了一定的规则匹配,才能得到用户想要的数据
常见的就是字符串的正则表达式匹配
比如下面,我们将事件的类型进行划分
只有满足两个事件同时是圆圈的,才会符合规则,进行输出
这就常用于监控这类需求
支持连续的条件和不连续的条件,而且可以进行时间上的限制,而且支持将超时的数据进行侧输出流输出
对于CEP类库的使用,需要先引用Maven相关的依赖
<dependency>
<groupId>org.apache.flink</groupId> <artifactId>flink-cep_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> |
而对于这样一个类库的使用,可以分为如下四个不走
1. 定义数据流
2. 定义规则
3. 应用规则
4. 获取结果
代码如下:
定义数据流
final DataStreamSource<LoginEvent> loginEventDS = streamEnv.fromElements(
new LoginEvent(“user1”, “1.0.0.1”, “success”, 1000L), new LoginEvent(“user2”, “1.0.0.2”, “fail”, 2000L), new LoginEvent(“user3”, “1.0.0.3”, “fail”, 3000L), new LoginEvent(“user4”, “1.0.0.4”, “success”, 4000L), new LoginEvent(“user5”, “1.0.0.5”, “fail”, 5000L) ); |
然后规则的定义
final Pattern<LoginEvent, LoginEvent> pattern = Pattern.<LoginEvent>begin(“start”).where(
new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent evt) throws Exception { return “success”.equals(evt.getResultType()); } } ); |
利用Pattern.begin 创建一个规则,where规则确定匹配type为success的事件
第三部,应用规则
final PatternStream<LoginEvent> ps = CEP.pattern(loginEventDS, pattern);
最后获取结果
ps.select(
new PatternSelectFunction<LoginEvent, String>() { @Override public String select(Map<String, List<LoginEvent>> map) throws Exception { final List<LoginEvent> start = map.get(“start”); return start.toString(); } } ).print(); |
而其中最为重要的匹配规则,可以分为几类
条件匹配
模式序列
量词
超时
其中简单条件匹配
可以有 where or 以及终止条件 until
比如我们生成一个如下的条件匹配
final Pattern<LoginEvent, LoginEvent> pattern =
Pattern.<LoginEvent>begin(“start”) .where( new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent evt) throws Exception { return “fail”.equals(evt.getResultType()); } } ) .oneOrMore().until( new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent value) throws Exception { return value.getEventTime() == 5000L; } } ).within(Time.seconds(10)); |
匹配其中状态为fail,且eventtime小于5000L
并且等待10s
并且可以从其中获取到超时输出流
OutputTag<String> tag = new OutputTag<String>(“timeout”){};
final SingleOutputStreamOperator<String> start = ps.select( tag, new PatternTimeoutFunction<LoginEvent, String>() { @Override public String timeout(Map<String, List<LoginEvent>> map, long l) throws Exception { return map.toString(); } }, new PatternSelectFunction<LoginEvent, String>() { @Override public String select(Map<String, List<LoginEvent>> map) throws Exception { final List<LoginEvent> start = map.get(“start”); return start.toString(); } } ); start.getSideOutput(tag).print(“timeout>>>>”); start.print(“normal>>>>”); |
更加复杂的模式序列,则是包含了联合条件,比如严格近邻,当数据为连续的满足的时候,才会命中
.begin[(String, String)](“start”)
.where(_._1 == “a”) .next(“next”) .where(_._1 == “b”) |
亦或者宽松近邻
.begin[(String, String)](“start”)
.where(_._1 == “a”) .followedBy(“followedBy”) .where(_._1 == “b”) |
亦或者次数匹配
Pattern
.begin[(String, String)](“start”) .where(_._1 == “sensor1”) .times(3) |