Hey all,
I was wondering whether, for CEP, the *AfterMatchSkipStrategy* is applied
during matching, or whether the skipped results are simply removed after a
match. The question comes from some experiments I was doing with CEP. Say I
have readings from a sensor and I want to detect events over some
threshold. I have something like the pattern below:
    Pattern.begin[AccelVector]("beginning", AfterMatchSkipStrategy.skipPastLastEvent())
      .where(_.data() < Threshold)
      .optional
      .followedBy(EventPatternName)
      .where(event => event.data() >= Threshold)
      .oneOrMore
      .greedy
      .consecutive()
      .followedBy("end")
      .where(_.data() < Threshold)
      .oneOrMore
      .within(Time.minutes(1))
The thing is that sensors may sometimes stop sending data, or the data may
get lost, so I would also like to emit matches that have technically timed
out. I have created a PatternProcessFunction that simply takes the matches
that have timed out and checks for the *EventPatternName* part.
It works fine, but I have noticed weird behavior: the events that get
passed to *processTimedOutMatch* are repeated, as if there were no
*AfterMatchSkipStrategy*.
So, for example, say Threshold = 200 and I have the following events for
one of the sensors:
Event1 (timestamp= 1, data = 10)
Event2 (timestamp= 2, data = 250)
Event3 (timestamp= 3, data = 300)
Event4 (timestamp= 4, data = 350)
Event5 (timestamp= 5, data = 400)
Event6 (timestamp= 6, data = 450)
After that, this sensor stops sending data, but others keep sending data,
so the watermark keeps progressing, which obviously causes the pattern to
time out. The issue is that *processTimedOutMatch* gets called multiple
times: first for the whole pattern, Event1 to Event6, and then each
subsequent call just skips one more event, so next I get Event2 to Event6,
then Event3 to Event6, and so on, down to just Event6.
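To make the sequence of calls concrete, here is a minimal plain-Scala sketch (no Flink involved) that only reproduces the list of timed-out partial matches described above and contrasts it with what I expected. The names `Reading` and `TimedOutCallsSketch` are hypothetical and used for illustration only; this is not my actual job code, and it does not model how CEP works internally.

```scala
// Hypothetical stand-in for a sensor reading; only used to illustrate the post.
final case class Reading(name: String, timestamp: Long, data: Int)

object TimedOutCallsSketch {
  // The six readings from the example above.
  val events: List[Reading] = List(
    Reading("Event1", 1L, 10),
    Reading("Event2", 2L, 250),
    Reading("Event3", 3L, 300),
    Reading("Event4", 4L, 350),
    Reading("Event5", 5L, 400),
    Reading("Event6", 6L, 450)
  )

  // What I observe: one timed-out call per starting event,
  // i.e. every non-empty suffix of the event sequence.
  val observedCalls: List[List[String]] =
    events.tails.filter(_.nonEmpty).map(_.map(_.name)).toList

  // What I expected with skipPastLastEvent (my assumption):
  // only the longest partial match, Event1 to Event6.
  val expectedCalls: List[List[String]] = observedCalls.take(1)

  def main(args: Array[String]): Unit =
    observedCalls.zipWithIndex.foreach { case (names, i) =>
      println(s"call ${i + 1}: ${names.mkString(", ")}")
    }
}
```

In other words, I see six calls where I expected a single one.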
My understanding is that *AfterMatchSkipStrategy* should wipe out those
partial matches. Or does it work differently for timed-out matches?
Thanks in advance,
Best Regards,
Dom.