Iaroslav Zeigerman created FLINK-21548:
------------------------------------------

             Summary: keyBy operation produces skewed event distribution with low-cardinality keys
                 Key: FLINK-21548
                 URL: https://issues.apache.org/jira/browse/FLINK-21548
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream, Runtime / Coordination, Runtime / Task
    Affects Versions: 1.12.1, 1.11.0
            Reporter: Iaroslav Zeigerman
         Attachments: Screen Shot 2021-03-01 at 10.52.31 AM.png, Screen Shot 2021-03-01 at 10.54.42 AM.png, Screen Shot 2021-03-01 at 10.57.33 AM.png


When the cardinality of keys matches the existing parallelism, not all subtasks of the downstream operator receive records, and even those that do are not utilized evenly. For example, with 500 unique keys in [0, 500) and a parallelism of 500, only 313 downstream tasks receive any records at all.

This behavior can easily be reproduced with the following test case:

{code:scala}
import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object Test {

  val parallelism = 500
  val recordsNum  = 1000000

  def run(): Unit = {
    // Keys cycle through [0, parallelism), so every key occurs
    // roughly recordsNum / parallelism times.
    val recordIds = (0 to recordsNum).map(_ % parallelism)
    val tasks     = recordIds.map(selectTask)

    println(s"Total unique keys: ${recordIds.toSet.size}")
    println(s"Key distribution: ${recordIds.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
    println("=======================")
    println(s"Tasks involved: ${tasks.toSet.size}")
    println(s"Record distribution by task: ${tasks.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
  }

  // With maxParallelism == parallelism each subtask owns exactly one
  // key group, so the key group index is also the subtask index.
  def selectTask(key: Int): Int =
    KeyGroupRangeAssignment.assignToKeyGroup(key, parallelism)
}
{code}

Which produces the following results:

{noformat}
Total unique keys: 500
Key distribution: Vector((0,2001), (69,2000), ..., (232,2000), (100,2000))
=======================
Tasks involved: 313
Record distribution by task: Vector((147,10000), (248,10000), ..., (232,2000), (100,2000))
{noformat}

Record distribution visualized:

!Screen Shot 2021-03-01 at 10.52.31 AM.png!
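For context, the assignment that the test above exercises boils down to two steps (per the linked {{KeyGroupRangeAssignment}} source): key to key group via a murmur-style mix of the key's {{hashCode}}, then key group to operator index. A rough stand-alone sketch of that path, using Scala's {{MurmurHash3}} as a stand-in for Flink's internal {{MathUtils.murmurHash}} (so the exact numbers it produces will differ from Flink's):

```scala
import scala.util.hashing.MurmurHash3

object AssignmentSketch {

  // Stand-in for Flink's MathUtils.murmurHash; NOT the exact Flink
  // implementation, only the same shape of mix-then-finalize hashing.
  def murmur(keyHash: Int): Int =
    MurmurHash3.finalizeHash(MurmurHash3.mix(MurmurHash3.arraySeed, keyHash), 1)

  // Mirrors KeyGroupRangeAssignment.computeKeyGroupForKeyHash:
  // the key group is the murmur-mixed hash modulo maxParallelism.
  def keyGroup(key: Int, maxParallelism: Int): Int =
    Math.floorMod(murmur(key.hashCode), maxParallelism)

  // Mirrors KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup:
  // key groups are split into contiguous ranges across the subtasks.
  def operatorIndex(keyGroupId: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroupId * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    val maxParallelism = 500
    val parallelism    = 500
    val tasks = (0 until 500).map { k =>
      operatorIndex(keyGroup(k, maxParallelism), maxParallelism, parallelism)
    }
    println(s"Tasks hit: ${tasks.toSet.size} of $parallelism")
  }
}
```

Note that when maxParallelism equals parallelism the operator index is just the key group index, which is why the test above can treat {{assignToKeyGroup}} as the task selector.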
I have determined that, in order to achieve utilization of all tasks, the number of unique keys should be at least 5 times the parallelism value. The relation between the number of unique keys and the fraction of utilized tasks appears to be exponential:

!Screen Shot 2021-03-01 at 10.54.42 AM.png!

But even with 5x the number of keys the skew is still quite significant:

!Screen Shot 2021-03-01 at 10.57.33 AM.png!

Given that the keys used in my test are integer values, for which {{hashCode}} returns the value itself, I tend to believe that the skew is caused by Flink's murmur hash implementation, which is used [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76].
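As a sanity check on these numbers (my own back-of-the-envelope estimate, not anything from Flink): even an idealized, perfectly uniform hash leaves some key groups empty at this key cardinality. Throwing k distinct keys uniformly at random into m key groups leaves an expected m * (1 - (1 - 1/m)^k) groups non-empty:

```scala
object OccupancyCheck {

  // Expected number of non-empty bins after throwing k balls
  // uniformly at random into m bins: m * (1 - (1 - 1/m)^k).
  def expectedOccupied(k: Int, m: Int): Double =
    m * (1.0 - math.pow(1.0 - 1.0 / m, k))

  def main(args: Array[String]): Unit = {
    // k = m = 500 gives roughly 316, close to the observed 313 tasks.
    println(f"500 keys, 500 groups:  ${expectedOccupied(500, 500)}%.1f")
    // k = 5 * m gives roughly 497 of 500, matching the ~5x-keys threshold.
    println(f"2500 keys, 500 groups: ${expectedOccupied(2500, 500)}%.1f")
  }
}
```

So the 313-of-500 occupancy is roughly what any uniform hash over 500 key groups would produce; the per-task record skew on top of that is the part that varies with the concrete hash function.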