Apache FlinkCEP 实现超时状态监控的步骤详解

发布时间:2019-10-28 09:34 来源:互联网 当前栏目:网站服务器

 

CEP - Complex Event Processing复杂事件处理。

订单下单后超过一定时间还未进行支付确认。

打车订单生成后超过一定时间没有确认上车。

外卖超过预定送达时间一定时限还没有确认送达。

Apache FlinkCEP API

CEPTimeoutEventJob

FlinkCEP源码简析

DataStream和PatternStream

DataStream 一般由相同类型事件或元素组成,一个DataStream可以通过一系列的转换操作如Filter、Map等转换为另一个DataStream。

PatternStream 是对CEP模式匹配的流的抽象,把DataStream和Pattern组合在一块,然后对外提供select和flatSelect等方法。PatternStream并不是DataStream,它提供方法把匹配的模式序列和与其相关联的事件组成的映射(就是Map<模式名称,List<事件>>)发出去,发到SingleOutputStreamOperator里面,SingleOutputStreamOperator是DataStream。

CEPOperatorUtils工具类里的方法和变量使用了「PatternStream」来命名,比如:

public
 
static
 <IN, OUT> 
SingleOutputStreamOperator
<OUT> createPatternStream(...){...}
public

static
 <IN, OUT1, OUT2> 
SingleOutputStreamOperator
<OUT1> createTimeoutPatternStream(...){...}

final
 
SingleOutputStreamOperator
<OUT> patternStream;

SingleOutputStreamOperator

@Public

public
 
class
 
SingleOutputStreamOperator
<T> 
extends
 
DataStream
<T> {...}

PatternStream的构造方法:

PatternStream
(
final
 
DataStream
<T> inputStream, 
final
 
Pattern
<T, ?> pattern) {

  
this
.inputStream = inputStream;

  
this
.pattern = pattern;

  
this
.comparator = 
null
;

}



PatternStream
(
final
 
DataStream
<T> inputStream, 
final
 
Pattern
<T, ?> pattern, 
final
 
EventComparator
<T> comparator) {

  
this
.inputStream = inputStream;

  
this
.pattern = pattern;

  
this
.comparator = comparator;

}

Pattern、Quantifier和EventComparator

Pattern是模式定义的Base Class,Builder模式,定义好的模式会被NFACompiler用来生成NFA。

如果想要自己实现类似next和followedBy这种方法,比如timeEnd,对Pattern进行扩展重写应该是可行的。

public
class
Pattern
<T, F 
extends
 T> {
/** 模式名称 */
private
final
String
 name;
/** 前面一个模式 */
private
final
Pattern
<T, ? 
extends
 T> previous;
/** 一个事件如果要被当前模式匹配到,必须满足的约束条件 */
private
IterativeCondition
<F> condition;
/** 时间窗口长度,在时间长度内进行模式匹配 */
private
Time
 windowTime;
/** 模式量词,意思是一个模式匹配几个事件等 默认是匹配到一个 */
private
Quantifier
 quantifier = 
Quantifier
.one(
ConsumingStrategy
.STRICT);
/** 停止将事件收集到循环状态时,事件必须满足的条件 */
private
IterativeCondition
<F> untilCondition;
/**
   * 适用于{@code times}模式,用来维护模式里事件可以连续发生的次数
   */
private
Times
 times;
// 匹配到事件之后的跳过策略
private
final
AfterMatchSkipStrategy
 afterMatchSkipStrategy;
  ...
}

Quantifier是用来描述具体模式行为的,主要有三大类:

  • 1、
  • 2、
  • 3、
  • 4、
  • 5、
  • 6、
  • 7、
  • 8、
  • 9、
  • 10、
  • 11、
  • 12、
  • 13、
  • 14、
  • 15、
  • 16、
  • 17、
  • 18、
  • 19、
  • 20、
  • 21、
  • 22、
  • 23、
  • 24、
  • 25、
  • 1、
  • 2、
  • 3、
  • 4、
  • 5、
  • 6、
  • 7、
  • 8、
  • 9、
  • 10、
  • 11、
  • 12、
  • 13、
  • 14、
  • 15、
  • 16、
  • 17、
  • 18、
  • 19、
  • 20、
  • 21、
  • 22、
  • 23、
  • 24、
  • 25、