Hi,
When I tried to implement https://issues.apache.org/jira/browse/FLINK-5498 via "dataset.coGroup(another dataset)" with a generated CoGroupFunction (CoGroupFunction interface: public void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out)), I couldn't get the right results. After debugging, I saw that the backend did not supply a new Iterator instance when "Iterable.iterator()" was invoked (see org.apache.flink.api.common.operators.util.ListKeyGroupedIterator). This differs from the usual iterable collections in Java, whose iterator() method supplies a new iterator instance over the collection on every call, and it is not mentioned in either the comments or the documentation.

IMO, the guarantee that an Iterable hands out a new iterator instance on each call is probably useful for other cases too, so is it necessary to add this feature? I'd be grateful if someone could explain the motivation for ListKeyGroupedIterator not supplying a new iterator instance.

What do you think?

Best,
Lincoln
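To make the reported symptom concrete, here is a minimal, self-contained Java sketch (no Flink dependency). `SharedIteratorIterable` is a hypothetical stand-in that models the behaviour described above, where every `iterator()` call returns the same underlying iterator; it is not Flink's ListKeyGroupedIterator. A second pass over it silently sees no elements, while a `java.util.List` can be traversed any number of times.

```java
import java.util.Iterator;

// Hypothetical model of the observed behaviour: iterator() always returns the
// same underlying iterator instead of a fresh one. Not Flink's actual class.
final class SharedIteratorIterable<T> implements Iterable<T> {
    private final Iterator<T> shared;

    SharedIteratorIterable(Iterator<T> shared) {
        this.shared = shared;
    }

    @Override
    public Iterator<T> iterator() {
        // No new iterator: after the first full pass this one is exhausted.
        return shared;
    }
}

final class IterationDemo {
    // Counts the elements seen in one full pass over the Iterable.
    static <T> int countOnePass(Iterable<T> values) {
        int n = 0;
        for (T ignored : values) {
            n++;
        }
        return n;
    }

    // Performs two consecutive passes and returns both counts.
    static <T> int[] countTwoPasses(Iterable<T> values) {
        return new int[] {countOnePass(values), countOnePass(values)};
    }
}
```

For example, `IterationDemo.countTwoPasses(Arrays.asList(1, 2, 3))` yields `{3, 3}`, whereas the same call on a `SharedIteratorIterable` wrapping `Arrays.asList(1, 2, 3).iterator()` yields `{3, 0}`: a silent wrong result rather than an error, which is what makes it hard to spot inside a coGroup function.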
Hi,
this is probably an oversight. If it helps you implement the feature, please go ahead and add a sub-issue for solving the Iterator problem.

Best,
Aljoscha

On Tue, 21 Feb 2017 at 16:13 Lin Li <[hidden email]> wrote:
> [...]
Hi Lin Li,
I think the oversight is rather that we don’t throw a TraversableOnceException if you request more than one iterator, as is the case for the Iterables used in the non-collection mode. Otherwise, the collection mode and the non-collection mode behave differently.

In general, you’re right, Lin Li, that we don’t honour the Iterable contract, which should allow you to create an arbitrary number of iterators over the data. Honestly, I’m not sure why we made this change, because it’s not very intuitive. Maybe Ufuk can chime in, because he opened FLINK-1023.

Some background on why we don’t allow the Iterable to return multiple iterators over the data: we would have to keep all the data around in case the user creates a new iterator. Given that the data might grow quite big, this can be a burden. With the iterator contract, you know that you can free the resources once the current element has been processed.

Cheers,
Till

On Wed, Feb 22, 2017 at 11:10 AM, Aljoscha Krettek <[hidden email]> wrote:
> [...]
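The fail-fast behaviour Till describes can be sketched as follows, assuming a local `TraversableOnceException` that only mirrors the role of Flink's class of the same name (it is defined here so the example compiles without Flink). The hypothetical `TraverseOnceIterable` hands out its underlying iterator once and throws on any later `iterator()` call, so the runtime never has to retain records that were already consumed.

```java
import java.util.Iterator;

// Local stand-in that mirrors the role of Flink's TraversableOnceException;
// defined here so the sketch has no Flink dependency.
class TraversableOnceException extends RuntimeException {
    TraversableOnceException(String message) {
        super(message);
    }
}

// Hypothetical single-pass Iterable: the first iterator() call returns the
// underlying record stream; any later call fails fast instead of silently
// returning an already-exhausted iterator.
final class TraverseOnceIterable<T> implements Iterable<T> {
    private final Iterator<T> source;
    private boolean consumed = false;

    TraverseOnceIterable(Iterator<T> source) {
        this.source = source;
    }

    @Override
    public Iterator<T> iterator() {
        if (consumed) {
            throw new TraversableOnceException(
                "This Iterable can be traversed only once; buffer the elements if you need them again.");
        }
        consumed = true;
        return source;
    }
}
```

The trade-off is the one from the thread: a second traversal becomes an explicit error instead of a silent empty pass, and the user function, not the runtime, decides whether buffering a whole group is affordable.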
On Wed, Feb 22, 2017 at 11:19 AM, Till Rohrmann <[hidden email]> wrote:
> In general, you’re right Lin Li that we don’t honour the Iterable contract
> which should allow you to create an arbitrary number of iterators over the
> data. Honestly, I’m not sure why we did this change because it’s not very
> intuitive. Maybe Ufuk can chime in because he opened FLINK-1023.

The discussion in the issue is pretty detailed. The best summary is probably found in the PR description (https://github.com/apache/flink/pull/84): "This patch allows the GroupReduce and the CoGroup to use the beautiful foreach loop syntax." Originally, the PR ensured that a TraversableOnceException was thrown.
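The foreach motivation quoted from the PR can be illustrated with a sketch under similar assumptions: a hypothetical `SinglePassIterable`, with `IllegalStateException` standing in for TraversableOnceException. An enhanced `for` loop calls `iterator()` exactly once, so one loop per input works on a single-pass Iterable; a nested loop over the second input calls `iterator()` again for every outer element and therefore fails.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical single-pass Iterable; IllegalStateException stands in for
// Flink's TraversableOnceException to keep the sketch dependency-free.
final class SinglePassIterable<T> implements Iterable<T> {
    private Iterator<T> source;

    SinglePassIterable(Iterator<T> source) {
        this.source = source;
    }

    @Override
    public Iterator<T> iterator() {
        if (source == null) {
            throw new IllegalStateException("iterator() may be called only once");
        }
        Iterator<T> it = source;
        source = null; // drop the reference so a second call fails fast
        return it;
    }
}

final class ForeachDemo {
    // One foreach loop per input: each loop calls iterator() exactly once,
    // so this works on single-pass inputs.
    static int sumBoth(Iterable<Integer> first, Iterable<Integer> second) {
        int sum = 0;
        for (int x : first) sum += x;
        for (int y : second) sum += y;
        return sum;
    }

    // The inner foreach runs once per outer element, i.e. it calls
    // second.iterator() repeatedly, which a single-pass input rejects.
    static List<String> crossPairs(Iterable<Integer> first, Iterable<Integer> second) {
        List<String> pairs = new ArrayList<>();
        for (int x : first) {
            for (int y : second) {
                pairs.add(x + "," + y);
            }
        }
        return pairs;
    }
}
```

`ForeachDemo.sumBoth` therefore works on single-pass inputs, while `ForeachDemo.crossPairs` only works on re-iterable inputs such as lists, which is why a join-like coGroup has to buffer one side itself.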
Thank you for the answer!
The discussion on FLINK-1023 makes this very clear to me. I agree that a TraversableOnceException should be thrown when the iterator is requested a second time.

@Aljoscha The git history shows that you removed the exception-throwing code in FLINK-1110. Would you mind if I create an issue and add it back?

BTW, I have submitted a PR for FLINK-5498 (https://github.com/apache/flink/pull/3379), which supports left/right outer joins with non-equi-join conditions via the coGroup operator with a generated OuterJoinCoGroupFunction. However, the current implementation is not memory-safe for many-to-one/many-to-many outer joins, because it copies the opposite side's input into a List buffer. (This is not pretty, though it follows the guideline of only remembering input data within a single function call.) It's a work-around for now. In the long run, I think we should extend the runtime join operators to support such non-equi-join conditions, because an implementation in the Table API layer cannot always ensure efficiency.

Any suggestions on the current solution are welcome.

Best,
Lincoln
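A rough sketch of the buffering work-around described above, written against plain `Iterable`s rather than Flink's `CoGroupFunction` and `Collector` so it runs standalone. `BufferedLeftOuterJoin`, its `BiPredicate` condition and its `BiConsumer` output are hypothetical and not taken from PR 3379. The right group is copied into a `List` in one pass and then re-scanned for every left element; left elements without any match are emitted with `null`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;

// Hypothetical sketch of a left outer join with a non-equi condition inside a
// single coGroup-style call. Not the code from the pull request.
final class BufferedLeftOuterJoin {
    static <L, R> void coGroup(
            Iterable<L> left,
            Iterable<R> right,
            BiPredicate<L, R> condition,
            BiConsumer<L, R> out) {
        // Single pass over the right side; this buffer holds the whole right
        // group in memory, which is the memory-safety concern for large groups.
        List<R> rightBuffer = new ArrayList<>();
        for (R r : right) {
            rightBuffer.add(r);
        }
        // Single pass over the left side, re-scanning the buffer each time.
        for (L l : left) {
            boolean matched = false;
            for (R r : rightBuffer) {
                if (condition.test(l, r)) {
                    out.accept(l, r);
                    matched = true;
                }
            }
            if (!matched) {
                out.accept(l, null); // left outer join keeps unmatched left rows
            }
        }
    }
}
```

For instance, with left `[1, 5]`, right `[3, 4]` and the condition `l < r`, the emitted pairs are `(1, 3)`, `(1, 4)` and `(5, null)`. Each input is traversed only once, so the sketch also respects a traverse-once contract; the cost is the buffer for the right group.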
I think this was mostly an oversight on my part, which was possible because we didn't have good test coverage enforcing correctness. Please go ahead and open an issue for re-adding the throw.

On Wed, 22 Feb 2017 at 13:28 Lin Li <[hidden email]> wrote:
> [...]
I created a JIRA issue, https://issues.apache.org/jira/browse/FLINK-5883, and will work on it ASAP.

2017-02-22 21:01 GMT+08:00 Aljoscha Krettek <[hidden email]>:
> [...]