Hi folks,
Is there any way in the DataSet API to split a DataSet[A] into a DataSet[A] and a DataSet[B]? The use case is a custom filter component that we want to implement: we want to direct input elements for which the predicate returns false to a second output, and we also want to direct input elements that throw an exception to yet another output (a demultiplexer-like component). Thank you in advance... |
Hello,
You can split a DataSet into two DataSets with two filters:

val xs: DataSet[A] = ...
val split1: DataSet[A] = xs.filter(f1)
val split2: DataSet[A] = xs.filter(f2)

where f1 and f2 are true for those elements that should go into the first and second DataSet, respectively. The splits will just contain elements from the input DataSet, but you can of course apply a map after one of the filters. Does this help? Best, Gábor |
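As a self-contained sketch of this two-filter pattern (using a plain Scala List in place of a Flink DataSet, and a hypothetical predicate p, just to illustrate the idea):

```scala
// Model the two-filter split on a plain Scala collection.
// With a real Flink DataSet the calls would be xs.filter(p)
// and xs.filter(x => !p(x)); `p` here is a made-up example predicate.
object TwoFilterSplit {
  def main(args: Array[String]): Unit = {
    val p: Int => Boolean = _ % 2 == 0
    val xs: List[Int] = List(1, 2, 3, 4, 5)

    val split1 = xs.filter(p)      // elements where the predicate holds
    val split2 = xs.filter(!p(_))  // everything else

    println(split1) // List(2, 4)
    println(split2) // List(1, 3, 5)
  }
}
```

Note that each element is evaluated by both filters, which is exactly the duplicated work discussed below in the thread.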
Hi Gabor,
Yes, functionally this helps. But in this case I am processing each element twice and sending the whole data to two different operators. What I am trying to achieve is something like the DataStream split functionality, or a little bit more: in a filter-like scenario I want to do the pseudo-operation below:

def function(iter: Iterator[URLOutputData],
             trueEvents: Collector[URLOutputData],
             falseEvents: Collector[URLOutputData],
             errEvents: Collector[URLOutputData]) {
  iter.foreach { i =>
    try {
      if (predicate(i))
        trueEvents.collect(i)
      else
        falseEvents.collect(i)
    } catch {
      case _: Exception => errEvents.collect(i)
    }
  }
}

Another case could be: suppose I have an input set of web events coming from different web apps, and I want to split the dataset based on application category. Thanks, |
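A self-contained sketch of that three-way demultiplexing, modeling Flink's Collector with a simple buffer (the BufferCollector class and the example predicate are stand-ins for illustration, not Flink's actual classes):

```scala
import scala.collection.mutable.ListBuffer

// Minimal stand-in for Flink's Collector interface.
class BufferCollector[T] {
  val buf = ListBuffer.empty[T]
  def collect(t: T): Unit = buf += t
}

object Demux {
  // Route each element to one of three outputs: predicate true,
  // predicate false, or predicate threw an exception.
  def demux[T](iter: Iterator[T], predicate: T => Boolean,
               trueEvents: BufferCollector[T],
               falseEvents: BufferCollector[T],
               errEvents: BufferCollector[T]): Unit =
    iter.foreach { i =>
      try {
        if (predicate(i)) trueEvents.collect(i)
        else falseEvents.collect(i)
      } catch {
        case _: Exception => errEvents.collect(i)
      }
    }

  def main(args: Array[String]): Unit = {
    val t = new BufferCollector[Int]
    val f = new BufferCollector[Int]
    val e = new BufferCollector[Int]
    // Example predicate: throws ArithmeticException on 0,
    // true for positive inputs, false for negative ones.
    demux(List(3, -1, 0, 4).iterator, (x: Int) => 12 / x > 0, t, f, e)
    println(t.buf) // ListBuffer(3, 4)
    println(f.buf) // ListBuffer(-1)
    println(e.buf) // ListBuffer(0)
  }
}
```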
Hi,
I agree that this would be very nice. Unfortunately, Flink currently only allows one output from an operation. Maybe we can extend this somehow in the future. Cheers, Aljoscha |
Hi,
If it just requires implementing a custom operator (i.e., it does not require changes to the network stack or other engine-level changes), I can try to implement it, since I have been working on the optimizer and plan generation for a month. Also, we are going to implement our ETL framework on Flink, and this kind of scenario is a good fit and a common requirement in ETL-like flows. If you can point me to the parts of the project I should look at, I can try it. Thanks |
Hi,
It is true that Gabor's approach of using two filters has a certain overhead. However, the overhead should be reasonable: the data stays on the same node, and the filters can be very lightweight. I agree that this is not a very nice solution. However, modifying the DataSet API so that an operator can have more than one output would be a very large change. It would require rewriting large portions of the optimizer and job generation. The assumption of a single output is made in many places that are not always easy to spot. To be honest, I don't think this is possible with reasonable effort. Even if it were possible, the change would be so large that somebody would need to spend a lot of time reviewing it. I am sorry, but this limitation cannot be easily resolved. Fabian |
I would like to add that if your predicate does some heavy-weight
computation that you want to avoid duplicating across the filters, you can insert a map before the filters, where you evaluate the predicate once and put the result into a field. Best, Gabor |
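A minimal sketch of that tag-then-filter idea, again on a plain Scala collection with a made-up expensivePredicate: evaluate the predicate once in a map, tag each element with the result, and let both filters check only the tag:

```scala
object TagThenFilter {
  def main(args: Array[String]): Unit = {
    // Stand-in for a heavy-weight predicate we only want to run once per element.
    var evaluations = 0
    def expensivePredicate(x: Int): Boolean = { evaluations += 1; x % 2 == 0 }

    val xs = List(1, 2, 3, 4)

    // Evaluate once per element, tagging each element with the result.
    val tagged = xs.map(x => (x, expensivePredicate(x)))

    // Both filters only inspect the precomputed flag.
    val evens = tagged.filter(_._2).map(_._1)
    val odds  = tagged.filterNot(_._2).map(_._1)

    println(evens)       // List(2, 4)
    println(odds)        // List(1, 3)
    println(evaluations) // 4 evaluations, not 8
  }
}
```

With two plain filters the predicate would run twice per element; the map reduces that to once, at the cost of shipping an extra Boolean field.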