Flink CEP greedy match of single pattern

Dominik Wosiński
Hey,
I have a question regarding CEP. Assume I have a stream of readings from
various sensors. The application runs in event time, so according to the
CEP docs the events are buffered and sorted by timestamp in ascending order.

I want to record the situations when a sensor reading goes above some
threshold. What I am interested in is a single match covering the whole
period during which the readings stay above the threshold.

I tried to implement a single pattern that was more or less like this:

import org.apache.flink.cep.scala.pattern.Pattern

Pattern.begin[Reading]("beginning")
  .where(_.data() > Threshold) // readings above the threshold
  .oneOrMore
  .greedy
  .consecutive()

But now it produces multiple partial matches that I can't eliminate. For
example, with threshold = 350 and the stream:

300, 400, 500, 300

I get the following lists of events: [400], [400, 500], [500].

Is there a way to eliminate those partial matches?
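For reference, the result being asked for can be pinned down in a few lines of plain Scala, independent of Flink. This is a sketch of the intended semantics only (one list per uninterrupted stretch of readings above the threshold), with readings simplified to `Int`; `ExpectedRuns` and `maximalRuns` are hypothetical names. It could serve as an expected-output oracle when testing a CEP job, not as a replacement for the pattern.

```scala
object ExpectedRuns {
  // Reference semantics: one list per maximal stretch of consecutive
  // readings strictly above `threshold` (hypothetical helper, not Flink API).
  def maximalRuns(readings: Seq[Int], threshold: Int): Seq[Seq[Int]] = {
    val runs = Seq.newBuilder[Seq[Int]]
    var current = Vector.empty[Int]
    for (r <- readings) {
      if (r > threshold) {
        current :+= r // still above the threshold: extend the current run
      } else if (current.nonEmpty) {
        runs += current // dropped back: the current run is complete
        current = Vector.empty
      }
    }
    // Finite input only: a run still open at the end is closed here. A live
    // stream never ends, so there a run can only be closed by a later
    // reading that is not above the threshold.
    if (current.nonEmpty) runs += current
    runs.result()
  }

  def main(args: Array[String]): Unit =
    // The thread's example: threshold 350, one expected match [400, 500].
    println(maximalRuns(Seq(300, 400, 500, 300), 350))
}
```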

Best Regards,
Dom.

Re: Flink CEP greedy match of single pattern

Till Rohrmann
Hi Dominik,

you can control FlinkCEP's consumption behaviour via the after match skip
strategies [1]. They let you control how Flink treats events after a
match has occurred.

If you are interested in the longest possible window of events exceeding
your threshold, you could also add a terminating event that is below
the threshold. Only then can you be sure that no following event will
continue the window.
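The reasoning about the terminating event can be illustrated with a small stateful detector. This is a plain-Scala sketch under stated assumptions, not FlinkCEP code: `RunDetector` and `onReading` are hypothetical names, readings are plain `Int`s, and events are assumed to arrive already sorted by timestamp. The detector buffers readings above the threshold and emits the buffered run only when a reading that is not above the threshold arrives, because before that it cannot know whether the run is over.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical run detector: buffers readings above `threshold` and emits the
// whole run only once a terminating reading (not above the threshold) arrives.
final class RunDetector(threshold: Int) {
  private val open = ArrayBuffer.empty[Int]

  /** Feeds one reading; returns the finished run if this reading closed one. */
  def onReading(r: Int): Option[Seq[Int]] =
    if (r > threshold) {
      open += r // the run may still grow, so nothing is emitted yet
      None
    } else if (open.nonEmpty) {
      val run = open.toList // the terminator shows no later event can extend it
      open.clear()
      Some(run)
    } else None // below the threshold with no open run: nothing to close
}

object RunDetectorDemo {
  def main(args: Array[String]): Unit = {
    val detector = new RunDetector(350)
    for (r <- Seq(300, 400, 500, 300))
      detector.onReading(r).foreach(run => println(s"match: $run"))
  }
}
```

Feeding the example stream 300, 400, 500, 300 with threshold 350 emits nothing until the final 300, which then releases [400, 500] as a single match.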

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#after-match-skip-strategy

Cheers,
Till

On Fri, Feb 21, 2020 at 10:56 AM Dominik Wosiński <[hidden email]> wrote:

Re: Flink CEP greedy match of single pattern

Dominik Wosiński
Hey, thanks for the answer.

But if I add the *AfterMatchSkipStrategy*, it simply seems to emit event
by event, so in the case described above it emits [400], [500].
Shouldn't the *greedy* quantifier guarantee that the pattern matches as
many events as possible, thus creating [400, 500]?
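One way to see why the skip strategy yields [400], [500] is a toy matcher in plain Scala. It is an assumption about the mechanism, consistent with the bug described later in the thread, and not Flink's actual NFA: because the looping state is the last element of the pattern, every partial match is already complete and is emitted on the event that created it, so `greedy` never gets a later event to wait for. `EagerLoopModel` and its `run` function are hypothetical; with `skipPastLastEvent = false` the model reproduces [400], [400, 500], [500], and with `true` it reproduces [400], [500].

```scala
object EagerLoopModel {
  // Toy matcher for `oneOrMore.consecutive()` when the loop is the LAST
  // pattern element: each partial match is a run of consecutive readings
  // above `threshold`, and each one counts as complete immediately.
  def run(readings: Seq[Int], threshold: Int, skipPastLastEvent: Boolean): Seq[Seq[Int]] = {
    var partials = List.empty[Vector[Int]] // partial matches still alive
    val out = Seq.newBuilder[Seq[Int]]
    for (r <- readings) {
      if (r > threshold) {
        // Extend every live partial match with r and start a new one at r.
        val completed = partials.map(_ :+ r) :+ Vector(r)
        if (skipPastLastEvent) {
          out += completed.head // emit the oldest (longest) match ...
          partials = Nil        // ... and drop all others, since they all contain r
        } else {
          out ++= completed     // no skipping: every partial match is emitted
          partials = completed
        }
      } else {
        partials = Nil // consecutive(): a reading not above the threshold ends every run
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val stream = Seq(300, 400, 500, 300)
    println(run(stream, 350, skipPastLastEvent = false)) // [400], [400, 500], [500]
    println(run(stream, 350, skipPastLastEvent = true))  // [400], [500]
  }
}
```

In this model nothing ever asks the loop to wait for the next event, which is why adding a successor state (or a terminating event) changes the outcome.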

Thanks again.
Best Regards,
Dom.

Re: Flink CEP greedy match of single pattern

Dominik Wosiński
P.S.
So now my pattern looks like this:

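import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time

Pattern.begin[AccelVector](EventPatternName,
    AfterMatchSkipStrategy.skipPastLastEvent())
  .where(_.data() > Threshold)
  .oneOrMore
  .greedy
  .consecutive()
  .within(Time.minutes(1))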


On Wed, 25 Mar 2020 at 10:03, Dominik Wosiński <[hidden email]> wrote:

Re: Flink CEP greedy match of single pattern

dwysakowicz
Hi Dominik,

I think you are hitting a bug. The greedy quantifier does not work well
if applied to the last element of a pattern. There is a JIRA issue to
improve support for the greedy quantifier [1].

You could work around it by adding an additional state at the end, e.g.:

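import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time

Pattern.begin[AccelVector](EventPatternName,
    AfterMatchSkipStrategy.skipPastLastEvent())
  .where(_.data() > Threshold)
  .oneOrMore
  .greedy
  .consecutive()
  .next("b")
  .where(_ => true) // accepts any event; it only gives the greedy loop a successor
  .within(Time.minutes(1))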

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-10587



On 25/03/2020 10:07, Dominik Wosiński wrote:
