Caizhi Weng created FLINK-16543:
-----------------------------------
Summary: Support setting schedule mode by config for Blink planner in batch mode
Key: FLINK-16543
URL: https://issues.apache.org/jira/browse/FLINK-16543
Project: Flink
Issue Type: Improvement
Components: Runtime / Configuration, Table SQL / Runtime
Reporter: Caizhi Weng
Currently the Blink planner always uses the {{LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST}} schedule mode in batch mode. This is hard-coded in the {{ExecutorUtils.setBatchProperties}} method.
{code:java}
public static void setBatchProperties(StreamGraph streamGraph, TableConfig tableConfig) {
    streamGraph.getStreamNodes().forEach(
        sn -> sn.setResources(ResourceSpec.UNKNOWN, ResourceSpec.UNKNOWN));
    streamGraph.setChaining(true);
    streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(false);
    streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
    streamGraph.setStateBackend(null);
    if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
        throw new IllegalArgumentException("Checkpoint is not supported for batch jobs.");
    }
    if (ExecutorUtils.isShuffleModeAllBatch(tableConfig)) {
        streamGraph.setBlockingConnectionsBetweenChains(true);
    }
}
{code}
But in certain use cases where execution time is short, especially OLAP use cases, {{LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST}} might not be the best choice, as it causes data to be spilled to disk when shuffling. In such cases, the {{EAGER}} schedule mode with the {{PIPELINED}} shuffle mode is preferred.
Currently we can set the shuffle mode via the {{table.exec.shuffle-mode}} table config, and we would like to add another config option to change the schedule mode for the Blink planner in batch mode.
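A minimal sketch of how such an option might be resolved, for illustration only. The key name {{table.exec.schedule-mode}}, the class {{ScheduleModeConfigSketch}}, and the helper {{resolveScheduleMode}} are all hypothetical and not part of Flink; the enum is a local stand-in limited to the two modes mentioned above, and a plain {{Map}} stands in for the table configuration. The default keeps the current hard-coded behavior.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

public class ScheduleModeConfigSketch {

    // Local stand-in for Flink's ScheduleMode, limited to the two modes named in this issue.
    enum ScheduleMode {
        LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
        EAGER
    }

    // Hypothetical config key; this issue does not propose a concrete name.
    static final String SCHEDULE_MODE_KEY = "table.exec.schedule-mode";

    // The mode that is hard-coded today, kept as the default.
    static final ScheduleMode DEFAULT_MODE =
        ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST;

    // Reads the schedule mode from a key/value config, falling back to the default
    // when the key is absent or blank. Matching is case-insensitive.
    static ScheduleMode resolveScheduleMode(Map<String, String> conf) {
        String raw = conf.get(SCHEDULE_MODE_KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT_MODE;
        }
        try {
            return ScheduleMode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Unsupported value for " + SCHEDULE_MODE_KEY + ": " + raw, e);
        }
    }

    public static void main(String[] args) {
        // No key set: falls back to the current behavior.
        System.out.println(resolveScheduleMode(Collections.emptyMap()));
        // Key set for a short-running OLAP-style job.
        System.out.println(resolveScheduleMode(
            Collections.singletonMap(SCHEDULE_MODE_KEY, "eager")));
    }
}
```

In {{setBatchProperties}}, the hard-coded argument to {{streamGraph.setScheduleMode(...)}} could then be replaced by the resolved value, so existing jobs keep their behavior unless the option is set explicitly.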