Multiple rebalances are incorrectly ignored in some cases.

David Morávek
Hello Flinkers,

We have run into unexpected behaviour with chained Reshuffles in Apache
Beam's Flink runner (batch).

In the Flink optimizer, when we `.rebalance()` a dataset, its output channel is
marked as `FORCED_REBALANCED`. When we chain this with another
`.rebalance()`, the latter is ignored because its source is already
`FORCED_REBALANCED`, so the requested property is already met. This is correct
behaviour, because rebalancing is idempotent.
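To make the property check concrete, here is a toy Java model of how a directly chained `.rebalance()` gets dropped. It is only a sketch under stated assumptions, not Flink's implementation: `Partitioning`, `RebalanceModel`, `isMetBy` and `plannedShuffles` are hypothetical names, and the real optimizer tracks far richer global properties than a two-value enum.

```java
/** Hypothetical two-value model of a channel's partitioning; not Flink's GlobalProperties. */
enum Partitioning { ANY, FORCED_REBALANCED }

final class RebalanceModel {
    /** A request is met when nothing is required, or the input already has the requested partitioning. */
    static boolean isMetBy(Partitioning requested, Partitioning actual) {
        return requested == Partitioning.ANY || requested == actual;
    }

    /** Returns how many shuffles survive when {@code n} rebalance() calls are chained directly. */
    static int plannedShuffles(int n) {
        Partitioning current = Partitioning.ANY; // the source output carries no partitioning guarantee
        int shuffles = 0;
        for (int i = 0; i < n; i++) {
            if (!isMetBy(Partitioning.FORCED_REBALANCED, current)) {
                shuffles++;                               // insert a round-robin shuffle
                current = Partitioning.FORCED_REBALANCED; // its output channel is now rebalanced
            }
            // otherwise the request is already met and this rebalance() is dropped
        }
        return shuffles;
    }
}
```

In this model `plannedShuffles(2)` and `plannedShuffles(3)` both return 1: every rebalance after the first finds its request already met, which is exactly the idempotence argument.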

When we include a `flatMap` in between the rebalances, i.e.
`.rebalance().flatMap(...).rebalance()`, we need to reshuffle again,
because the dataset distribution may have changed (e.g. you can emit an
unbounded stream of elements from a single input element). Unfortunately, the
`flatMap` output is still incorrectly marked as `FORCED_REBALANCED`, so the
second reshuffle gets ignored.
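The gap between the reported and the expected behaviour can be sketched with a second toy model. `Op` and `PropagationModel` are hypothetical names, not Flink classes; the only assumption is the one described above, namely whether a `flatMap` clears the rebalanced flag on its output.

```java
import java.util.List;

/** Hypothetical operator kinds for a toy plan; not Flink classes. */
enum Op { REBALANCE, FLAT_MAP }

final class PropagationModel {
    /**
     * Counts the shuffles planned for a pipeline.
     * resetAfterUdf = true:  a flatMap clears the rebalanced flag (the expected behaviour).
     * resetAfterUdf = false: the flag leaks through the flatMap (the reported behaviour).
     */
    static int plannedShuffles(List<Op> pipeline, boolean resetAfterUdf) {
        boolean rebalanced = false; // the source output is not rebalanced
        int shuffles = 0;
        for (Op op : pipeline) {
            switch (op) {
                case REBALANCE:
                    if (!rebalanced) { // requested property not met yet, so shuffle
                        shuffles++;
                        rebalanced = true;
                    }
                    break;
                case FLAT_MAP:
                    if (resetAfterUdf) {
                        rebalanced = false; // the UDF may have skewed the distribution
                    }
                    break;
            }
        }
        return shuffles;
    }
}
```

For `rebalance -> flatMap -> rebalance`, the leaking variant plans one shuffle while the resetting variant plans two; the difference is the second reshuffle that goes missing.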

We have worked around this by replacing the repartition with an identity map
function that carries the `Optimizer.HINT_SHIP_STRATEGY_REPARTITION` hint.
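The workaround can be modelled the same way. This sketch assumes the reported propagation (the flag leaks through `flatMap`) and assumes that a ship-strategy hint fixes the channel's strategy instead of going through the "already met" check, so the hinted identity map always gets its own shuffle. `Step` and `WorkaroundModel` are hypothetical names. In the Flink DataSet API the hint is attached to the operator's parameters, e.g. via `withParameters(Configuration)` with the `Optimizer.HINT_SHIP_STRATEGY` key; verify the exact wiring against your Flink version.

```java
import java.util.List;

/** Hypothetical operator kinds for a toy plan; not Flink classes. */
enum Step { REBALANCE, FLAT_MAP, IDENTITY_MAP_WITH_REPARTITION_HINT }

final class WorkaroundModel {
    /** Counts shuffles under the reported (leaking) propagation of the rebalanced flag. */
    static int plannedShuffles(List<Step> pipeline) {
        boolean rebalanced = false; // the source output is not rebalanced
        int shuffles = 0;
        for (Step step : pipeline) {
            switch (step) {
                case REBALANCE:
                    if (!rebalanced) { // requested property not met yet, so shuffle
                        shuffles++;
                        rebalanced = true;
                    }
                    break;
                case FLAT_MAP:
                    break; // reported bug: the flag is not cleared here
                case IDENTITY_MAP_WITH_REPARTITION_HINT:
                    shuffles++;        // assumed: the hint forces a repartition unconditionally
                    rebalanced = true; // assumed: a random repartition yields a rebalanced output
                    break;
            }
        }
        return shuffles;
    }
}
```

Even with the leaking flag, `rebalance -> flatMap -> hinted identity map` plans two shuffles here, whereas `rebalance -> flatMap -> rebalance` plans only one.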

I have a feeling that this is just a workaround, and that the issue should be
fixed in the Flink optimizer itself.

Relevant Beam JIRA: https://issues.apache.org/jira/browse/BEAM-9824

WDYT?

Thanks,

D.

Re: Multiple rebalances are incorrectly ignored in some cases.

Aljoscha Krettek
On 27.04.20 09:34, David Morávek wrote:

> When we include `flatMap` in between rebalances ->
> `.rebalance().flatMap(...).rebalance()`, we need to reshuffle again,
> because dataset distribution may have changed (e.g. you can possibly emit an
> unbounded stream from a single element). Unfortunately, `flatMap` output is
> still incorrectly marked as `FORCED_REBALANCED` and the second reshuffle
> gets ignored.

This indeed seems incorrect. Did you look into the Flink code to see why
the output of the flatMap is `FORCED_REBALANCED`?

Aljoscha

Re: Multiple rebalances are incorrectly ignored in some cases.

David Morávek
Hello Aljoscha,

Unfortunately not. I'm not really familiar with the optimizer code, and it's
really complex to debug :(

This method is as far as I got:
https://github.com/apache/flink/blob/master/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java#L301

D.


On Mon, Apr 27, 2020 at 11:24 AM Aljoscha Krettek <[hidden email]> wrote:

> This indeed seems incorrect. Did you look into the Flink code to see why
> the output of the flatMap is `FORCED_REBALANCED`?
>
> Aljoscha