Iteration feedback partitioning does not work properly


Iteration feedback partitioning does not work properly

Gyula Fóra
Hey,

This question is mainly targeted towards Aljoscha but maybe someone can
help me out here:

I think the way feedback partitioning is handled does not work; let me
illustrate with a simple example:

IterativeStream it = ... (parallelism 1)
DataStream mapped = it.map(...) (parallelism 2)
// this does not work, as the feedback has parallelism 2 != 1:
// it.closeWith(mapped.partitionByHash(someField))
// so we need to rebalance the data:
it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField))

This program will execute, but the feedback will not be partitioned by hash
across the mapper instances: the partitioning is applied on the edge from the
NoOpMap to the iteration sink, which has a parallelism different from the
mapper's (1 vs. 2), and the iteration source then just forwards each element
to the mapper (always to subtask 0).

So the problem is basically that the iteration source/sink pair gets the
parallelism of the input stream (p = 1), not of the head operator (p = 2),
which leads to incorrect partitioning.
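The effect can be sketched outside of Flink. The snippet below is a hypothetical stand-in (not Flink API) for a hash-based channel selector of the form |hash(key)| % numberOfChannels: when the receiving side has parallelism 1, every key maps to channel 0, so the hash partitioning on the feedback edge has no effect.

```java
import java.util.List;

public class HashRoutingSketch {

    // Hypothetical stand-in for a hash channel selector:
    // channel = |hash(key) % numberOfOutputChannels|
    static int selectChannel(Object key, int numChannels) {
        return Math.abs(key.hashCode() % numChannels);
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d");

        // Routed against the iteration sink's parallelism (p = 1),
        // every key lands on channel 0 -- the hash is effectively lost.
        for (String k : keys) {
            System.out.println(k + " -> p=1 channel " + selectChannel(k, 1));
        }

        // Routed against the head mapper's parallelism (p = 2),
        // the same keys would actually spread across subtasks.
        for (String k : keys) {
            System.out.println(k + " -> p=2 channel " + selectChannel(k, 2));
        }
    }
}
```

With one output channel the modulo always yields 0, which matches the observation above that elements always reach subtask 0.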

Did I miss something here?

Cheers,
Gyula

Re: Iteration feedback partitioning does not work properly

Aljoscha Krettek-2
Hi,
I think what you would like to do can be achieved by:

IterativeStream it = in.map(IdentityMap).setParallelism(2).iterate()
DataStream mapped = it.map(...)
it.closeWith(mapped.partitionByHash(someField))

The input is rebalanced to the map inside the iteration, as in your example,
and the feedback should be partitioned by hash.

Cheers,
Aljoscha


On Tue, 6 Oct 2015 at 00:11 Gyula Fóra <[hidden email]> wrote:


Re: Iteration feedback partitioning does not work properly

Gyula Fóra
Hi,

This is just a workaround, which actually breaks input order from my
source. I think the iteration construction should be reworked to set the
parallelism of the source/sink to the parallelism of the head operator (and
validate that all heads have the same parallelism).

I thought this was the solution that you described with Stephan in some
older discussion before the rewrite.

Cheers,
Gyula

Aljoscha Krettek <[hidden email]> wrote (on 6 Oct 2015, Tue, 9:15):


Re: Iteration feedback partitioning does not work properly

Aljoscha Krettek-2
Ok, I see your point. But I think there will be problems no matter what
parallelism is chosen for the iteration source/sink. If the parallelism of
the head is chosen then there will be an implicit rebalance from the
operation right before the iteration to the iteration head. I think this
should break ordering as well, in your case.
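The ordering concern can be sketched abstractly as well (assuming the implicit redistribution behaves like a round-robin rebalance; the helper below is hypothetical, not Flink API): a p = 1 to p = 2 edge splits one ordered stream into two channels that the head subtasks consume independently, so the original global order is no longer guaranteed.

```java
import java.util.ArrayList;
import java.util.List;

public class RebalanceOrderSketch {

    // Hypothetical round-robin split, standing in for the implicit
    // rebalance between a p = 1 operator and a p = 2 iteration head.
    static List<List<Integer>> rebalance(List<Integer> input, int channels) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < channels; i++) {
            out.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            out.get(i % channels).add(input.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // One ordered input stream...
        List<List<Integer>> channels = rebalance(List.of(1, 2, 3, 4, 5, 6), 2);

        // ...becomes two channels. Each channel preserves its own order,
        // but the two head subtasks consume them independently, so there
        // is no guarantee how 1,3,5 and 2,4,6 interleave downstream.
        System.out.println(channels); // prints [[1, 3, 5], [2, 4, 6]]
    }
}
```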

On Tue, 6 Oct 2015 at 10:39 Gyula Fóra <[hidden email]> wrote:


Re: Iteration feedback partitioning does not work properly

Gyula Fóra
The feedback tuples might get rebalanced but the normal input should not.

But still, the main problem is that partitioning is not handled
transparently, and actually does not work when you set it the way you expect.

Gyula

Aljoscha Krettek <[hidden email]> wrote (on 8 Oct 2015, Thu, 16:33):


Re: Iteration feedback partitioning does not work properly

Stephan Ewen
For me, as an outsider to the iterations, I would say that both approaches
are in some way tricky, with some unexpected behavior.

Taking the parallelism implicitly from the predecessor (input) or from the
successor (the head task; what happens if there are multiple heads with
different parallelism?) can be confusing either way.
I have the feeling that what each one perceives as more consistent or
intuitive depends a bit on their mental model of the iterations (given
their prior experience and expectations).

I agree that we should do something there. But given that we are apparently
not really close to knowing (or agreeing on) the best way to go, I would
like to not block 0.10 on this (workarounds are available, after all) and
take this up for the next release, with enough time to properly figure it
out and discuss it.

The iterations will need some work for the next release anyway, to
integrate them with checkpointing and watermarks, so would you agree that
we tackle this then, as part of an effort to advance the iteration feature
as a whole?

Greetings,
Stephan



On Thu, Oct 8, 2015 at 4:42 PM, Gyula Fóra <[hidden email]> wrote:


Re: Iteration feedback partitioning does not work properly

Gyula Fóra
I agree that there are many things that need to be figured out properly
for iterations, and I am okay with postponing them to the next release if
we want to get this one out quickly.

The only problem is that this probably breaks the SAMOA connector.

Paris can you confirm this?

Stephan Ewen <[hidden email]> wrote (on 8 Oct 2015, Thu, 17:12):


Re: Iteration feedback partitioning does not work properly

Paris Carbone
Yes, it does break it, since it relies on preserving the backwards partitioning, which was the case before Aljoscha's refactoring. I will focus on a 0.10 patch for the SAMOA connector right after the 0.10 release to see how we can do this.

To be honest, the whole thing confuses me a bit. From my understanding of the discussion so far, in order to fix this I will need to repartition the preceding operator using an identity mapper before applying its subscription in the Flink topology. On the other hand, this seems to mess up the ordering according to Gyula, so it will not be a perfect solution unless we finally fix iterations properly, correct? I can see the identical problem in Matthias's Storm port.


On 08 Oct 2015, at 17:19, Gyula Fóra <[hidden email]> wrote:
