Hey,
I have a question regarding CEP. Assume I have a stream of readings from various sensors. The application is running in EventTime, so according to the CEP docs the events are buffered and sorted by timestamp in ascending order.

I want to record the situations where a sensor reading goes above some threshold, and I am interested in getting a single match that covers the whole period during which the readings stayed above the threshold.

I tried to implement a single pattern that was more or less:

    Pattern.begin[Reading]("beginning")
      .where(_.data() > Threshold)
      .oneOrMore
      .greedy
      .consecutive()

But now it produces multiple partial matches that I can't eliminate. For example, for threshold = 350, I have a stream:

    300, 400, 500, 300

And then I get the following lists of events: [400], [400, 500], [500].

Is there a way to eliminate those partial matches?

Best Regards,
Dom.
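To make the desired result concrete, here is a minimal plain-Scala sketch, with no Flink involved, of the grouping the question asks for: one list per maximal consecutive run of readings above the threshold. The object and function names (`MaximalRuns`, `maximalRuns`) are hypothetical and readings are simplified to `Int`; this only illustrates the expected output and is not how FlinkCEP evaluates patterns.

```scala
object MaximalRuns {
  /** Splits `readings` into maximal consecutive runs of values strictly above `threshold`. */
  def maximalRuns(readings: Seq[Int], threshold: Int): Seq[Seq[Int]] = {
    // Fold over the readings, tracking the finished runs and the run currently being built.
    val (finished, current) =
      readings.foldLeft((Vector.empty[Seq[Int]], Vector.empty[Int])) {
        // A reading above the threshold extends the current run.
        case ((done, run), r) if r > threshold => (done, run :+ r)
        // A reading at or below the threshold closes an open run.
        case ((done, run), _) if run.nonEmpty => (done :+ run, Vector.empty)
        // A low reading with no open run changes nothing.
        case (acc, _) => acc
      }
    // A run that is still open when the input ends is emitted as well.
    if (current.nonEmpty) finished :+ current else finished
  }

  def main(args: Array[String]): Unit =
    println(maximalRuns(Seq(300, 400, 500, 300), threshold = 350))
}
```

For the stream 300, 400, 500, 300 with threshold 350, this yields the single run [400, 500], which is the one match the pattern is supposed to produce.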
Hi Dominik,
You can control FlinkCEP's consumption behaviour via the after-match skip strategies [1]. They let you control how Flink treats events after a match has occurred. If you are interested in the longest possible window of events exceeding your threshold, you could also add a terminating event that is below the threshold. Only then can you be sure that no following event will continue the window.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#after-match-skip-strategy

Cheers,
Till

On Fri, Feb 21, 2020 at 10:56 AM Dominik Wosiński <[hidden email]> wrote:
> Is there a way to eliminate those partial matches ??
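A possible shape for this suggestion, sketched with the FlinkCEP Scala API: a looping "above" state followed by a terminating "below" state, combined with the `skipPastLastEvent` strategy. The `Reading` case class, its `value` field, the state names and the `Threshold` constant are assumptions made for illustration, not taken from the original job, and the sketch needs the `flink-cep-scala` dependency on the classpath.

```scala
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event type standing in for the one used in the thread.
case class Reading(sensorId: String, value: Double)

object TerminatedRunPattern {
  // Assumed threshold, matching the example in the question.
  val Threshold = 350.0

  val pattern: Pattern[Reading, Reading] =
    Pattern
      // After a match is emitted, discard partial matches that started inside it.
      .begin[Reading]("above", AfterMatchSkipStrategy.skipPastLastEvent())
      .where(_.value > Threshold)
      .oneOrMore
      .greedy
      .consecutive()
      // Terminating event: the first reading that is no longer above the threshold.
      .next("below")
      .where(_.value <= Threshold)
      .within(Time.minutes(1))
}
```

With a pattern like this, the select function would read only the events stored under "above" and ignore the terminating "below" event. Note that a match can only complete once a reading at or below the threshold arrives, and that the `within` bound from the thread still limits how long a run can be.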
Hey, thanks for the answer.
But if I add the *AfterMatchSkipStrategy*, it simply seems to emit the events one by one, so in the case described above it emits: [400], [500].

Shouldn't the *greedy* quantifier guarantee that the condition is matched as many times as possible, thus creating [400, 500]?

Thanks again.
Best Regards,
Dom.
P.S.
So now my pattern looks like this:

    Pattern.begin[AccelVector](EventPatternName, AfterMatchSkipStrategy.skipPastLastEvent())
      .where(_.data() > Threshold)
      .oneOrMore
      .greedy
      .consecutive()
      .within(Time.minutes(1))

On Wed, 25 Mar 2020 at 10:03, Dominik Wosiński <[hidden email]> wrote:
> Shouldn't the *greedy* quantifier guarantee that this will be matched as many times as possible thus creating [400, 500] ??
Hi Dominik,
I think you are hitting a bug. The greedy quantifier does not work well when it is applied to the last element of a pattern. There is a Jira issue to improve support for the greedy quantifier [1]. You can work around it by adding an additional state at the end, e.g.:

    Pattern.begin[AccelVector](EventPatternName, AfterMatchSkipStrategy.skipPastLastEvent())
      .where(_.data() > Threshold)
      .oneOrMore
      .greedy
      .consecutive()
      .next("b")
      .where(BooleanConditions.trueFunction[AccelVector]())
      .within(Time.minutes(1))

Best,
Dawid

[1] https://issues.apache.org/jira/browse/FLINK-10587

On 25/03/2020 10:07, Dominik Wosiński wrote:
> So now my pattern looks like this: