Arvid Heise created FLINK-22686:
-----------------------------------

             Summary: Incompatible subtask mappings while resuming from unaligned checkpoints
                 Key: FLINK-22686
                 URL: https://issues.apache.org/jira/browse/FLINK-22686
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.13.0
            Reporter: Arvid Heise

A user [reported|https://lists.apache.org/x/list.html?user@...:lte=1M:Flink%201.13.0%20reactive%20mode:%20Job%20stop%20and%20cannot%20restore%20from%20checkpoint] that he encountered an internal error while resuming during reactive mode. There isn't an immediate connection to reactive mode, so it's safe to assume that one rescaling case was not covered.

{noformat}
Caused by: java.lang.IllegalStateException: Incompatible subtask mappings: are multiple operators ingesting/producing intermediate results with varying degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70, 77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182, 189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106, 113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9, 16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135, 142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45, 52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164, 171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81, 88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193, 200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110, 117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13, 20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139, 146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
	at org.apache.flink.runtime.checkpoint.TaskStateAssignment.checkSubtaskMapping(TaskStateAssignment.java:322) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
	at org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:306) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
	at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:409) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
	at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:193) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
	at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1566) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1646) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:986) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$25(AdaptiveScheduler.java:976) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
	at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:834) ~[?:?]
{noformat}

Here it seems that the same gate gets input from a range-partitioned and a round-robin partitioned channel at the same time. During the implementation of FLINK-19801, we couldn't find such a case and optimized the implementation accordingly.

We have asked the user to provide his topology.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
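
For readers comparing the two mappings in the exception above, here is a minimal sketch of how a range-style and a round-robin-style assignment of 210 indexes to 7 groups differ. It is an illustration only and not Flink's RescaleMappings implementation; the class name MappingSketch and both method names are hypothetical, and the values 210 and 7 are simply read off the logged mappings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Flink code: rebuilds the two index layouts
// that appear in the exception message.
class MappingSketch {

    // Range style: group g receives the contiguous block [g * size, (g + 1) * size).
    // Assumes total is evenly divisible by groups, as in the logged case (210 / 7).
    static List<List<Integer>> rangeMapping(int total, int groups) {
        int size = total / groups;
        List<List<Integer>> mapping = new ArrayList<>();
        for (int g = 0; g < groups; g++) {
            List<Integer> indexes = new ArrayList<>();
            for (int i = g * size; i < (g + 1) * size; i++) {
                indexes.add(i);
            }
            mapping.add(indexes);
        }
        return mapping;
    }

    // Round-robin style: group g receives g, g + groups, g + 2 * groups, ...
    static List<List<Integer>> roundRobinMapping(int total, int groups) {
        List<List<Integer>> mapping = new ArrayList<>();
        for (int g = 0; g < groups; g++) {
            List<Integer> indexes = new ArrayList<>();
            for (int i = g; i < total; i += groups) {
                indexes.add(i);
            }
            mapping.add(indexes);
        }
        return mapping;
    }

    public static void main(String[] args) {
        List<List<Integer>> range = rangeMapping(210, 7);
        List<List<Integer>> roundRobin = roundRobinMapping(210, 7);
        System.out.println(range.get(1).subList(0, 3));      // [30, 31, 32]
        System.out.println(roundRobin.get(1).subList(0, 3)); // [1, 8, 15]
        System.out.println(range.equals(roundRobin));        // false
    }
}
```

Both layouts cover the same 210 indexes but group them differently, which is consistent with a check that expects a single mapping per gate rejecting the combination.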