|
Hi all,
I’m currently working in my research on a variant of Flink which allows to assign subpartitions while runtime to TaskExecutors (just having a look at joins for the moment). In this variant, I made some changes to the ResultPartitionDeploymentDescriptor which lead to the creation of many more subpartitions than InputChannels. (eg. usually we would have 4 subpartitions and 4 InputChannels with 4 receivers - let’s now imagine we have 40 subpartitions, 4 InputChannels and 4 receivers)
Without further changes, the receivers request their single subpartition - receive it and are done. Only 4 of 40 subpartitions are consumed and also only 1/10 of the data is transferred. (This setup runs without a problem. Nevertheless it’s of course not desired to read only 10% of the data!)
Now I would like to decide while execution to give receivers the additional 36 subpartitions to consume.
My idea was to let the receivers still think receiving their initial subpartition index and add the data for other subpartitions on sender side.
I know that we have to separate between local and remote subpartitions.
In case of local subpartitions this is the SingleInputGate. It usually holds multiple InputChannels. For the local case, the InputChannel is a LocalInputChannel. This LocalInputChannel itself holds one PipelinedSubpartitionView.
Here I add more views for additional subpartition indices on the fly.
In the end, buffers from different subpartitions arrive and are deserialized by the AbstractRecordReader.
Unfortunately I’m loosing data when consuming concurrently from several subpartitions when using a low over partitioning factor. Have not identified why yet…
But more problematic: When reading records which are taller than only some Integer values, I get Exceptions while deserialization. That’s probably the case as soon as one record does not match into one buffer anymore. Am I right?
If that is the case, instead of adding PipelinedSubpartitionViews - I should add complete new InputGates for subpartition assignments. (having new InputChannels, deserializers …)
The InputGates are already created with task instantiation and are described by the InputGateDeploymentDescriptor.
The challenge is now to add SingleInputGates to a running task.
Maybe someone has an idea where to start with this or has any other thoughts - maybe there is an other solution I’m missing.
Thank you.
Benjamin Burkhardt
|