AfterMatchSkipStrategy for timed out patterns


Dominik Wosiński
Hey all,

I was wondering whether, for CEP, the *AfterMatchSkipStrategy* is applied
during matching or whether the results are simply removed after the match.
The question comes out of some experiments I was doing with CEP. Say I have
readings from a sensor and I want to detect events over some threshold. So
I have something like the pattern below:

Pattern.begin[AccelVector]("beginning",
AfterMatchSkipStrategy.skipPastLastEvent())
  .where(_.data() < Threshold)
  .optional
  .followedBy(EventPatternName)
  .where(event => event.data() >= Threshold)
  .oneOrMore
  .greedy
  .consecutive()
  .followedBy("end")
  .where(_.data() < Threshold)
  .oneOrMore
  .within(Time.minutes(1))


The thing is that sometimes sensors may stop sending data, or the data is
lost, so I would like to emit events that have technically timed out. I have
created a PatternProcessFunction that simply gets the events that have timed
out and checks for the *EventPatternName* part.
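
For readers following along, here is a minimal sketch of what such a
function could look like. It is not the code from my job: the class name
TimedOutAwareFunction, the generic event type T (AccelVector in my case),
the output type, and the use of a side output for timed-out matches are all
assumptions made for illustration.

```scala
import java.util.{List => JList, Map => JMap}

import org.apache.flink.cep.functions.{PatternProcessFunction, TimedOutPartialMatchHandler}
import org.apache.flink.util.{Collector, OutputTag}

// Hypothetical sketch: forwards the events captured under `eventPatternName`
// for both complete matches and timed-out partial matches.
class TimedOutAwareFunction[T](
    eventPatternName: String,          // name of the above-threshold pattern stage
    timedOutTag: OutputTag[JList[T]])  // assumed side output for timed-out matches
  extends PatternProcessFunction[T, JList[T]]
  with TimedOutPartialMatchHandler[T] {

  // Called for every complete match of the pattern.
  override def processMatch(
      matched: JMap[String, JList[T]],
      ctx: PatternProcessFunction.Context,
      out: Collector[JList[T]]): Unit = {
    val events = matched.get(eventPatternName)
    if (events != null) out.collect(events)
  }

  // Called for every partial match that exceeded the `within` window.
  override def processTimedOutMatch(
      matched: JMap[String, JList[T]],
      ctx: PatternProcessFunction.Context): Unit = {
    val events = matched.get(eventPatternName)
    // Only emit if the partial match already reached the above-threshold stage.
    if (events != null && !events.isEmpty) ctx.output(timedOutTag, events)
  }
}
```

With something like this, each call to processTimedOutMatch yields one
record on the side output, which is where the repetition described below
becomes visible.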

It works fine, but I have noticed a weird behavior: the events that get
passed to *processTimedOutMatch* are repeated, as if there were no
*AfterMatchSkipStrategy*.

So, for example say the Threshold=200, and I have the following events for
one of the sensors:
Event1 (timestamp= 1, data = 10)
Event2 (timestamp= 2, data = 250)
Event3 (timestamp= 3, data = 300)
Event4 (timestamp= 4, data = 350)
Event5 (timestamp= 5, data = 400)
Event6 (timestamp= 6, data = 450)

After that, this sensor stops sending data, but others keep sending, so
the watermark is progressing - this obviously causes the pattern to time
out. The issue I have is that *processTimedOutMatch* gets called multiple
times: first for the whole sequence Event1 to Event6, and then each
subsequent call just skips one more event, so next I get Event2 to Event6,
Event3 to Event6, and so on, up to just Event6.

My understanding is that *AfterMatchSkipStrategy* should wipe out those
partial matches. Or does it work differently for timed out matches?

Thanks in advance,
Best Regards,
Dom.