Asterios Katsifodimos created FLINK-1100:
-------------------------------------------- Summary: Optimization opportunity missed Key: FLINK-1100 URL: https://issues.apache.org/jira/browse/FLINK-1100 Project: Flink Issue Type: Improvement Components: Optimizer Affects Versions: 0.6-incubating, 0.7-incubating Reporter: Asterios Katsifodimos Priority: Minor The optimizer does not see an optimization opportunity. The program I used is the transitive closure program from 0.7-incubating, with the groupBy.reduce replaced by a simple distinct. The resulting plan (JSON here: https://gist.github.com/asteriosk/7a04cfd19537395eb401; also included at the end of this issue) misses an optimization opportunity: the sorted groupReduce can receive an input partitioned on field 0 and sort on field 1 in order to apply the distinct function. As a result, the partitioning (on field 0) can be reused to forward the results to the input of the next iteration instead of repartitioning. {code:javascript} { "nodes": [ { "id": 2, "type": "source", "pact": "Data Source", "contents": "[(1, 2), (2, 3), (2, 4), (3, 5), (6, 7), (8, 9), (8, 10), (5, 11), (11, 12), (10, 13), (9, 14), (13,", "parallelism": "1", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "step_function": [ { "id": 6, "type": "pact", "pact": "Bulk Partial Solution", "contents": "Partial Solution", "parallelism": "4", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "[1:ASC]" }, { "name": "Grouped on", "value": "[1]" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 5, "type": "pact", "pact": "Join", "contents": "org.apache.flink.api.java.operators.JoinOperator$DefaultJoin$WrappingFlatJoinFunction", "parallelism": "4", "predecessors": [ {"id": 6, "side": "first", "ship_strategy": "Hash Partition on [1]"}, {"id": 2, "side": "second", "ship_strategy": "Hash Partition on [0]"} ], "driver_strategy": "Hybrid Hash (CACHED) (build: [(1, 2), (2, 3), (2, 4), (3, 5), (6, 7), (8, 9), (8, 10), (5, 11), (11, 12), (10, 13), (9, 14), (13,)", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "(unknown)" }, { "name": "Disk I/O", "value": "(unknown)" }, { "name": "CPU", "value": "(unknown)" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 4, "type": "pact", "pact": "Union", "contents": "", "parallelism": "4", "predecessors": [ {"id": 5, "side": "first", "ship_strategy": "Hash Partition on [0, 1]"}, {"id": 6, "side": "second", "ship_strategy": "Hash Partition on [0, 1]"} ], "global_properties": [ { "name": "Partitioning", "value": "HASH_PARTITIONED" }, { "name": "Partitioned on", "value": "[0, 1]" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 3, "type": "pact", "pact": "GroupReduce", "contents": "org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction", "parallelism": "4", "predecessors": [ {"id": 5, "ship_strategy": "Hash Partition on [0, 1]"}, {"id": 6, "ship_strategy": "Hash Partition on [0, 1]"} ], "driver_strategy": "Sorted Group Reduce", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. 
Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "(unknown)" }, { "name": "CPU", "value": "(unknown)" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] } ], "partial_solution": 6, "next_partial_solution": 3, "id": 1, "type": "bulk_iteration", "pact": "Bulk Iteration", "contents": "Bulk Iteration", "parallelism": "4", "predecessors": [ {"id": 2, "ship_strategy": "Redistribute", "local_strategy": "Sort on [1:ASC]"} ], "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "(unknown)" }, { "name": "Disk I/O", "value": "(unknown)" }, { "name": "CPU", "value": "(unknown)" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 0, "type": "sink", "pact": "Data Sink", "contents": "Print to System.out", "parallelism": "4", "predecessors": [ {"id": 1, "ship_strategy": "Forward"} ], "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] } ] } {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) |
Free forum by Nabble | Edit this page |