我们来说一个Flink中的框架CEP

由于对于数据处理,可以规划为一个状态机,只有满足了这个状态机的状态变化,达到了一定的规则匹配,才能得到用户想要的数据

常见的就是字符串的正则表达式匹配

比如下面,我们将事件的类型进行划分

图片

只有满足两个事件同时是圆圈的,才会符合规则,进行输出

这就常用于监控这类需求

支持连续的条件和不连续的条件,而且可以进行时间上的限制,而且支持将超时的数据进行侧输出流输出

对于CEP类库的使用,需要先引用Maven相关的依赖

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-cep_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

而对于这样一个类库的使用,可以分为如下四个不走

1. 定义数据流

2. 定义规则

3. 应用规则

4. 获取结果

代码如下:

定义数据流

        final DataStreamSource<LoginEvent> loginEventDS = streamEnv.fromElements(

new LoginEvent(“user1”, “1.0.0.1”, “success”, 1000L),

new LoginEvent(“user2”, “1.0.0.2”, “fail”, 2000L),

new LoginEvent(“user3”, “1.0.0.3”, “fail”, 3000L),

new LoginEvent(“user4”, “1.0.0.4”, “success”, 4000L),

new LoginEvent(“user5”, “1.0.0.5”, “fail”, 5000L)

);

然后规则的定义

final Pattern<LoginEvent, LoginEvent> pattern = Pattern.<LoginEvent>begin(“start”).where(

new SimpleCondition<LoginEvent>() {

@Override

public boolean filter(LoginEvent evt) throws Exception {

return “success”.equals(evt.getResultType());

}

}

);

利用Pattern.begin 创建一个规则,where规则确定匹配type为success的事件

第三部,应用规则

final PatternStream<LoginEvent> ps = CEP.pattern(loginEventDS, pattern);

最后获取结果

ps.select(

new PatternSelectFunction<LoginEvent, String>() {

@Override

public String select(Map<String, List<LoginEvent>> map) throws Exception {

final List<LoginEvent> start = map.get(“start”);

return start.toString();

}

}

).print();

而其中最为重要的匹配规则,可以分为几类

条件匹配

模式序列

量词

超时

其中简单条件匹配

可以有 where  or 以及终止条件 until

比如我们生成一个如下的条件匹配

final Pattern<LoginEvent, LoginEvent> pattern =

Pattern.<LoginEvent>begin(“start”)

.where(

new SimpleCondition<LoginEvent>() {

@Override

public boolean filter(LoginEvent evt) throws Exception {

return “fail”.equals(evt.getResultType());

}

}

)

.oneOrMore().until(

new SimpleCondition<LoginEvent>() {

@Override

public boolean filter(LoginEvent value) throws Exception {

return value.getEventTime() == 5000L;

}

}

).within(Time.seconds(10));

匹配其中状态为fail,且eventtime小于5000L

并且等待10s

并且可以从其中获取到超时输出流

OutputTag<String> tag = new OutputTag<String>(“timeout”){};

final SingleOutputStreamOperator<String> start = ps.select(

tag,

new PatternTimeoutFunction<LoginEvent, String>() {

@Override

public String timeout(Map<String, List<LoginEvent>> map, long l) throws Exception {

return map.toString();

}

},

new PatternSelectFunction<LoginEvent, String>() {

@Override

public String select(Map<String, List<LoginEvent>> map) throws Exception {

final List<LoginEvent> start = map.get(“start”);

return start.toString();

}

}

);

start.getSideOutput(tag).print(“timeout>>>>”);

start.print(“normal>>>>”);

更加复杂的模式序列,则是包含了联合条件,比如严格近邻,当数据为连续的满足的时候,才会命中

.begin[(String, String)](“start”)

.where(_._1 == “a”)

.next(“next”)

.where(_._1 == “b”)

亦或者宽松近邻

.begin[(String, String)](“start”)

.where(_._1 == “a”)

.followedBy(“followedBy”)

.where(_._1 == “b”)

亦或者次数匹配

Pattern

.begin[(String, String)](“start”)

.where(_._1 == “sensor1”)

.times(3)

发表评论

邮箱地址不会被公开。 必填项已用*标注