Adjusting numberOfSubtasks at runtime

Márton Balassi
Hi,

Apache Storm has this cool feature that lets you "rebalance a topology"
at runtime to handle changes in the streaming workload. In practice
this means adjusting the number of worker machines and the parallelism of
the spouts and bolts.

Translated to Flink terms, this would mean changing the numberOfSubtasks
field of the AbstractJobVertices of the JobGraph at runtime (among
other things, e.g. updating the numberOfOutputChannels in the partitioners
accordingly).

Is there any support for achieving this currently?

Cheers,

Marton

Re: Adjusting numberOfSubtasks at runtime

Márton Balassi
Hey,

Any comments on this please? :)

Thanks.



Re: Adjusting numberOfSubtasks at runtime

Stephan Ewen
Hey Marton!

Sorry for the late answer.

I think this would be a cool feature. It would definitely require work in
the representation and transfer of intermediate results. Ufuk is reworking
those. He is currently on vacation and will probably comment when he is
back.

We can definitely keep that in mind when working on the network stack,
though I feel we have to wait for the base functionality there to be in
place.

A few questions on how that should work:

 - Would this have to be a coordinated action across workers, or can
workers individually do that?

 - How does it work with stateful operators? The state needs repartitioning
as well.

 - Do all workers need to quiesce their inputs and trigger a finer
partitioning? Or do you partition rather finely up front, and then change the
assignment of partitions to workers?

Greetings,
Stephan





Re: Adjusting numberOfSubtasks at runtime

Márton Balassi
> - Would this have to be a coordinated action across workers, or can
> workers individually do that?

There are two main approaches in other frameworks currently:

The one that Storm, for example, applies is that the user has to initiate a
rebalance on their own terms, which makes it a globally coordinated action. I
am generally against coordinated actions in a streaming environment if they
can be avoided (as you would normally rebalance when your workers are already
well utilized or even overutilized) - but this is a fine approach for a first
try.

A more advanced solution can also do this automatically based on predefined
load balance thresholds, e.g. something similar to the way Amazon Kinesis
solves this: for splitting, each JobVertex has a hash function producing a
(preferably large) number of buckets, and each worker picks up its share
of buckets from a global broker. The buckets can be redistributed across
workers to manage load balance, and workers can be added or removed
elastically in this approach. It has its pitfalls when a bucket is too
busy to be handled by a single machine or when you have too few buckets -
but then usually either the topology should be reconsidered or the hash
function is not the best fit. For a bit more info on this, please refer to
the following talk, starting at around 21 minutes:
http://www.infoq.com/presentations/big-data-aws-kinesis
Just to highlight: in this scenario a single worker should be able to
signal the need for a new worker.
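
To make the bucket idea a bit more concrete, here is a minimal sketch in
plain Java. It is not Flink or Kinesis code: the class
BucketAssignmentSketch and its methods bucketOf, assign and addWorker are
hypothetical names, and the "global broker" is reduced to an in-memory map.
It only shows that keys hash to a fixed set of buckets and that adding a
worker moves a few buckets instead of rehashing every key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: keys hash to fixed buckets, buckets are assigned to workers. */
class BucketAssignmentSketch {

    /** Maps a key to one of numBuckets buckets; floorMod keeps the result non-negative. */
    static int bucketOf(Object key, int numBuckets) {
        return Math.floorMod(key.hashCode(), numBuckets);
    }

    /** Initial round-robin assignment: worker id -> list of owned bucket ids. */
    static Map<Integer, List<Integer>> assign(int numBuckets, int numWorkers) {
        Map<Integer, List<Integer>> owned = new TreeMap<>();
        for (int w = 0; w < numWorkers; w++) {
            owned.put(w, new ArrayList<>());
        }
        for (int b = 0; b < numBuckets; b++) {
            owned.get(b % numWorkers).add(b);
        }
        return owned;
    }

    /** Adds a worker and hands it buckets taken from the currently most loaded workers. */
    static void addWorker(Map<Integer, List<Integer>> owned, int newWorker) {
        List<Integer> mine = new ArrayList<>();
        owned.put(newWorker, mine);
        int total = 0;
        for (List<Integer> buckets : owned.values()) {
            total += buckets.size();
        }
        int target = total / owned.size(); // fair share, rounded down
        while (mine.size() < target) {
            List<Integer> donor = null;
            for (List<Integer> buckets : owned.values()) {
                if (buckets != mine && (donor == null || buckets.size() > donor.size())) {
                    donor = buckets;
                }
            }
            // Only this one bucket changes owner; all other assignments stay put.
            mine.add(donor.remove(donor.size() - 1));
        }
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> owned = assign(12, 3);
        System.out.println("before: " + owned);
        addWorker(owned, 3);
        System.out.println("after:  " + owned);
    }
}
```

The key-to-bucket mapping never changes here, only the bucket-to-worker
mapping does, which is what would let a single overloaded worker ask for
help without a stop-the-world repartitioning.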

> - How does it work with stateful operators? The state needs repartitioning
> as well.

Yes, in this approach state would have to be associated with those buckets.
The speed with which it can be moved can easily be a bottleneck.
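
A rough sketch of what "state associated with buckets" could look like,
again in plain Java and with hypothetical names (BucketedStateSketch,
Worker, release, adopt). It assumes records are already routed to the
worker that owns their bucket and ignores records arriving while a bucket
is in flight, which a real system would have to handle.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: operator state is kept per bucket so it can move with the bucket. */
class BucketedStateSketch {

    static class Worker {
        // bucket id -> (word -> count); one independent state map per bucket
        final Map<Integer, Map<String, Long>> stateByBucket = new HashMap<>();

        /** Updates the count for a word inside the state of the word's bucket. */
        void count(String word, int numBuckets) {
            int bucket = Math.floorMod(word.hashCode(), numBuckets);
            stateByBucket.computeIfAbsent(bucket, b -> new HashMap<>())
                         .merge(word, 1L, Long::sum);
        }

        /** Gives up a bucket and returns its state; the transfer cost grows with its size. */
        Map<String, Long> release(int bucket) {
            Map<String, Long> state = stateByBucket.remove(bucket);
            return state == null ? new HashMap<>() : state;
        }

        /** Takes over a bucket together with the state shipped by the previous owner. */
        void adopt(int bucket, Map<String, Long> state) {
            stateByBucket.put(bucket, state);
        }
    }

    public static void main(String[] args) {
        int numBuckets = 8;
        Worker a = new Worker();
        Worker b = new Worker();
        a.count("storm", numBuckets);
        a.count("storm", numBuckets);
        int bucket = Math.floorMod("storm".hashCode(), numBuckets);
        b.adopt(bucket, a.release(bucket)); // move the bucket and its state
        b.count("storm", numBuckets);
        System.out.println(b.stateByBucket);
    }
}
```

In this sketch the handover is just a reference move inside one JVM; across
machines the same step becomes a serialization and network transfer, which
is where the bottleneck mentioned above would show up.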

> - Do all workers need to quiesce their inputs and trigger a finer
> partitioning? Or do you partition rather finely up front, and then change the
> assignment of partitions to workers?

I'd prefer the rather fine prior partitioning. Of course you cannot always
do that: e.g. when you are doing a WordCount on a corpus with only a few
distinct words (but with arbitrary frequencies), the load after the groupBy
operation in the Counter can still exceed your threshold even for counting
the occurrences of a single word - then there is no option left but to
rewrite the topology to deal with that.
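
The skew problem can be shown with a few lines of plain Java (hypothetical
names KeySkewSketch, loadPerBucket and hottest; the word list is made-up
illustration data). However many buckets you configure, a corpus with few
distinct words fills at most that many buckets, and the hottest bucket
carries at least the full load of the most frequent word.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: per-bucket load for a word stream with few distinct keys. */
class KeySkewSketch {

    /** Counts how many records land in each bucket under hash partitioning. */
    static Map<Integer, Long> loadPerBucket(List<String> words, int numBuckets) {
        Map<Integer, Long> load = new HashMap<>();
        for (String word : words) {
            load.merge(Math.floorMod(word.hashCode(), numBuckets), 1L, Long::sum);
        }
        return load;
    }

    /** Returns the record count of the busiest bucket, or 0 if there is no load. */
    static long hottest(Map<Integer, Long> load) {
        long max = 0L;
        for (long count : load.values()) {
            max = Math.max(max, count);
        }
        return max;
    }

    public static void main(String[] args) {
        // Made-up input: two distinct words, one of them dominating.
        List<String> words = Arrays.asList("a", "a", "a", "a", "b");
        Map<Integer, Long> load = loadPerBucket(words, 1024);
        System.out.println("non-empty buckets: " + load.size());
        System.out.println("hottest bucket load: " + hottest(load));
    }
}
```

Raising the bucket count does not help in this case, since all records of
one word always hash to the same bucket.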


