Zhilong Hong created FLINK-21920:
------------------------------------
Summary: Optimize DefaultScheduler#allocateSlots
Key: FLINK-21920
URL: https://issues.apache.org/jira/browse/FLINK-21920
Project: Flink
Issue Type: Sub-task
Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
Fix For: 1.13.0
Based on the scheduler benchmark introduced in FLINK-21731, we found that several procedures related to {{DefaultScheduler#allocateSlots}} have O(N^2) complexity.
The first one is: {{ExecutionSlotSharingGroupBuilder#tryFindAvailableProducerExecutionSlotSharingGroupFor}}. The original implementation is:
{code:java}
for all SchedulingExecutionVertex in DefaultScheduler:
  for all consumed SchedulingResultPartition of the SchedulingExecutionVertex:
    get the result partition's producer vertex and determine whether the ExecutionSlotSharingGroup in which the producer vertex is located is available for the current vertex{code}
This procedure has O(N^2) complexity.
The result partitions in the same ConsumedPartitionGroup have the same producer vertex, so we can iterate over the ConsumedPartitionGroups instead of over all the consumed partitions. This will decrease the complexity from O(N^2) to O(N).
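A minimal sketch of the difference between the two iteration strategies. It assumes an all-to-all edge in which every consumer reads the same group of partitions, and it only counts producer lookups. The names {{GroupIterationSketch}}, {{PartitionGroup}}, {{lookupsPerPartition}} and {{lookupsPerGroup}} are hypothetical stand-ins, not Flink's actual classes or methods.

```java
import java.util.List;

/** Hypothetical stand-in types for illustration; not Flink's actual classes. */
class GroupIterationSketch {

    /** A group of consumed partitions assumed to share one producer. */
    static final class PartitionGroup {
        final String producerId;
        final int partitionCount;

        PartitionGroup(String producerId, int partitionCount) {
            this.producerId = producerId;
            this.partitionCount = partitionCount;
        }
    }

    /** Original strategy: one producer lookup per consumed partition. */
    static long lookupsPerPartition(int consumerCount, List<PartitionGroup> groups) {
        long lookups = 0;
        for (int consumer = 0; consumer < consumerCount; consumer++) {
            for (PartitionGroup group : groups) {
                for (int p = 0; p < group.partitionCount; p++) {
                    lookups++; // stands in for resolving the partition's producer
                }
            }
        }
        return lookups;
    }

    /** Proposed strategy: one producer lookup per group. */
    static long lookupsPerGroup(int consumerCount, List<PartitionGroup> groups) {
        long lookups = 0;
        for (int consumer = 0; consumer < consumerCount; consumer++) {
            lookups += groups.size(); // the group's producer is resolved once
        }
        return lookups;
    }
}
```

With 1000 consumers and a single group of 1000 partitions, {{lookupsPerPartition}} counts 1,000,000 lookups while {{lookupsPerGroup}} counts 1,000.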
The second one is: {{ExecutionGraphToInputsLocationsRetrieverAdapter#getConsumedResultPartitionsProducers}}. The original implementation is:
{code:java}
for all SchedulingExecutionVertex in DefaultScheduler:
  for all ConsumedPartitionGroup of the SchedulingExecutionVertex:
    for all IntermediateResultPartition in the ConsumedPartitionGroup:
      get producer of the IntermediateResultPartition {code}
This procedure has O(N^2) complexity.
For each SchedulingExecutionVertex, the producers of its ConsumedPartitionGroups are calculated separately. However, SchedulingExecutionVertices in the same ConsumerVertexGroup have the same ConsumedPartitionGroup, so we don't need to calculate the producers over and over again. We can use a local cache to store the producers. This will decrease the complexity from O(N^2) to O(N).
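The caching idea can be sketched as follows, under the assumption that a group object can serve as a map key by identity. {{ProducerCacheSketch}}, {{PartitionGroup}}, {{producersOf}} and {{partitionVisits}} are hypothetical names for illustration; the actual change in Flink may be structured differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a local producer cache; not Flink's actual code. */
class ProducerCacheSketch {

    /** Stand-in for a group of partitions; holds each partition's producer id. */
    static final class PartitionGroup {
        final List<String> partitionProducers;

        PartitionGroup(List<String> partitionProducers) {
            this.partitionProducers = partitionProducers;
        }
    }

    /** Counts how many partitions were actually visited to compute producers. */
    int partitionVisits = 0;

    /** Keyed by group identity, since PartitionGroup does not override equals/hashCode. */
    private final Map<PartitionGroup, List<String>> cache = new HashMap<>();

    /** Returns the producers of a group, computing them only on the first request. */
    List<String> producersOf(PartitionGroup group) {
        return cache.computeIfAbsent(group, g -> {
            List<String> producers = new ArrayList<>();
            for (String producer : g.partitionProducers) {
                partitionVisits++;
                producers.add(producer);
            }
            return producers;
        });
    }
}
```

If 100 consumers request the producers of the same three-partition group, only the first call walks the partitions, so {{partitionVisits}} ends at 3 instead of 300.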