[jira] [Created] (FLINK-9164) times(#,#) quantifier does not seem to work

Shang Yuanchun (Jira)
Romain Revol created FLINK-9164:
-----------------------------------

             Summary: times(#,#) quantifier does not seem to work
                 Key: FLINK-9164
                 URL: https://issues.apache.org/jira/browse/FLINK-9164
             Project: Flink
          Issue Type: Bug
          Components: CEP
    Affects Versions: 1.4.2
         Environment: Windows 10 Pro 64-bit
                      Core i7-6820HQ @ 2.7 GHz
                      16 GB RAM
                      Flink 1.4.2
                      Scala client
                      Scala 2.11.12
            Reporter: Romain Revol


Consider the following piece of code:
{code:java}
// Imports assumed for Flink 1.4.x; package locations may differ in other versions.
import org.apache.flink.cep.nfa.AfterMatchSkipStrategy
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val inputs = env.fromElements((1L, 'a'), (20L, 'a'), (22L, 'a'), (30L, 'a'), (40L, 'a'))
    .assignAscendingTimestamps(_._1)

val pattern = Pattern.begin[(Long, Char)]("Start", AfterMatchSkipStrategy.skipPastLastEvent())
    .where(_._2 == 'a').times(1,2)

CEP.pattern(inputs, pattern).select(_("Start")).addSink(println(_))

env.execute("Test"){code}
 

This results in
{code:java}
Buffer((1,a))
Buffer((20,a))
Buffer((22,a))
Buffer((30,a))
Buffer((40,a)){code}
While I would expect
{code:java}
Buffer((1,a), (20,a))
Buffer((22,a), (30,a))
Buffer((40,a)){code}
My purpose is to match events in pairs where possible, or singly otherwise. Note that adding greedy mode does nothing, but this may be due to https://issues.apache.org/jira/browse/FLINK-8914.
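
To make the expected grouping concrete, here is a minimal plain-Scala sketch with no Flink involved: take up to two consecutive events per match, then continue after the last consumed event. The names `ExpectedPairs` and `expectedMatches` are hypothetical and exist only for this illustration; the sketch describes the desired result, not how the CEP library evaluates `times(1,2)`.

```scala
// Plain-Scala illustration of the desired grouping; this is NOT Flink code.
// `ExpectedPairs` and `expectedMatches` are hypothetical names for illustration.
object ExpectedPairs {
  type Event = (Long, Char)

  // Take up to `max` consecutive events per match, then resume after the
  // last consumed event (the intent behind skipPastLastEvent).
  def expectedMatches(events: Seq[Event], max: Int): List[Seq[Event]] =
    events.grouped(max).toList

  def main(args: Array[String]): Unit = {
    val events = Seq((1L, 'a'), (20L, 'a'), (22L, 'a'), (30L, 'a'), (40L, 'a'))
    // Three groups are produced: two pairs followed by one single event.
    expectedMatches(events, 2).foreach(println)
  }
}
```

With the five input events above, this yields two pairs and one trailing single event, which is the shape of the output expected from the pattern.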



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Re: [jira] [Created] (FLINK-9164) times(#,#) quantifier does not seem to work

Kostas Kloudas
Hi Romain,

What if you remove the AfterMatchSkipStrategy.skipPastLastEvent()?

Kostas

> On Apr 12, 2018, at 1:19 PM, Romain Revol (JIRA) <[hidden email]> wrote:
>
> AfterMatchSkipStrategy.skipPastLastEvent()