Categorize or GroupBy datastream data and process with CEP separately


madhairsilence
Assume I have a datastream:

x:1, y:2, z:3, x:7, y:-1, z:0, z:3, z:2, y:3, x:2, y:6

How do I put x, y, and z into their own buckets and apply my CEP rule to each bucket?

x:1, x:7, x:2
y:2, y:-1, y:3, y:6
z:3, z:0, z:3, z:2

To put it another way: how do I split the stream into these categories (one stream each for x, y, and z)? I would get three substreams, each with its own CEP processing.

The challenge is that x, y, and z are not predefined, so I cannot pre-create the streams and route events to them with an if or switch statement.

Re: Categorize or GroupBy datastream data and process with CEP separately

Jamie Grier
I think what you want here is to apply CEP processing on a KeyedStream -
see the last CEP Example here:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/libs/cep.html#examples


In reply to the post by madhairsilence of Tue, Jan 3, 2017 at 3:30 AM



--

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
[hidden email]

Re: Categorize or GroupBy datastream data and process with CEP separately

madhairsilence
Hi,

Thanks for the response, but I am not sure how it fits my requirement.

Right now I am using a SplitStream to split the stream into separate streams.

The problem is that the set of split streams is dynamic: x, y, and z are not predefined, and the set can grow to include w, u, and so on. So I cannot assign each stream to its own variable; the only way is to put them in an array. Does it make sense to use an "array of DataStream" and perform CEP inside a for loop (once per stream)?

Re: Categorize or GroupBy datastream data and process with CEP separately

madhairsilence
In reply to this post by Jamie Grier

This is my code.

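    import java.util.Collections;
    import org.apache.flink.streaming.api.collector.selector.OutputSelector;
    import org.apache.flink.streaming.api.datastream.SplitStream;

    // Tag each event with a single output name: its event type.
    SplitStream<MonitoringEvent> splitStream = inputStream.split(new OutputSelector<MonitoringEvent>() {

        @Override
        public Iterable<String> select(MonitoringEvent me) {
            return Collections.singletonList(me.getEventType());
        }
    });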

I have a stream of monitoring events arriving in random order: temp:80, pressure:70, humidity:80, temp:30, ...

With the code above, I am splitting the stream by eventType, i.e. into temperatureStream, pressureStream, and so on.

The problem is this: if I know the eventType, I can select it from the splitStream like

    splitStream.select("temperatureStream")

but the eventType is dynamic and not predefined.

How do I apply CEP to these dynamic streams? The CEP rules would be something like:

temperature is > 90 for the past 10 minutes ...

pressure is > 90 for the past 10 minutes ...

Re: Categorize or GroupBy datastream data and process with CEP separately

Jamie Grier
If you are trying to do the same CEP computation for each event type that's
exactly what will happen for a KeyedStream.  For example if you key by
event type you can think of this like creating a separate substream for
each key/eventType and then applying the CEP operations to each substream
individually.  Is this not what you're trying to do?
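
As a rough mental model of that per-key behavior, here is a plain-Java sketch. It is not Flink code and does not use the Flink API: the class KeyedBuckets and the method bucketByKey are hypothetical names chosen for illustration. It only shows that a bucket can be created the first time a key is seen, so the set of keys never has to be known in advance. The sample data is the stream from the original question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual model only: plain Java, not Flink API. Names are hypothetical.
class KeyedBuckets {

    // Groups readings into one bucket per key; a bucket is created on first sight of its key.
    static Map<String, List<Integer>> bucketByKey(String[] keys, int[] values) {
        Map<String, List<Integer>> buckets = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            buckets.computeIfAbsent(keys[i], k -> new ArrayList<>()).add(values[i]);
        }
        return buckets;
    }

    public static void main(String[] args) {
        String[] keys = {"x", "y", "z", "x", "y", "z", "z", "z", "y", "x", "y"};
        int[] values = {1, 2, 3, 7, -1, 0, 3, 2, 3, 2, 6};
        // prints {x=[1, 7, 2], y=[2, -1, 3, 6], z=[3, 0, 3, 2]}
        System.out.println(bucketByKey(keys, values));
    }
}
```

A new key such as w or u simply gets its own bucket when it first appears; no if or switch statement over known keys is involved.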



In reply to the post by madhairsilence of Wed, Jan 4, 2017 at 3:32 AM




Re: Categorize or GroupBy datastream data and process with CEP separately

madhairsilence
Thanks for your reply, and my sincere apologies for the confusion.

This is the part that confuses me.

A keyed stream is processed per unique key. But going by the example I see, I can build a pattern like

temperature > 90 ... temperature > 80 ... within 'T' minutes.

What I am trying to achieve is "temperature is > 90 continuously for the past ten minutes".

Just like "within(X minutes)", does CEP have a "for(X minutes)" syntax?

That is why I thought of using a SplitStream, creating a window for a given time, and then applying CEP to that stream. But I am stuck on the dynamic key issue.

Re: Categorize or GroupBy datastream data and process with CEP separately

Till Rohrmann
Hi,

Flink’s Pattern API allows you to specify a time interval for a pattern to
occur. Use the within call to specify a valid time interval for your
pattern.

Pattern<Event, ?> pattern = ...;
Pattern<Event, ?> timedPattern = pattern.within(Time.minutes(10));
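
For comparison, the condition asked about in this thread ("above 90 continuously for the past ten minutes") can be modeled per key in plain Java as a rough sketch. This is not the CEP API and makes no claim about how Flink evaluates within: the class SustainedThreshold and its onEvent method are hypothetical names, and the sketch assumes that events for each key arrive in timestamp order.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model only: plain Java, not Flink CEP. Names are hypothetical.
// Assumes events for each key arrive in non-decreasing timestamp order.
class SustainedThreshold {
    private final double threshold;
    private final long durationMillis;
    // Per key: timestamp at which the current unbroken run above the threshold began.
    private final Map<String, Long> runStart = new HashMap<>();

    SustainedThreshold(double threshold, long durationMillis) {
        this.threshold = threshold;
        this.durationMillis = durationMillis;
    }

    // Returns true once this key has stayed above the threshold for at least durationMillis.
    boolean onEvent(String key, double value, long timestampMillis) {
        if (value <= threshold) {
            runStart.remove(key); // the run is broken, so start over for this key
            return false;
        }
        long start = runStart.computeIfAbsent(key, k -> timestampMillis);
        return timestampMillis - start >= durationMillis;
    }
}
```

Each key (temperature, pressure, or any event type that shows up later) keeps its own run start, so a dip in one key does not reset another.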

Cheers,
Till


In reply to the post by madhairsilence of Thu, Jan 5, 2017 at 7:39 AM