Assume I have a datastream:

x:1, y:2, z:3, x:7, y:-1, z:0, z:3, z:2, y:3, x:2, y:6

How do I put x, y and z in their own buckets and apply my CEP rule to each of them?

x:1, x:7, x:2
y:2, y:-1, y:3, y:6
z:3, z:0, z:3, z:2

Or, to put it another way: how do I split the stream into these categories (one stream each for x, y, z)? I would get 3 substreams, each with its own CEP processing.

The challenge here is that x, y, z are not predefined, so I cannot pre-create the streams and assign events to them with an if or switch statement.
I think what you want here is to apply CEP processing on a KeyedStream. See the last CEP example here: https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/libs/cep.html#examples

(Replying to madhairsilence, Tue, Jan 3, 2017 at 3:30 AM)

-- 
Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
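To make the "one bucket per key, created on the fly" idea concrete, here is a plain-Java illustration using the data from the question. It is not Flink API: `KeyBuckets` and `groupByKey` are hypothetical names, and a `Map` stands in for what keying does. The point it shows is that a bucket is created the first time a key is seen, so x, y, z (or w, u, ...) never have to be declared up front.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration (not Flink API): group "key:value" tokens into
// per-key buckets that are created the first time each key appears.
final class KeyBuckets {

    /** Parses "key:value" tokens and groups the values by key, preserving arrival order. */
    static Map<String, List<Integer>> groupByKey(String stream) {
        Map<String, List<Integer>> buckets = new LinkedHashMap<>();
        for (String token : stream.split(",")) {
            String[] kv = token.split(":");
            String key = kv[0].trim();
            int value = Integer.parseInt(kv[1].trim());
            // computeIfAbsent creates the bucket on first sight of a new key,
            // so the set of keys never has to be known in advance.
            buckets.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        return buckets;
    }

    public static void main(String[] args) {
        String stream = "x:1, y:2 , z:3 , x:7 , y:-1, z:0, z:3 , z:2, y:3 ,x: 2 ,y:6";
        // Prints one line per key: x -> [1, 7, 2], y -> [2, -1, 3, 6], z -> [3, 0, 3, 2]
        groupByKey(stream).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

In Flink the corresponding step is keying the stream (for example by event type) before passing it to `CEP.pattern(...)`, as in the linked example; the map above only illustrates the resulting grouping.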
Hi
Thanks for the response, but sorry, I am not sure how it fits my requirement. Right now I am using a SplitStream to split the stream by event type, which lets me route the events into separate streams. The problem is that the set of split streams is dynamic: x, y, z are not predefined and can extend to w, u, ..., so I cannot assign each one to a specific variable. The only way I see is to put them in an array. Does it make sense to use an "array of DataStream" and perform CEP inside a for loop (once for every stream)?
In reply to this post by Jamie Grier
This is my code:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitStream;

SplitStream<MonitoringEvent> splitStream = inputStream.split(new OutputSelector<MonitoringEvent>() {

    @Override
    public Iterable<String> select(MonitoringEvent me) {
        // Route each event to an output named after its event type.
        List<String> ml = new ArrayList<String>();
        ml.add(me.getEventType());
        return ml;
    }
});
```

I have a stream of monitoring events arriving in random order: temp:80, pressure:70, humidity:80, temp:30, ...

With the above code I am splitting the stream by event type, i.e. into temperatureStream, pressureStream, and so on.

The problem is that if I know the event type, I can select it from the split stream, like `splitStream.select("temperatureStream")`, but the event type is dynamic and not predefined.

How do I apply CEP to these dynamic streams? The CEP rules would be like:

temperature is > 90 for the past 10 minutes ...
pressure is > 90 for the past 10 minutes ...
If you are trying to do the same CEP computation for each event type, that's exactly what will happen with a KeyedStream. For example, if you key by event type, you can think of this as creating a separate substream for each key (event type) and then applying the CEP operations to each substream individually. Is this not what you're trying to do?

(Replying to madhairsilence, Wed, Jan 4, 2017 at 3:32 AM)
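The "separate substream per key" picture can be sketched in plain Java. This is not Flink's implementation; `PerKeyRule` is a hypothetical name, and the rule "two consecutive readings above 90 for the same event type" is an assumed stand-in for a CEP pattern. What it shows is that each key keeps its own state, so readings of other types arriving in between do not break a sequence for a given type.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Flink internals): the same rule runs independently
// for every event type, with one piece of state kept per key.
// Assumed rule: two consecutive readings above THRESHOLD for the same type.
final class PerKeyRule {

    static final int THRESHOLD = 90;

    /** Returns the event types for which the rule fired, in firing order. */
    static List<String> detect(String[] types, int[] values) {
        // Per-key state: was the previous reading of this type above the threshold?
        Map<String, Boolean> lastWasHigh = new HashMap<>();
        List<String> alerts = new ArrayList<>();
        for (int i = 0; i < types.length; i++) {
            boolean high = values[i] > THRESHOLD;
            // A key seen for the first time has no previous high reading.
            if (high && lastWasHigh.getOrDefault(types[i], false)) {
                alerts.add(types[i]);
            }
            lastWasHigh.put(types[i], high);
        }
        return alerts;
    }

    public static void main(String[] args) {
        String[] types = {"temp", "pressure", "temp", "humidity", "pressure", "pressure"};
        int[] values = {95, 92, 97, 40, 80, 99};
        // temp fires (95, then 97) even though a pressure reading arrived in between;
        // pressure does not, because its 80 reading broke its own sequence.
        System.out.println(detect(types, values)); // [temp]
    }
}
```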
Thanks for your reply, and apologies for my insanity.
That's the part that confuses me. A KeyedStream processes events per unique key. But from the example I see, I can build a pattern like temperature > 90 ... temperature > 80 ... within 'T' minutes.

What I am trying to achieve is "temperature is > 90 continuously for the past ten minutes". Just like "within(X minutes)", does CEP have a "for(X minutes)" syntax?

That's the reason I thought of using a SplitStream, creating a window for a given time, and then applying CEP to that stream. But I got stuck on the dynamic key issue.
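The "above a threshold continuously for ten minutes" condition can be pinned down with a plain-Java sketch. This is not a Flink CEP API; `ContinuousAbove`, `Reading` and `detect` are hypothetical names. Assumptions: readings arrive in timestamp order, timestamps are in minutes for readability, and the condition is only checked when a reading arrives (nothing fires between readings). Per key, it remembers when the current unbroken run of high readings started; any reading at or below the threshold resets that key's run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch (not a Flink API) of "value > threshold continuously for
// at least D", evaluated separately per event type. Assumes time-ordered input
// and timestamps in minutes; the check only happens when a reading arrives.
final class ContinuousAbove {

    static final class Reading {
        final String type;
        final long tsMinutes;
        final int value;

        Reading(String type, long tsMinutes, int value) {
            this.type = type;
            this.tsMinutes = tsMinutes;
            this.value = value;
        }
    }

    /** Returns "type@timestamp" once per unbroken run that stayed above the threshold for durationMinutes. */
    static List<String> detect(List<Reading> readings, int threshold, long durationMinutes) {
        Map<String, Long> runStart = new HashMap<>(); // per key: when the current high run began
        Set<String> alerted = new HashSet<>();        // per key: already alerted for this run
        List<String> alerts = new ArrayList<>();
        for (Reading r : readings) {
            if (r.value <= threshold) {
                // A low reading breaks the run for this key only.
                runStart.remove(r.type);
                alerted.remove(r.type);
                continue;
            }
            long start = runStart.computeIfAbsent(r.type, k -> r.tsMinutes);
            // Set.add returns false if this run was already reported: one alert per run.
            if (r.tsMinutes - start >= durationMinutes && alerted.add(r.type)) {
                alerts.add(r.type + "@" + r.tsMinutes);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<Reading> readings = Arrays.asList(
                new Reading("temp", 0, 95),
                new Reading("pressure", 0, 91),
                new Reading("temp", 4, 93),
                new Reading("pressure", 5, 70),  // breaks the pressure run
                new Reading("temp", 10, 96),     // temp has been > 90 since minute 0
                new Reading("pressure", 11, 95), // a new pressure run starts here
                new Reading("temp", 12, 97),
                new Reading("pressure", 15, 99));
        System.out.println(detect(readings, 90, 10)); // [temp@10]
    }
}
```

Because gaps between readings are not checked, a real deployment would also need to decide what "continuously" means when readings are sparse; that choice is left open here.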
Hi,
Flink’s Pattern API allows you to specify a time interval within which a pattern has to occur. Use the within call to specify a valid time interval for your pattern:

```java
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

Pattern<Event, ?> pattern = ...;
// The whole pattern must be matched within 10 minutes.
Pattern<Event, ?> timedPattern = pattern.within(Time.minutes(10));
```

Cheers,
Till

(Replying to madhairsilence, Thu, Jan 5, 2017 at 7:39 AM)
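As a rough illustration of the time bound that `within(...)` adds, here is a simplified plain-Java model, not Flink's own matching engine. `WithinSketch` and `matches` are hypothetical names, the pattern "reading > 90, later followed by a reading > 80" is taken from the example in the question, and it runs on a single key's substream with timestamps in minutes. A started partial match is dropped once it is older than the window. As a simplification, one completion consumes all pending starts; Flink's real handling of partial matches is more involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model (not Flink's NFA) of a two-step pattern "value > 90, followed
// later by value > 80" that must complete within a time window. One key only;
// timestamps are in minutes and the input must be time-ordered.
final class WithinSketch {

    /** Returns completed matches as "start->end" timestamp pairs. */
    static List<String> matches(long[] ts, int[] values, long windowMinutes) {
        Deque<Long> pendingStarts = new ArrayDeque<>(); // readings > 90 awaiting a follower
        List<String> out = new ArrayList<>();
        for (int i = 0; i < ts.length; i++) {
            // Drop partial matches that can no longer complete inside the window.
            while (!pendingStarts.isEmpty() && ts[i] - pendingStarts.peekFirst() > windowMinutes) {
                pendingStarts.pollFirst();
            }
            // Second step: a later reading > 80 completes the oldest pending match.
            if (values[i] > 80 && !pendingStarts.isEmpty()) {
                out.add(pendingStarts.peekFirst() + "->" + ts[i]);
                pendingStarts.clear(); // simplification: one completion consumes all starts
            }
            // First step: a reading > 90 may start a new partial match.
            if (values[i] > 90) {
                pendingStarts.addLast(ts[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {0, 3, 20, 25, 40, 55};
        int[] values = {95, 50, 92, 85, 91, 88};
        // The starts at 0 and 40 expire before a follower arrives; 20 -> 25 fits in 10 minutes.
        System.out.println(matches(ts, values, 10)); // [20->25]
    }
}
```

Note that this bounds the time from the first to the last event of a match; on its own it does not check that every reading in that span stayed above the threshold.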