//Creating a window of ten items
WindowedStream<ObservationEvent,Tuple,GlobalWindow> windowStream = inputStream.keyBy("rackId").countWindow(10); // Applying a Window Function , adding some custom evaluating all the values in the window DataStream<ObservationEvent> inactivityStream = windowStream.apply(new WindowFunction<ObservationEvent, ObservationEvent , Tuple , GlobalWindow>() { @Override public void apply(Tuple tuple, GlobalWindow timeWindow, Iterable<ObservationEvent> itr, Collector<ObservationEvent> out) //custom evaluation logic out.collect(new ObservationEvent(1,"temperature", "stable")); } }); //Defining Simple CEP Pattern Pattern<ObservationEvent, ?> inactivityPattern = Pattern.<ObservationEvent>begin("first") .subtype(ObservationEvent.class) .where(new FilterFunction<ObservationEvent>() { @Override public boolean filter(ObservationEvent arg0) throws Exception { System.out.println( arg0 ); //This function is not at all called return false; } }); PatternStream<ObservationEvent> inactivityCEP = CEP.pattern(inactivityStream.keyBy("rackId"), inactivityPattern); When I run this code, the filter function inside the where clause is not at all getting called. I have printed the inactivityStream.print() and I can see the matching value. Now, when I plug in the inputStream directly without applying a window. The pattern is matching I printed inputStream and WindowedStream and I can see they both send similar kind of data. What am I missing |
Anybody?. Got kinda stuck here :(
|
Hi
Because flink use lazy evaluation and because inactivityCEP don't have any output then you pattern not apply. / inactivityCEP.select(new PatternSelectFunction<ObservationEvent, Object>() { @Override public Object select(Map<String, ObservationEvent> pattern) throws Exception { System.out.println(pattern); return null; } }); / >> What am I missing you missing output ;) 2017-01-07 22:47 GMT+04:00 madhairsilence <[hidden email]>: > Anybody?. Got kinda stuck here :( > > > > -- > View this message in context: http://apache-flink-mailing- > list-archive.1008284.n3.nabble.com/DataStream-and-CEP- > Pattern-not-matching-after-applying-a-window-tp15164p15173.html > Sent from the Apache Flink Mailing List archive. mailing list archive at > Nabble.com. > |
In reply to this post by madhairsilence
Hi
Because flink use lazy evaluation and because inactivityCEP don't have any output then you pattern not apply. / inactivityCEP.select(new PatternSelectFunction<ObservationEvent, Object>() { @Override public Object select(Map<String, ObservationEvent> pattern) throws Exception { System.out.println(pattern); return null; } }); / >> What am I missing you missing output ;) 2017-01-07 22:47 GMT+04:00 madhairsilence <[hidden email]>: > Anybody?. Got kinda stuck here :( > > > > -- > View this message in context: http://apache-flink-mailing- > list-archive.1008284.n3.nabble.com/DataStream-and-CEP- > Pattern-not-matching-after-applying-a-window-tp15164p15173.html > Sent from the Apache Flink Mailing List archive. mailing list archive at > Nabble.com. > |
Hi..Thanks for the reply.
I can see the pattern is not getting data. So what correct I have to do. I want to apply the pattern only after the window function is executed. Sorry to say, I could hardly get any info from the documentation. |
Free forum by Nabble | Edit this page |