Xintong Song created FLINK-19151:
------------------------------------

             Summary: Flink does not normalize container resource with correct configurations when Yarn FairScheduler is used
                 Key: FLINK-19151
                 URL: https://issues.apache.org/jira/browse/FLINK-19151
             Project: Flink
          Issue Type: Bug
          Components: Deployment / YARN
    Affects Versions: 1.11.2
            Reporter: Xintong Song


h3. Problem

By Yarn's protocol, requested container resources are normalized for allocation. That means an allocated container may have different resources (larger than or equal to) than what was requested.

Currently, Flink matches the allocated containers to the original requests by reading the Yarn configuration and calculating how the requested resources should be normalized.

What has been overlooked is that Yarn FairScheduler (and its subclass SLSFairScheduler) overrides the normalization behavior. To be specific,
 * By default, Yarn normalizes container resources to an integer multiple of "yarn.scheduler.minimum-allocation-[mb|vcores]"
 * FairScheduler normalizes container resources to an integer multiple of "yarn.resource-types.[memory-mb|vcores].increment-allocation" (or the deprecated keys "yarn.scheduler.increment-allocation-[mb|vcores]"), while making sure the resource is no less than "yarn.scheduler.minimum-allocation-[mb|vcores]"

h3. Proposal for short term solution

To fix this problem, a quick and easy way is to also read the Yarn configuration to learn which scheduler is used, and perform the normalization calculations accordingly. This should be good enough to cover the behaviors of all the schedulers that Yarn currently provides. The limitation is that Flink will not be able to deal with custom Yarn schedulers that override the normalization behavior.

h3. Proposal for long term solution

For the long term, it would be good to use Yarn ContainerRequest#allocationRequestId to match the allocated containers with the original requests, so that Flink no longer needs to understand how Yarn normalizes container resources.

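For reference, the two normalization rules from the Problem section and the scheduler-aware selection from the short term proposal can be sketched roughly as below. This is only an illustration, not Flink's or Yarn's actual code: the class and method names are hypothetical, the configuration is modeled as a plain Map instead of a YarnConfiguration, all required keys are assumed to be set explicitly, the cap at the maximum allocation is left out, and detecting the scheduler by class-name suffix is a simplification.

```java
import java.util.Map;

/** Hypothetical sketch only; this is not Flink's or Yarn's actual code. */
final class ContainerNormalizationSketch {

    /** Rounds value up to the nearest integer multiple of unit. */
    static long roundUp(long value, long unit) {
        return ((value + unit - 1) / unit) * unit;
    }

    /** Default rule from the issue: integer multiple of minimum-allocation. */
    static long normalizeDefault(long requested, long minAllocation) {
        return Math.max(roundUp(requested, minAllocation), minAllocation);
    }

    /**
     * FairScheduler rule from the issue: integer multiple of
     * increment-allocation, and no less than minimum-allocation.
     */
    static long normalizeFair(long requested, long minAllocation, long increment) {
        return roundUp(Math.max(requested, minAllocation), increment);
    }

    /**
     * Picks the rule based on the configured scheduler class.
     * Assumption: every key that is read here is present in conf.
     */
    static long normalizeMemoryMb(Map<String, String> conf, long requestedMb) {
        String scheduler = conf.getOrDefault("yarn.resourcemanager.scheduler.class", "");
        long min = Long.parseLong(conf.get("yarn.scheduler.minimum-allocation-mb"));

        // Simplification: the suffix check covers both FairScheduler and SLSFairScheduler.
        if (scheduler.endsWith("FairScheduler")) {
            String inc = conf.get("yarn.resource-types.memory-mb.increment-allocation");
            if (inc == null) {
                // Fall back to the deprecated key mentioned in the issue.
                inc = conf.get("yarn.scheduler.increment-allocation-mb");
            }
            return normalizeFair(requestedMb, min, Long.parseLong(inc));
        }
        return normalizeDefault(requestedMb, min);
    }
}
```

With a minimum of 1024 MB and an increment of 512 MB, a 1500 MB request would be matched against 2048 MB under the default rule but 1536 MB under the FairScheduler rule, which is the mismatch this issue describes.
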
Yarn ContainerRequest#allocationRequestId was introduced in Hadoop 2.9, while Flink currently claims to be compatible with Hadoop 2.4+. Therefore, this solution would not work at the moment.

Another idea is to support various Hadoop versions with different container matching logic. We can abstract the container matching logic into a dedicated component and provide different implementations for it. This would allow Flink to take advantage of the new versions (e.g., work well with custom schedulers), while staying compatible with the old versions without those advantages.

Given that we need the resource-based matching anyway for the old Hadoop versions, and the cost of maintaining two sets of matching logic, I tend to think of this approach as a back-up option to be worked on when we indeed see a need for it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)