Parallelizing ExecutionConfig.fromCollection


Greg Hogan
Hi,

CollectionInputFormat currently enforces a parallelism of 1 by implementing
NonParallelInput and serializing the entire Collection. If my understanding
is correct, this serialized InputFormat is often the cause of a new job
exceeding the Akka message size limit.

As an alternative, the Collection elements could be serialized into
multiple InputSplits. Has this idea been considered and rejected?
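
To make the idea concrete, here is a minimal sketch in plain Java (the
`CollectionChunker` class and its `split` helper are hypothetical, not
existing Flink code): each input split would carry one contiguous chunk,
so a subtask deserializes only its own elements rather than the whole
Collection.

```java
import java.util.ArrayList;
import java.util.List;

public class CollectionChunker {
    // Partition a collection into numSplits contiguous chunks whose
    // sizes differ by at most one element.
    static <T> List<List<T>> split(List<T> elements, int numSplits) {
        List<List<T>> splits = new ArrayList<>();
        int size = elements.size();
        for (int i = 0; i < numSplits; i++) {
            int start = (int) ((long) i * size / numSplits);
            int end = (int) ((long) (i + 1) * size / numSplits);
            splits.add(new ArrayList<>(elements.subList(start, end)));
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4, 5);
        System.out.println(split(data, 2)); // prints "[[1, 2], [3, 4, 5]]"
    }
}
```

Each chunk could then be serialized into its own InputSplit and handed to
a separate subtask, instead of shipping one serialized blob at parallelism 1.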

Thanks,
Greg

Re: Parallelizing ExecutionConfig.fromCollection

Till Rohrmann
Hi Greg,

I think we haven't discussed a parallelized collection input format yet.
Thanks for bringing this up.

I think it should be possible to implement a generic parallel collection
input format. However, I have two questions here:

1. Is it really a problem for users that their job exceeds the Akka frame
size limit when using the collection input format? The collection input
format is intended primarily for testing, so the data should be rather
small.

2. Which message is causing the frame size problem? If it is the task
deployment descriptor, then a parallelized collection input format which
works with input splits can solve the problem. If the problem is rather the
`SubmitJob` message, then we have to tackle the problem differently,
because the input splits are only created on the `JobManager`. Before
that, the collection is simply written into the task config of the data
source `JobVertex`, because we don't know the number of subtasks yet. In
the latter case, which can also be caused by large closure objects, we
should send the job to the `JobManager` via the blob manager to solve the
problem.

Cheers,
Till

On Mon, Apr 25, 2016 at 3:45 PM, Greg Hogan <[hidden email]> wrote:

> Hi,
>
> CollectionInputFormat currently enforces a parallelism of 1 by implementing
> NonParallelInput and serializing the entire Collection. If my understanding
> is correct this serialized InputFormat is often the cause of a new job
> exceeding the akka message size limit.
>
> As an alternative the Collection elements could be serialized into multiple
> InputSplits. Has this idea been considered and rejected?
>
> Thanks,
> Greg
>

Re: Parallelizing ExecutionConfig.fromCollection

Greg Hogan
Hi Till,

I appreciate the detailed explanation. My specific case has been with the
graph generators. I think it is possible to implement some random sources
using SplittableIterator rather than building a Collection, so it might be
best to rework the graph generator API to better fit the Flink model. For
LCGs we can simply build a skip-ahead table.
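
For reference, the skip-ahead idea can be sketched like this (plain Java;
the constants are the classic ANSI C LCG parameters, and the `skip` helper
is hypothetical, not part of any Flink API). One LCG step is the affine
map x -> (a*x + c) mod m, and composing that map with itself by repeated
squaring lets each parallel source instance jump directly to its starting
position in O(log k) rather than iterating k times:

```java
public class LcgSkipAhead {
    // Classic ANSI C LCG parameters: x_{n+1} = (A * x_n + C) mod M.
    static final long M = 1L << 31;
    static final long A = 1103515245L;
    static final long C = 12345L;

    // One step of the generator.
    static long next(long x) {
        return (A * x + C) % M;
    }

    // Jump k steps ahead in O(log k). An LCG step is the affine map
    // x -> A*x + C; composing (a1, c1) then (a2, c2) yields the map
    // (a2*a1, a2*c1 + c2), so we can square the map instead of iterating.
    static long skip(long x, long k) {
        long accA = 1, accC = 0; // identity map
        long curA = A, curC = C; // a single step
        while (k > 0) {
            if ((k & 1) == 1) {
                accC = (curA * accC + curC) % M;
                accA = (curA * accA) % M;
            }
            curC = (curA * curC + curC) % M;
            curA = (curA * curA) % M;
            k >>= 1;
        }
        return (accA * x + accC) % M;
    }

    public static void main(String[] args) {
        long x = 42;
        for (int i = 0; i < 1000; i++) {
            x = next(x);
        }
        // Jumping 1000 steps at once lands on the same value.
        System.out.println(skip(42, 1000) == x); // prints "true"
    }
}
```

A SplittableIterator-style source built on this would assign each subtask
a disjoint index range and seed it with skip(seed, rangeStart), so no
Collection ever has to be materialized or serialized.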

Greg

On Mon, Apr 25, 2016 at 10:05 AM, Till Rohrmann <[hidden email]>
wrote:
